diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-07 17:02:19 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-09 05:15:13 (GMT) |
commit | 41cb9e831db3d66973292fbdb4c13fb658ac9f59 (patch) | |
tree | dec6b37654de3f0d6d12dca6c413b2890cd8ef9f /sugar_network | |
parent | 90f74541ec4925bad47466e39517c22ff7eadfe4 (diff) |
Fix node synchronization; remove usage stats, it should be standalone project/process
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/db/blobs.py | 15 | ||||
-rw-r--r-- | sugar_network/db/resource.py | 7 | ||||
-rw-r--r-- | sugar_network/db/volume.py | 38 | ||||
-rw-r--r-- | sugar_network/node/__init__.py | 25 | ||||
-rw-r--r-- | sugar_network/node/downloads.py | 128 | ||||
-rw-r--r-- | sugar_network/node/files.py | 183 | ||||
-rw-r--r-- | sugar_network/node/master.py | 288 | ||||
-rw-r--r-- | sugar_network/node/model.py | 67 | ||||
-rw-r--r-- | sugar_network/node/routes.py | 89 | ||||
-rw-r--r-- | sugar_network/node/slave.py | 203 | ||||
-rw-r--r-- | sugar_network/node/stats_user.py | 127 | ||||
-rw-r--r-- | sugar_network/toolkit/__init__.py | 2 | ||||
-rw-r--r-- | sugar_network/toolkit/http.py | 16 | ||||
-rw-r--r-- | sugar_network/toolkit/rrd.py | 296 |
14 files changed, 245 insertions, 1239 deletions
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index cd795c6..52bc324 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -18,7 +18,7 @@ import logging import hashlib import mimetypes from contextlib import contextmanager -from os.path import exists, abspath, join, dirname +from os.path import exists, abspath, join, dirname, isdir from sugar_network import toolkit from sugar_network.toolkit.router import File @@ -104,10 +104,16 @@ class Blobs(object): path = self.path(digest) if exists(path + _META_SUFFIX): return File(path, digest, _read_meta(path)) + elif isdir(path): + return _lsdir(path, digest) def delete(self, path): self._delete(path, None) + def populate(self, path=None, recursive=True): + for __ in self.diff([[1, None]], path or '', recursive): + pass + def diff(self, r, path=None, recursive=True): if path is None: is_files = False @@ -228,3 +234,10 @@ def _read_meta(path): key, value = line.split(':', 1) meta[key] = value.strip() return meta + + +def _lsdir(root, rel_root): + for filename in os.listdir(root): + path = join(root, filename) + if exists(path + _META_SUFFIX): + yield File(path, join(rel_root, filename), _read_meta(path)) diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index 71a3efd..9d94929 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -123,7 +123,6 @@ class Resource(object): def diff(self, r): patch = {} - last_seqno = None for name, prop in self.metadata.items(): if name == 'seqno' or prop.acl & ACL.CALC: continue @@ -133,16 +132,16 @@ class Resource(object): seqno = meta.get('seqno') if not ranges.contains(r, seqno): continue - last_seqno = max(seqno, last_seqno) value = meta.get('value') if isinstance(prop, Aggregated): value_ = {} for key, agg in value.items(): - if ranges.contains(r, agg.pop('seqno')): + agg_seqno = agg.pop('seqno') + if ranges.contains(r, agg_seqno): value_[key] = agg value = value_ patch[name] = {'mtime': meta['mtime'], 'value': value} - return last_seqno, patch + return patch def format_patch(self, props): if not props: diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 5ec5683..5d9bac1 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -76,8 +76,15 @@ class Volume(dict): for __ in cls.populate(): coroutine.dispatch() - def diff(self, r, files=None, one_way=False): + def diff(self, r, exclude=None, files=None, one_way=False): + if exclude: + include = deepcopy(r) + ranges.exclude(include, exclude) + else: + include = r last_seqno = None + found = False + try: for resource, directory in self.items(): if one_way and directory.resource.one_way: @@ -90,33 +97,34 @@ class Volume(dict): query += str(end) docs, __ = directory.find(query=query, order_by='seqno') for doc in docs: - seqno, patch = doc.diff(r) - if not patch: - continue - yield {'guid': doc.guid, 'patch': patch} - last_seqno = max(last_seqno, seqno) - for blob in self.blobs.diff(r): + patch = doc.diff(include) + if patch: + yield {'guid': doc.guid, 'patch': patch} + found = True + last_seqno = max(last_seqno, doc['seqno']) + for blob in self.blobs.diff(include): seqno = int(blob.pop('x-seqno')) yield blob + found = True last_seqno = max(last_seqno, seqno) for dirpath in files or []: - for blob in self.blobs.diff(r, dirpath): + for blob in self.blobs.diff(include, dirpath): seqno = int(blob.pop('x-seqno')) yield blob + found = True last_seqno = max(last_seqno, seqno) except StopIteration: pass - if last_seqno: - commit_r = deepcopy(r) + if found: + commit_r = include if exclude else deepcopy(r) ranges.exclude(commit_r, last_seqno + 1, None) ranges.exclude(r, None, last_seqno) yield {'commit': commit_r} def patch(self, records): directory = None - commit_r = [] - merged_r = [] + committed = [] seqno = None for record in records: @@ -137,12 +145,10 @@ class Volume(dict): commit = record.get('commit') if commit is not None: - ranges.include(commit_r, commit) + ranges.include(committed, commit) continue - if seqno is not None: - ranges.include(merged_r, seqno, seqno) - return commit_r, merged_r + return seqno, committed def __enter__(self): return self diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index a4360b4..14d675c 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -34,28 +34,17 @@ certfile = Option( name='certfile') data_root = Option( - 'path to a directory to place server data', + 'path to a directory to place node data', default='/var/lib/sugar-network', name='data_root') find_limit = Option( 'limit the resulting list for search requests', default=64, type_cast=int, name='find-limit') -stats_root = Option( - 'path to the root directory for placing stats', - default='/var/lib/sugar-network/stats', name='stats_root') +mode = Option( + 'node running mode, should be one of "slave", "proxy", or, "master"', + default='slave') -files_root = Option( - 'path to a directory to keep files synchronized between nodes', - default='/var/lib/sugar-network/files', name='files_root') - -pull_timeout = Option( - 'delay in seconds to return to sync-pull requester to wait until ' - 'pull request will be ready', - default=30, type_cast=int) - -sync_layers = Option( - 'comma separated list of layers to restrict Sugar Network ' - 'synchronization content', - default=[], type_cast=Option.list_cast, - type_repr=Option.list_repr, name='sync-layers') +master_api = Option( + 'master API url either to connect to (for slave or proxy nodes), or,' + 'to provide from (for master nodes)') diff --git a/sugar_network/node/downloads.py b/sugar_network/node/downloads.py deleted file mode 100644 index b7156bc..0000000 --- a/sugar_network/node/downloads.py +++ /dev/null @@ -1,128 +0,0 @@ -# Copyright (C) 2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -"""Persistent pool with temporary files prepared to download.""" - -import os -import json -import hashlib -import logging -from glob import glob -from os.path import join, splitext, exists - -from sugar_network.toolkit import pylru, coroutine, exception - - -# Maximum numer of postponed pulls master can handle at the same time -_POOL_SIZE = 256 -_TAG_SUFFIX = '.tag' - -_logger = logging.getLogger('node.downloads') - - -class Pool(object): - - def __init__(self, root): - self._pool = pylru.lrucache(_POOL_SIZE, lambda __, dl: dl.pop()) - if not exists(root): - os.makedirs(root) - self._root = root - - for tag_path in glob(join(root, '*.tag')): - path, __ = splitext(tag_path) - if exists(path): - try: - with file(tag_path) as f: - key, tag = json.load(f) - pool_key = json.dumps(key) - self._pool[pool_key] = _Download(key, tag, path) - continue - except Exception: - exception('Cannot open %r download, recreate', tag_path) - os.unlink(path) - os.unlink(tag_path) - - def get(self, key): - key = json.dumps(key) - if key in self._pool: - return self._pool[key] - - def set(self, key, tag, fetcher, *args, **kwargs): - pool_key = json.dumps(key) - path = join(self._root, hashlib.md5(pool_key).hexdigest()) - - def do_fetch(): - try: - complete = fetcher(*args, path=path, **kwargs) - except Exception: - exception('Error while fetching %r', self) - if exists(path): - os.unlink(path) - return True - with file(path + _TAG_SUFFIX, 'w') as f: - json.dump([key, tag], f) - return complete - - job = coroutine.spawn(do_fetch) - dl = self._pool[pool_key] = _Download(key, tag, path, job) - return dl - - def remove(self, key): - key = json.dumps(key) - if key in self._pool: - self._pool.peek(key).pop() - del self._pool[key] - - -class _Download(dict): - - def __init__(self, key, tag, path, job=None): - self.tag = tag - self._key = key - self._path = path - self._job = job - - def __repr__(self): - return '<Download %r path=%r>' % (self._key, self._path) - - @property - def ready(self): - # pylint: disable-msg=E1101 - return self._job is None or self._job.dead - - @property - def complete(self): - return self._job is not None and self._job.value - - @property - def length(self): - if exists(self._path): - return os.stat(self._path).st_size - - def open(self): - if exists(self._path): - return file(self._path, 'rb') - - def pop(self): - if self._job is not None and not self._job.dead: - _logger.debug('Abort fetching %r', self) - self._job.kill() - - if exists(self._path): - os.unlink(self._path) - if exists(self._path + _TAG_SUFFIX): - os.unlink(self._path + _TAG_SUFFIX) - - _logger.debug('Throw out %r from the pool', self) diff --git a/sugar_network/node/files.py b/sugar_network/node/files.py deleted file mode 100644 index 4fb64ca..0000000 --- a/sugar_network/node/files.py +++ /dev/null @@ -1,183 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import json -import logging -from bisect import bisect_left -from shutil import copyfileobj -from os.path import join, exists, relpath, lexists, dirname - -from sugar_network import toolkit -from sugar_network.toolkit import coroutine - - -_logger = logging.getLogger('node.sync_files') - - -def merge(files_path, packet): - files_path = files_path.rstrip(os.sep) - if not exists(files_path): - os.makedirs(files_path) - commit_seq = None - - for record in packet: - op = record.get('op') - if op == 'update': - path = join(files_path, record['path']) - if not exists(dirname(path)): - os.makedirs(dirname(path)) - with toolkit.new_file(path) as f: - copyfileobj(record['blob'], f) - elif op == 'delete': - path = join(files_path, record['path']) - if lexists(path): - os.unlink(path) - elif op == 'commit': - commit_seq = record['sequence'] - - return commit_seq - - -class Index(object): - - def __init__(self, files_path, index_path, seqno): - self._files_path = files_path.rstrip(os.sep) - self._index_path = index_path - self._seqno = seqno - self._index = [] - self._stamp = 0 - self._mutex = coroutine.Lock() - - if exists(self._index_path): - with file(self._index_path) as f: - self._index, self._stamp = json.load(f) - - if not exists(self._files_path): - os.makedirs(self._files_path) - - def sync(self): - with self._mutex: - return self._sync() - - def diff(self, in_seq, out_seq=None, **kwargs): - if out_seq is None: - out_seq = toolkit.Sequence([]) - is_initial_diff = not out_seq - - # Below calls will trigger coroutine switches, thius, - # avoid changing `self._index` by different coroutines. - with self._mutex: - self._sync() - - _logger.debug('Start sync: in_seq=%r', in_seq) - - files = 0 - deleted = 0 - pos = 0 - - try: - for start, end in in_seq: - pos = bisect_left(self._index, [start, None, None], pos) - for pos, (seqno, path, mtime) in \ - enumerate(self._index[pos:]): - if end is not None and seqno > end: - break - coroutine.dispatch() - if mtime < 0: - yield {'op': 'delete', 'path': path} - deleted += 1 - else: - blob_path = join(self._files_path, path) - yield {'op': 'update', - 'path': path, - 'blob_size': os.stat(blob_path).st_size, - 'blob': toolkit.iter_file(blob_path), - } - out_seq.include(start, seqno) - start = seqno - files += 1 - except StopIteration: - pass - - if is_initial_diff: - # There is only one diff, so, we can stretch it to remove holes - out_seq.stretch() - yield {'op': 'commit', 'sequence': out_seq} - - _logger.debug('Stop sync: in_seq=%r out_seq=%r updates=%r deletes=%r', - in_seq, out_seq, files, deleted) - - def _sync(self): - if os.stat(self._files_path).st_mtime <= self._stamp: - return False - - new_files = set() - updates = 0 - deletes = 0 - - # Populate list of new files at first - for root, __, files in os.walk(self._files_path): - coroutine.dispatch() - rel_root = relpath(root, self._files_path) - if rel_root == '.': - rel_root = '' - else: - rel_root += os.sep - for filename in files: - coroutine.dispatch() - path = join(root, filename) - if os.lstat(path).st_mtime > self._stamp: - new_files.add(rel_root + filename) - - # Check for updates for already tracked files - tail = [] - for pos, (__, rel_path, mtime) in enumerate(self._index[:]): - coroutine.dispatch() - path = join(self._files_path, rel_path) - existing = lexists(path) - if existing == (mtime >= 0) and \ - (not existing or os.lstat(path).st_mtime == mtime): - continue - if existing: - new_files.discard(rel_path) - pos -= len(tail) - self._index = self._index[:pos] + self._index[pos + 1:] - tail.append([ - self._seqno.next(), - rel_path, - int(os.lstat(path).st_mtime) if existing else -1, - ]) - if existing: - updates += 1 - else: - deletes += 1 - self._index.extend(tail) - - _logger.debug('Updated %r index: new=%r updates=%r deletes=%r', - self._files_path, len(self._files_path), updates, deletes) - - # Finally, add new files - for rel_path in sorted(new_files): - coroutine.dispatch() - mtime = os.lstat(join(self._files_path, rel_path)).st_mtime - self._index.append([self._seqno.next(), rel_path, mtime]) - - self._stamp = os.stat(self._files_path).st_mtime - if self._seqno.commit(): - with toolkit.new_file(self._index_path) as f: - json.dump((self._index, self._stamp), f) - - return True diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py index c7c22e0..61d32fb 100644 --- a/sugar_network/node/master.py +++ b/sugar_network/node/master.py @@ -1,4 +1,4 @@ -# Copyright (C) 2013 Aleksey Lim +# Copyright (C) 2013-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -13,17 +13,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import json -import base64 import logging -from Cookie import SimpleCookie -from os.path import join +from urlparse import urlsplit -from sugar_network import node, toolkit -from sugar_network.node import sync, stats_user, files, model, downloads, obs +from sugar_network import toolkit +from sugar_network.node import obs, master_api from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL -from sugar_network.toolkit import http, enforce +from sugar_network.toolkit.coroutine import this +from sugar_network.toolkit import http, parcel, pylru, ranges, enforce RESOURCES = ( @@ -33,215 +31,107 @@ RESOURCES = ( 'sugar_network.model.user', ) -_ONE_WAY_DOCUMENTS = ['report'] - _logger = logging.getLogger('node.master') class MasterRoutes(NodeRoutes): - def __init__(self, guid, volume, **kwargs): - NodeRoutes.__init__(self, guid, volume=volume, **kwargs) - - self._pulls = { - 'pull': lambda **kwargs: - ('diff', None, model.diff(self.volume, - ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)), - 'files_pull': lambda **kwargs: - ('files_diff', None, self._files.diff(**kwargs)), - } - - self._pull_queue = downloads.Pool( - join(toolkit.cachedir.value, 'pulls')) - self._files = None - - if node.files_root.value: - self._files = files.Index(node.files_root.value, - join(volume.root, 'files.index'), volume.seqno) - - @route('POST', cmd='sync', - acl=ACL.AUTH) - def sync(self, request): - reply, cookie = self._push(sync.decode(request.content_stream)) - exclude_seq = None - if len(cookie.sent) == 1: - exclude_seq = cookie.sent.values()[0] - for op, layer, seq in cookie: - reply.append(self._pulls[op](in_seq=seq, - exclude_seq=exclude_seq, layer=layer)) - return sync.encode(reply, src=self.guid) + def __init__(self, **kwargs): + NodeRoutes.__init__(self, urlsplit(master_api.value).netloc, **kwargs) + self._pulls = pylru.lrucache(1024) + + @route('POST', cmd='sync', arguments={'accept_length': int}) + def sync(self, accept_length): + return parcel.encode(self._push() + (self._pull() or []), + limit=accept_length, header={'from': self.guid}, + on_complete=this.cookie.clear) @route('POST', cmd='push') - def push(self, request, response): - reply, cookie = self._push(sync.package_decode(request.content_stream)) - # Read passed cookie only after excluding `merged_seq`. - # If there is `pull` out of currently pushed packet, excluding - # `merged_seq` should not affect it. - cookie.update(_Cookie(request)) - cookie.store(response) - return sync.package_encode(reply, src=self.guid) - - @route('GET', cmd='pull', - mime_type='application/octet-stream', - arguments={'accept_length': int}) - def pull(self, request, response, accept_length=None): - cookie = _Cookie(request) - if not cookie: - _logger.warning('Requested full dump in pull command') - cookie.append(('pull', None, toolkit.Sequence([[1, None]]))) - cookie.append(('files_pull', None, toolkit.Sequence([[1, None]]))) - - exclude_seq = None - if len(cookie.sent) == 1: - exclude_seq = toolkit.Sequence(cookie.sent.values()[0]) - - reply = None - for pull_key in cookie: - op, layer, seq = pull_key - - pull = self._pull_queue.get(pull_key) - if pull is not None: - if not pull.ready: - continue - if not pull.tag: - self._pull_queue.remove(pull_key) - cookie.remove(pull_key) - continue - if accept_length is None or pull.length <= accept_length: - _logger.debug('Found ready to use %r', pull) - if pull.complete: - cookie.remove(pull_key) - else: - seq.exclude(pull.tag) - reply = pull.open() - break - _logger.debug('Existing %r is too big, will recreate', pull) - self._pull_queue.remove(pull_key) - - out_seq = toolkit.Sequence() - pull = self._pull_queue.set(pull_key, out_seq, - sync.sneakernet_encode, - [self._pulls[op](in_seq=seq, out_seq=out_seq, - exclude_seq=exclude_seq, layer=layer, - fetch_blobs=True)], - limit=accept_length, src=self.guid) - _logger.debug('Start new %r', pull) + def push(self): + return parcel.encode(self._push(), header={'from': self.guid}) + @route('GET', cmd='pull', arguments={'accept_length': int}) + def pull(self, accept_length): + reply = self._pull() if reply is None: - if cookie: - _logger.debug('No ready pulls') - # TODO Might be useful to set meaningful value here - cookie.delay = node.pull_timeout.value - else: - _logger.debug('Nothing to pull') - - cookie.store(response) - return reply + return None + return parcel.encode(reply, limit=accept_length, + header={'from': self.guid}, on_complete=this.cookie.clear) @route('PUT', ['context', None], cmd='presolve', acl=ACL.AUTH, mime_type='application/json') def presolve(self, request): - enforce(node.files_root.value, http.BadRequest, 'Disabled') - aliases = self.volume['context'].get(request.guid)['aliases'] + aliases = this.volume['context'].get(request.guid)['aliases'] enforce(aliases, http.BadRequest, 'Nothing to presolve') - return obs.presolve(None, aliases, node.files_root.value) + return obs.presolve(None, aliases, this.volume.blobs.path('packages')) def status(self): result = NodeRoutes.status(self) - result['level'] = 'master' + result['mode'] = 'master' return result - def _push(self, stream): + def _push(self): + cookie = this.cookie reply = [] - cookie = _Cookie() - - for packet in stream: - src = packet['src'] - enforce(packet['dst'] == self.guid, 'Misaddressed packet') - - if packet.name == 'pull': - pull_seq = cookie['pull', packet['layer'] or None] - pull_seq.include(packet['sequence']) - cookie.sent.setdefault(src, toolkit.Sequence()) - elif packet.name == 'files_pull': - if self._files is not None: - cookie['files_pull'].include(packet['sequence']) - elif packet.name == 'diff': - seq, ack_seq = model.merge(self.volume, packet) - reply.append(('ack', { - 'ack': ack_seq, - 'sequence': seq, - 'dst': src, - }, None)) - sent_seq = cookie.sent.setdefault(src, toolkit.Sequence()) - sent_seq.include(ack_seq) - elif packet.name == 'stats_diff': - reply.append(('stats_ack', { - 'sequence': stats_user.merge(packet), - 'dst': src, - }, None)) - - return reply, cookie - - -class _Cookie(list): - - def __init__(self, request=None): - list.__init__(self) - - self.sent = {} - self.delay = 0 - - if request is not None: - self.update(self._get_cookie(request, 'sugar_network_pull') or []) - self.sent = self._get_cookie(request, 'sugar_network_sent') or {} - - def __repr__(self): - return '<Cookie pull=%s sent=%r>' % (list.__repr__(self), self.sent) - - def update(self, that): - for op, layer, seq in that: - self[op, layer].include(seq) - - def store(self, response): - response.set('set-cookie', []) - if self: - _logger.debug('Postpone %r in cookie', self) - self._set_cookie(response, 'sugar_network_pull', - base64.b64encode(json.dumps(self))) - self._set_cookie(response, 'sugar_network_sent', - base64.b64encode(json.dumps(self.sent))) - self._set_cookie(response, 'sugar_network_delay', self.delay) + + for packet in parcel.decode( + this.request.content_stream, this.request.content_length): + sender = packet['from'] + enforce(packet['to'] == self.guid, http.BadRequest, + 'Misaddressed packet') + if packet.name == 'push': + seqno, push_r = this.volume.patch(packet) + ack_r = [] if seqno is None else [[seqno, seqno]] + ack = {'ack': ack_r, 'ranges': push_r, 'to': sender} + reply.append(('ack', ack, None)) + cookie.setdefault('ack', {}) \ + .setdefault(sender, []) \ + .append((push_r, ack_r)) + elif packet.name == 'pull': + cookie.setdefault('ack', {}).setdefault(sender, []) + ranges.include(cookie.setdefault('pull', []), packet['ranges']) + elif packet.name == 'request': + cookie.setdefault('request', []).append(packet.header) + + return reply + + def _pull(self): + processed = this.cookie.get('id') + if processed in self._pulls: + cookie = this.cookie = self._pulls[processed] + if not cookie: + return None else: - self._unset_cookie(response, 'sugar_network_pull') - self._unset_cookie(response, 'sugar_network_sent') - self._unset_cookie(response, 'sugar_network_delay') - - def __getitem__(self, key): - if not isinstance(key, tuple): - key = (key, None) - for op, layer, seq in self: - if (op, layer) == key: - return seq - seq = toolkit.Sequence() - self.append(key + (seq,)) - return seq - - def _get_cookie(self, request, name): - cookie_str = request.environ.get('HTTP_COOKIE') - if not cookie_str: - return - cookie = SimpleCookie() - cookie.load(cookie_str) - if name not in cookie: - return - value = cookie.get(name).value - if value != 'unset_%s' % name: - return json.loads(base64.b64decode(value)) - - def _set_cookie(self, response, name, value, age=3600): - cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age) - response.get('set-cookie').append(cookie) - - def _unset_cookie(self, response, name): - self._set_cookie(response, name, 'unset_%s' % name, 0) + cookie = this.cookie + cookie['id'] = toolkit.uuid() + self._pulls[cookie['id']] = cookie + + pull_r = cookie.get('pull') + if not pull_r: + return [] + + reply = [] + exclude = [] + acks = cookie.get('ack') + if acks: + acked = {} + for req in cookie.get('request') or []: + ack_r = None + for push_r, ack_r in acks.get(req['origin']) or []: + if req['ranges'] == push_r: + break + else: + continue + ranges.include(acked.setdefault(req['from'], []), ack_r) + reply.append(('ack', {'to': req['from'], 'ack': ack_r}, [])) + for node, ack_ranges in acks.items(): + acked_r = acked.setdefault(node, []) + for __, i in ack_ranges: + ranges.include(acked_r, i) + r = reduce(lambda x, y: ranges.intersect(x, y), acked.values()) + ranges.include(exclude, r) + + push = this.volume.diff(pull_r, exclude, one_way=True, files=['']) + reply.append(('push', None, push)) + + return reply diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index 9ead01c..90e50c2 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -16,7 +16,7 @@ import bisect import logging -from sugar_network import db, toolkit +from sugar_network import db from sugar_network.model import Release, context as base_context from sugar_network.node import obs from sugar_network.toolkit.router import ACL @@ -105,71 +105,6 @@ class Context(base_context.Context): return value -def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None, - ignore_documents=None, **kwargs): - if out_seq is None: - out_seq = toolkit.Sequence([]) - is_the_only_seq = not out_seq - if layer: - if isinstance(layer, basestring): - layer = [layer] - layer.append('common') - try: - for resource, directory in volume.items(): - if ignore_documents and resource in ignore_documents: - continue - coroutine.dispatch() - directory.commit() - yield {'resource': resource} - for guid, patch in directory.diff(in_seq, exclude_seq, - layer=layer if resource == 'context' else None): - adiff = {} - adiff_seq = toolkit.Sequence() - for prop, meta, seqno in patch: - adiff[prop] = meta - adiff_seq.include(seqno, seqno) - if adiff: - yield {'guid': guid, 'diff': adiff} - out_seq.include(adiff_seq) - if is_the_only_seq: - # There is only one diff, so, we can stretch it to remove all holes - out_seq.stretch() - except StopIteration: - pass - - yield {'commit': out_seq} - - -def merge(volume, records): - directory = None - commit_seq = toolkit.Sequence() - merged_seq = toolkit.Sequence() - synced = False - - for record in records: - resource_ = record.get('resource') - if resource_: - directory = volume[resource_] - continue - - if 'guid' in record: - seqno, merged = directory.merge(**record) - synced = synced or merged - if seqno is not None: - merged_seq.include(seqno, seqno) - continue - - commit = record.get('commit') - if commit is not None: - commit_seq.include(commit) - continue - - if synced: - this.broadcast({'event': 'sync'}) - - return commit_seq, merged_seq - - def solve(volume, top_context, lsb_id=None, lsb_release=None, stability=None, requires=None): top_context = volume['context'][top_context] diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index 6d5d200..86e4ce1 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -18,14 +18,13 @@ import time import logging import hashlib from ConfigParser import ConfigParser -from os.path import join, isdir, exists +from os.path import join, exists -from sugar_network import db, node, toolkit -from sugar_network.db import blobs +from sugar_network import db, node from sugar_network.model import FrontRoutes, load_bundle -from sugar_network.node import stats_user, model +from sugar_network.node import model # pylint: disable-msg=W0611 -from sugar_network.toolkit.router import route, preroute, postroute, ACL +from sugar_network.toolkit.router import route, preroute, postroute, ACL, File from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute from sugar_network.toolkit.spec import parse_requires, parse_version from sugar_network.toolkit.bundle import Bundle @@ -53,10 +52,6 @@ class NodeRoutes(db.Routes, FrontRoutes): def guid(self): return self._guid - @route('GET', cmd='logon', acl=ACL.AUTH) - def logon(self): - pass - @route('GET', cmd='whoami', mime_type='application/json') def whoami(self, request, response): roles = [] @@ -66,7 +61,7 @@ class NodeRoutes(db.Routes, FrontRoutes): @route('GET', cmd='status', mime_type='application/json') def status(self): - return {'guid': self._guid, + return {'guid': self.guid, 'seqno': { 'db': self.volume.seqno.value, 'releases': self.volume.releases_seqno.value, @@ -80,49 +75,39 @@ class NodeRoutes(db.Routes, FrontRoutes): @fallbackroute('GET', ['packages']) def route_packages(self, request, response): - enforce(node.files_root.value, http.BadRequest, 'Disabled') - - if request.path and request.path[-1] == 'updates': - root = join(node.files_root.value, *request.path[:-1]) - enforce(isdir(root), http.NotFound, 'Directory was not found') + path = this.request.path + if path and path[-1] == 'updates': result = [] last_modified = 0 - for filename in os.listdir(root): - if '.' in filename: + for blob in this.volume.blobs.diff( + [[this.request.if_modified_since + 1, None]], + join(*path[:-1]), recursive=False): + if '.' in blob.name: continue - path = join(root, filename) - mtime = int(os.stat(path).st_mtime) - if mtime > request.if_modified_since: - result.append(filename) - last_modified = max(last_modified, mtime) + result.append(blob.name) + last_modified = max(last_modified, blob.mtime) response.content_type = 'application/json' if last_modified: response.last_modified = last_modified return result - path = join(node.files_root.value, *request.path) - enforce(exists(path), http.NotFound, 'File was not found') - if not isdir(path): - return toolkit.iter_file(path) - - result = [] - for filename in os.listdir(path): - if filename.endswith('.rpm') or filename.endswith('.deb'): - continue - result.append(filename) - - response.content_type = 'application/json' - return result + blob = this.volume.blobs.get(join(*path)) + if isinstance(blob, File): + return blob + else: + response.content_type = 'application/json' + return [i.name for i in blob if '.' not in i.name] @route('POST', ['context'], cmd='submit', arguments={'initial': False}, mime_type='application/json', acl=ACL.AUTH) - def submit_release(self, request, initial): - blob = blobs.post(request.content_stream, request.content_type) + def submit_release(self, initial): + blob = this.volume.blobs.post( + this.request.content_stream, this.request.content_type) try: context, release = load_bundle(blob, initial=initial) except Exception: - blobs.delete(blob.digest) + this.volume.blobs.delete(blob.digest) raise this.call(method='POST', path=['context', context, 'releases'], content_type='application/json', content=release) @@ -156,29 +141,7 @@ class NodeRoutes(db.Routes, FrontRoutes): arguments={'requires': list}) def get_clone(self, request, response): solution = self.solve(request) - return blobs.get(solution[request.guid]['blob']) - - @route('GET', ['user', None], cmd='stats-info', - mime_type='application/json', acl=ACL.AUTH) - def user_stats_info(self, request): - status = {} - for rdb in stats_user.get_rrd(request.guid): - status[rdb.name] = rdb.last + stats_user.stats_user_step.value - - # TODO Process client configuration in more general manner - return {'enable': True, - 'step': stats_user.stats_user_step.value, - 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], - 'status': status, - } - - @route('POST', ['user', None], cmd='stats-upload', acl=ACL.AUTH) - def user_stats_upload(self, request): - name = request.content['name'] - values = request.content['values'] - rrd = stats_user.get_rrd(request.guid) - for timestamp, values in values: - rrd[name].put(values, timestamp) + return this.volume.blobs.get(solution[request.guid]['blob']) @preroute def preroute(self, op, request, response): @@ -204,7 +167,7 @@ class NodeRoutes(db.Routes, FrontRoutes): def on_create(self, request, props): if request.resource == 'user': - with file(blobs.get(props['pubkey']).path) as f: + with file(this.volume.blobs.get(props['pubkey']).path) as f: props['guid'] = str(hashlib.sha1(f.read()).hexdigest()) db.Routes.on_create(self, request, props) @@ -223,7 +186,7 @@ class NodeRoutes(db.Routes, FrontRoutes): from M2Crypto import RSA pubkey = self.volume['user'][auth.login]['pubkey'] - key = RSA.load_pub_key(blobs.get(pubkey).path) + key = RSA.load_pub_key(this.volume.blobs.get(pubkey).path) data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest() enforce(key.verify(data, auth.signature.decode('hex')), http.Forbidden, 'Bad credentials') diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py index 2d60ea8..333e6ea 100644 --- a/sugar_network/node/slave.py +++ b/sugar_network/node/slave.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2013 Aleksey Lim +# Copyright (C) 2012-2014 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,13 +21,12 @@ from urlparse import urlsplit from os.path import join, dirname, exists, isabs from gettext import gettext as _ -from sugar_network import node, toolkit -from sugar_network.client import api_url -from sugar_network.node import sync, stats_user, files, model +from sugar_network import toolkit +from sugar_network.node import master_api from sugar_network.node.routes import NodeRoutes from sugar_network.toolkit.router import route, ACL from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, enforce +from sugar_network.toolkit import http, parcel, ranges, enforce _logger = logging.getLogger('node.slave') @@ -35,148 +34,94 @@ _logger = logging.getLogger('node.slave') class SlaveRoutes(NodeRoutes): - def __init__(self, key_path, volume_): - self._auth = http.SugarAuth(key_path) - NodeRoutes.__init__(self, self._auth.login, volume_) - - self._push_seq = toolkit.PersistentSequence( - join(volume_.root, 'push.sequence'), [1, None]) - self._pull_seq = toolkit.PersistentSequence( - join(volume_.root, 'pull.sequence'), [1, None]) - self._files_seq = toolkit.PersistentSequence( - join(volume_.root, 'files.sequence'), [1, None]) - self._master_guid = urlsplit(api_url.value).netloc - self._offline_session = None - - @route('POST', cmd='online-sync', acl=ACL.LOCAL) + def __init__(self, volume, **kwargs): + self._creds = http.SugarAuth( + join(volume.root, 'etc', 'private', 'node')) + NodeRoutes.__init__(self, self._creds.login, volume=volume, **kwargs) + vardir = join(volume.root, 'var') + self._push_r = toolkit.Bin(join(vardir, 'push.ranges'), [[1, None]]) + self._pull_r = toolkit.Bin(join(vardir, 'pull.ranges'), [[1, None]]) + self._master_guid = urlsplit(master_api.value).netloc + + @route('POST', cmd='online_sync', acl=ACL.LOCAL, + arguments={'no_pull': bool}) def online_sync(self, no_pull=False): - conn = http.Connection(api_url.value, auth=self._auth) - - # TODO `http.Connection` should handle re-POSTing without - # loosing payload after authentication - conn.get(cmd='logon') - - push = [('diff', None, model.diff(self.volume, self._push_seq))] - if not no_pull: - push.extend([ - ('pull', { - 'sequence': self._pull_seq, - 'layer': node.sync_layers.value, - }, None), - ('files_pull', {'sequence': self._files_seq}, None), - ]) - if stats_user.stats_user.value: - push.append(('stats_diff', None, stats_user.diff())) + self._export(not no_pull) + conn = http.Connection(master_api.value) response = conn.request('POST', - data=sync.encode(push, src=self.guid, dst=self._master_guid), + data=parcel.encode(self._export(not no_pull), header={ + 'from': self.guid, + 'to': self._master_guid, + }), params={'cmd': 'sync'}, headers={'Transfer-Encoding': 'chunked'}) - self._import(sync.decode(response.raw), None) + self._import(parcel.decode(response.raw)) - @route('POST', cmd='offline-sync', acl=ACL.LOCAL) + @route('POST', cmd='offline_sync', acl=ACL.LOCAL) def offline_sync(self, path): - enforce(node.sync_layers.value, - '--sync-layers is not specified, the full master dump ' - 'might be too big and should be limited') - enforce(isabs(path), 'Argument \'path\' should be an absolute path') - - _logger.debug('Start %r synchronization session in %r', - self._offline_session, path) + enforce(isabs(path), "Argument 'path' is not an absolute path") + _logger.debug('Start offline synchronization in %r', path) if not exists(path): os.makedirs(path) - try: - self._offline_session = self._offline_sync(path, - **(self._offline_session or {})) - except Exception: - toolkit.exception(_logger, 'Failed to complete synchronization') - self._offline_session = None - raise - - if self._offline_session is None: - _logger.debug('Synchronization completed') - else: - _logger.debug('Postpone synchronization with %r session', - self._offline_session) - - def status(self): - result = NodeRoutes.status(self) - result['level'] = 'slave' - return result - - def _offline_sync(self, path, push_seq=None, stats_seq=None, session=None): - push = [] - - if push_seq is None: - push_seq = toolkit.Sequence(self._push_seq) - if stats_seq is None: - stats_seq = {} - if session is None: - session = toolkit.uuid() - push.append(('pull', { - 'sequence': self._pull_seq, - 'layer': node.sync_layers.value, - }, None)) - push.append(('files_pull', {'sequence': self._files_seq}, None)) - this.broadcast({ 'event': 'sync_progress', 'progress': _('Reading sneakernet packages'), }) - self._import(sync.sneakernet_decode(path), push_seq) - - offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync') - if exists(offline_script): - shutil.copy(offline_script, path) + requests = self._import(parcel.decode_dir(path)) this.broadcast({ 'event': 'sync_progress', 'progress': _('Generating new sneakernet package'), }) + offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync') + if exists(offline_script): + shutil.copy(offline_script, path) + parcel.encode_dir(requests + self._export(True), root=path, header={ + 'from': self.guid, + 'to': self._master_guid, + }) + + _logger.debug('Synchronization completed') + + def status(self): + result = NodeRoutes.status(self) + result['mode'] = 'slave' + return result + + def _import(self, package): + requests = [] - diff_seq = toolkit.Sequence([]) - push.append(('diff', None, - model.diff(self.volume, push_seq, diff_seq))) - if stats_user.stats_user.value: - push.append(('stats_diff', None, stats_user.diff(stats_seq))) - complete = sync.sneakernet_encode(push, root=path, - src=self.guid, dst=self._master_guid, api_url=api_url.value, - session=session) - if not complete: - push_seq.exclude(diff_seq) - return {'push_seq': push_seq, - 'stats_seq': stats_seq, - 'session': session, - } - - def _import(self, package, push_seq): for packet in package: - from_master = (packet['src'] == self._master_guid) - addressed = (packet['dst'] == self.guid) - - if packet.name == 'diff': - _logger.debug('Processing %r', packet) - seq, __ = model.merge(self.volume, packet, shift_seqno=False) - if from_master and seq: - self._pull_seq.exclude(seq) - self._pull_seq.commit() - - elif from_master: - if packet.name == 'ack' and addressed: - _logger.debug('Processing %r', packet) - if push_seq: - push_seq.exclude(packet['sequence']) - self._pull_seq.exclude(packet['ack']) - self._pull_seq.commit() - self._push_seq.exclude(packet['sequence']) - self._push_seq.commit() - elif packet.name == 'stats_ack' and addressed: - _logger.debug('Processing %r', packet) - stats_user.commit(packet['sequence']) - elif packet.name == 'files_diff': - _logger.debug('Processing %r', packet) - seq = files.merge(node.files_root.value, packet) - if seq: - self._files_seq.exclude(seq) - self._files_seq.commit() + sender = packet['from'] + from_master = (sender == self._master_guid) + if packet.name == 'push': + seqno, committed = this.volume.patch(packet) + if seqno is not None: + if from_master: + ranges.exclude(self._pull_r.value, committed) + self._pull_r.commit() + else: + requests.append(('request', { + 'origin': sender, + 'ranges': committed, + }, [])) + ranges.exclude(self._push_r.value, seqno, seqno) + self._push_r.commit() + elif packet.name == 'ack' and from_master and \ + packet['to'] == self.guid: + ranges.exclude(self._pull_r.value, packet['ack']) + self._pull_r.commit() + if packet['ranges']: + ranges.exclude(self._push_r.value, packet['ranges']) + self._push_r.commit() + + return requests + + def _export(self, pull): + export = [] + if pull: + export.append(('pull', {'ranges': self._pull_r.value}, None)) + export.append(('push', None, self.volume.diff(self._push_r.value))) + return export diff --git a/sugar_network/node/stats_user.py b/sugar_network/node/stats_user.py deleted file mode 100644 index 0b96449..0000000 --- a/sugar_network/node/stats_user.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import logging -from os.path import join, exists, isdir - -from sugar_network import node, toolkit -from sugar_network.toolkit.rrd import Rrd -from sugar_network.toolkit import Option, pylru, enforce - - -stats_user = Option( - 'accept personalized users statistics', - default=False, type_cast=Option.bool_cast, action='store_true') - -stats_user_step = Option( - 'step interval in seconds for users\' RRD databases', - default=60, type_cast=int) - -stats_user_rras = Option( - 'comma separated list of RRAs for users\' RRD databases', - default=[ - 'RRA:AVERAGE:0.5:1:4320', # one day with 60s step - 'RRA:AVERAGE:0.5:5:2016', # one week with 5min step - ], - type_cast=Option.list_cast, type_repr=Option.list_repr) - -_logger = logging.getLogger('node.stats_user') -_user_cache = pylru.lrucache(32) - - -def get_rrd(user): - if user in _user_cache: - return _user_cache[user] - else: - rrd = _user_cache[user] = Rrd(_rrd_path(user), - stats_user_step.value, stats_user_rras.value) - return rrd - - -def diff(in_info=None): - if in_info is None: - in_info = {} - out_info = {} - - try: - for user, rrd in _walk_rrd(join(node.stats_root.value, 'user')): - in_info.setdefault(user, {}) - out_info.setdefault(user, {}) - - for db in rrd: - yield {'db': db.name, 'user': user} - - in_seq = in_info[user].get(db.name) - if in_seq is None: - in_seq = toolkit.PersistentSequence( - join(rrd.root, db.name + '.push'), [1, None]) - in_info[user][db.name] = in_seq - elif in_seq is not toolkit.Sequence: - in_seq = in_info[user][db.name] = toolkit.Sequence(in_seq) - out_seq = out_info[user].setdefault(db.name, - toolkit.Sequence()) - - for start, end in in_seq: - for timestamp, values in \ - db.get(max(start, db.first), end or db.last): - yield {'timestamp': timestamp, 'values': values} - in_seq.exclude(start, timestamp) - out_seq.include(start, timestamp) - start = timestamp - except StopIteration: - pass - - yield {'commit': out_info} - - -def merge(packet): - db = None - seq = None - - for record in packet: - if 'db' in record: - db = get_rrd(record['user'])[record['db']] - elif 'commit' in record: - seq = record['commit'] - else: - enforce(db is not None, 'Malformed user stats diff') - db.put(record['values'], record['timestamp']) - - return seq - - -def commit(info): - for user, dbs in info.items(): - for db_name, merged in dbs.items(): - seq = toolkit.PersistentSequence( - _rrd_path(user, db_name + '.push'), [1, None]) - seq.exclude(merged) - seq.commit() - - -def _walk_rrd(root): - if not exists(root): - return - for users_dirname in os.listdir(root): - users_dir = join(root, users_dirname) - if not isdir(users_dir): - continue - for user in os.listdir(users_dir): - yield user, Rrd(join(users_dir, user), stats_user_step.value) - - -def _rrd_path(user, *args): - return join(node.stats_root.value, 'user', user[:2], user, *args) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index d3d9b88..89a9d0f 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -523,7 +523,7 @@ class Bin(object): json.dump(self.value, f) f.flush() os.fsync(f.fileno()) - self._orig_value = self.value + self._orig_value = deepcopy(self.value) return True def __enter__(self): diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index d280035..254e6f3 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -112,8 +112,8 @@ class Connection(object): _Session = None - def __init__(self, api_url='', auth=None, max_retries=0, **session_args): - self.api_url = api_url + def __init__(self, api='', auth=None, max_retries=0, **session_args): + self.api = api self.auth = auth self._max_retries = max_retries self._session_args = session_args @@ -121,7 +121,7 @@ class Connection(object): self._nonce = None def __repr__(self): - return '<Connection api_url=%s>' % self.api_url + return '<Connection api=%s>' % self.api def __enter__(self): return self @@ -203,7 +203,7 @@ class Connection(object): if not path: path = [''] if not isinstance(path, basestring): - path = '/'.join([i.strip('/') for i in [self.api_url] + path]) + path = '/'.join([i.strip('/') for i in [self.api] + path]) try_ = 0 while True: @@ -283,7 +283,7 @@ class Connection(object): break path = reply.headers['location'] if path.startswith('/'): - path = self.api_url + path + path = self.api + path if request.method != 'HEAD': if reply.headers.get('Content-Type') == 'application/json': @@ -380,7 +380,7 @@ class SugarAuth(object): key_dir = dirname(self._key_path) if exists(self._key_path): - if os.stat(key_dir) & 077: + if os.stat(key_dir).st_mode & 077: os.chmod(key_dir, 0700) self._key = RSA.load_key(self._key_path) return @@ -432,7 +432,7 @@ class _Subscription(object): if try_ == 0: raise toolkit.exception('Failed to read from %r subscription, ' - 'will resubscribe', self._client.api_url) + 'will resubscribe', self._client.api) self._content = None return _parse_event(line) @@ -441,7 +441,7 @@ class _Subscription(object): return self._content params.update(self._condition) params['cmd'] = 'subscribe' - _logger.debug('Subscribe to %r, %r', self._client.api_url, params) + _logger.debug('Subscribe to %r, %r', self._client.api, params) response = self._client.request('GET', params=params) self._content = response.raw return self._content diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py deleted file mode 100644 index bafe6d1..0000000 --- a/sugar_network/toolkit/rrd.py +++ /dev/null @@ -1,296 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -"""Convenient access to RRD databases.""" - -import re -import os -import time -import json -import bisect -import logging -from os.path import exists, join, splitext - - -_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$') -_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$') - -_FETCH_PAGE = 256 - -_logger = logging.getLogger('rrd') -_rrdtool = None - - -class Rrd(object): - - def __init__(self, root, step, rras=None): - global _rrdtool - - import rrdtool - _rrdtool = rrdtool - - self._root = root - self._step = step - # rrdtool knows nothing about `unicode` - self._rras = [i.encode('utf8') for i in rras or []] - self._dbsets = {} - - if not exists(self._root): - os.makedirs(self._root) - - for filename in os.listdir(self._root): - match = _DB_FILENAME_RE.match(filename) - if match is not None: - name, revision = match.groups() - self.get(name).load(filename, int(revision or 0)) - - def __iter__(self): - for i in self._dbsets.values(): - yield i - - def __getitem__(self, name): - return self.get(name) - - def __contains__(self, name): - return name in self._dbsets - - @property - def root(self): - return self._root - - @property - def step(self): - return self._step - - def get(self, name): - db = self._dbsets.get(name) - if db is None: - db = _DbSet(self._root, name, self._step, self._rras) - self._dbsets[name] = db - return db - - -class _DbSet(object): - - def __init__(self, root, name, step, rras): - self._root = root - self.name = name - self._step = step - self._rras = rras - self._revisions = [] - self._fields = None - self._field_names = None - self.__db = None - - @property - def fields(self): - return self._field_names - - @fields.setter - def fields(self, fields): - self._field_names = fields.keys() - self._field_names.sort() - self._fields = [str(fields[i]) for i in self._field_names] - _logger.debug('Set %r fields for %r', self._fields, self.name) - - @property - def first(self): - if not self._revisions: - return - return self._revisions[0].first - - @property - def last(self): - if not self._revisions: - return - return self._revisions[-1].last - - @property - def last_ds(self): - if not self._revisions or not self._field_names: - return {} - info = _rrdtool.info(self._revisions[-1].path) - result = {} - for field in self._field_names: - result[field] = float(info.get('ds[%s].last_ds' % field) or 0) - return result - - def load(self, filename, revision): - _logger.debug('Load %s database from %s with revision %s', - filename, self._root, revision) - db = _Db(join(self._root, filename), revision) - bisect.insort(self._revisions, db) - return db - - def put(self, values, timestamp=None): - if not self.fields: - _logger.debug('Parse fields from the first put') - self.fields = dict([ - (i, 'DS:%s:GAUGE:%s:U:U' % (i, self._step * 2)) - for i in values]) - - if not timestamp: - timestamp = int(time.time()) - timestamp = timestamp / self._step * self._step - - db = self._get_db(timestamp) - if db is None: - return - - if timestamp <= db.last: - _logger.warning('Database %s updated at %s, %s in the past', - db.path, db.last, timestamp) - return - - value = [str(timestamp)] - for name in self._field_names: - value.append(str(values[name])) - - _logger.debug('Put %r to %s', value, db.path) - - db.put(':'.join(value), timestamp) - - def get(self, start=None, end=None, resolution=None): - if not self._revisions: - return - - if not resolution: - resolution = self._step - - if start is None: - start = self._revisions[0].first - if end is None: - end = self._revisions[-1].last - - revisions = [] - for db in reversed(self._revisions): - revisions.append(db) - if db.last <= start: - break - - start = start - start % self._step - self._step - last = min(end, start + _FETCH_PAGE * resolution) - last -= last % self._step + self._step - - for db in reversed(revisions): - db_end = min(last, db.last - self._step) - if start > db_end: - break - (row_start, start, row_step), __, rows = _rrdtool.fetch( - str(db.path), - 'AVERAGE', - '--start', str(start), - '--end', str(db_end), - '--resolution', str(resolution)) - for raw_row in rows: - row_start += row_step - if row_start > end: - break - row = {} - for i, value in enumerate(raw_row): - row[db.field_names[i]] = value or .0 - yield row_start, row - start = db_end + 1 - - def _get_db(self, timestamp): - if self.__db is None and self._fields: - if self._revisions: - db = self._revisions[-1] - if db.last >= timestamp: - _logger.warning( - 'Database %s updated at %s, %s in the past', - db.path, db.last, timestamp) - return None - if db.step != self._step or db.rras != self._rras or \ - db.field_names != self._field_names: - db = self._create_db(db.revision + 1, db.last) - else: - db = self._create_db(0, timestamp) - self.__db = db - return self.__db - - def _create_db(self, revision, timestamp): - filename = self.name - if revision: - filename += '-%s' % revision - filename += '.rrd' - - _logger.debug('Create %s database in %s start=%s step=%s', - filename, self._root, timestamp, self._step) - - _rrdtool.create( - str(join(self._root, filename)), - '--start', str(timestamp - self._step), - '--step', str(self._step), - *(self._fields + self._rras)) - - return self.load(filename, revision) - - -class _Db(object): - - def __init__(self, path, revision=0): - self.path = str(path) - self._meta_path = splitext(path)[0] + '.meta' - self.revision = revision - self.fields = [] - self.field_names = [] - self.rras = [] - - info = _rrdtool.info(self.path) - self.step = info['step'] - self.first = 0 - self.last = info['last_update'] - - fields = {} - rras = {} - - for key, value in info.items(): - match = _INFO_RE.match(key) - if match is None: - continue - prefix, key, prop = match.groups() - if prefix == 'ds': - fields.setdefault(key, {}) - fields[key][prop] = value - if prefix == 'rra': - rras.setdefault(key, {}) - rras[key][prop] = value - - for index in sorted([int(i) for i in rras.keys()]): - rra = rras[str(index)] - self.rras.append( - 'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' % rra) - - for name in sorted(fields.keys()): - props = fields[name] - props['name'] = name - self.fields.append(props) - self.field_names.append(name) - - if exists(self._meta_path): - with file(self._meta_path) as f: - self.first = json.load(f).get('first') - - def put(self, value, timestamp): - if not self.first: - with file(self._meta_path, 'w') as f: - json.dump({'first': timestamp}, f) - self.first = timestamp - _rrdtool.update(self.path, str(value)) - self.last = _rrdtool.info(self.path)['last_update'] - - def __cmp__(self, other): - return cmp(self.revision, other.revision) |