author     Aleksey Lim <alsroot@sugarlabs.org>  2014-03-07 17:02:19 (GMT)
committer  Aleksey Lim <alsroot@sugarlabs.org>  2014-03-09 05:15:13 (GMT)
commit     41cb9e831db3d66973292fbdb4c13fb658ac9f59 (patch)
tree       dec6b37654de3f0d6d12dca6c413b2890cd8ef9f /sugar_network
parent     90f74541ec4925bad47466e39517c22ff7eadfe4 (diff)
Fix node synchronization; remove usage stats, which should be a standalone project/process
Diffstat (limited to 'sugar_network')
-rw-r--r--  sugar_network/db/blobs.py          |   15
-rw-r--r--  sugar_network/db/resource.py       |    7
-rw-r--r--  sugar_network/db/volume.py         |   38
-rw-r--r--  sugar_network/node/__init__.py     |   25
-rw-r--r--  sugar_network/node/downloads.py    |  128
-rw-r--r--  sugar_network/node/files.py        |  183
-rw-r--r--  sugar_network/node/master.py       |  288
-rw-r--r--  sugar_network/node/model.py        |   67
-rw-r--r--  sugar_network/node/routes.py       |   89
-rw-r--r--  sugar_network/node/slave.py        |  203
-rw-r--r--  sugar_network/node/stats_user.py   |  127
-rw-r--r--  sugar_network/toolkit/__init__.py  |    2
-rw-r--r--  sugar_network/toolkit/http.py      |   16
-rw-r--r--  sugar_network/toolkit/rrd.py       |  296
14 files changed, 245 insertions(+), 1239 deletions(-)
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index cd795c6..52bc324 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -18,7 +18,7 @@ import logging
import hashlib
import mimetypes
from contextlib import contextmanager
-from os.path import exists, abspath, join, dirname
+from os.path import exists, abspath, join, dirname, isdir
from sugar_network import toolkit
from sugar_network.toolkit.router import File
@@ -104,10 +104,16 @@ class Blobs(object):
path = self.path(digest)
if exists(path + _META_SUFFIX):
return File(path, digest, _read_meta(path))
+ elif isdir(path):
+ return _lsdir(path, digest)
def delete(self, path):
self._delete(path, None)
+ def populate(self, path=None, recursive=True):
+ for __ in self.diff([[1, None]], path or '', recursive):
+ pass
+
def diff(self, r, path=None, recursive=True):
if path is None:
is_files = False
@@ -228,3 +234,10 @@ def _read_meta(path):
key, value = line.split(':', 1)
meta[key] = value.strip()
return meta
+
+
+def _lsdir(root, rel_root):
+ for filename in os.listdir(root):
+ path = join(root, filename)
+ if exists(path + _META_SUFFIX):
+ yield File(path, join(rel_root, filename), _read_meta(path))
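With this hunk, Blobs.get() returns a single File when a meta file exists next to the path, or a lazy directory listing when the digest resolves to a directory. A minimal self-contained sketch of that dispatch (the tuple stand-ins below are illustrative, not the real sugar_network File objects):

    import os
    from os.path import exists, isdir, join

    _META_SUFFIX = '.meta'

    def get(path, digest):
        if exists(path + _META_SUFFIX):
            return (digest, path)           # one blob
        elif isdir(path):
            return _lsdir(path, digest)     # generator of child blobs
        return None                         # nothing known

    def _lsdir(root, rel_root):
        # yield only children that carry a meta file next to them
        for filename in os.listdir(root):
            path = join(root, filename)
            if exists(path + _META_SUFFIX):
                yield (join(rel_root, filename), path)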
diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py
index 71a3efd..9d94929 100644
--- a/sugar_network/db/resource.py
+++ b/sugar_network/db/resource.py
@@ -123,7 +123,6 @@ class Resource(object):
def diff(self, r):
patch = {}
- last_seqno = None
for name, prop in self.metadata.items():
if name == 'seqno' or prop.acl & ACL.CALC:
continue
@@ -133,16 +132,16 @@ class Resource(object):
seqno = meta.get('seqno')
if not ranges.contains(r, seqno):
continue
- last_seqno = max(seqno, last_seqno)
value = meta.get('value')
if isinstance(prop, Aggregated):
value_ = {}
for key, agg in value.items():
- if ranges.contains(r, agg.pop('seqno')):
+ agg_seqno = agg.pop('seqno')
+ if ranges.contains(r, agg_seqno):
value_[key] = agg
value = value_
patch[name] = {'mtime': meta['mtime'], 'value': value}
- return last_seqno, patch
+ return patch
def format_patch(self, props):
if not props:
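The resource.py hunk drops per-resource seqno tracking: diff() now returns only the property patch, and the caller (Volume.diff, below) reads doc['seqno'] itself. The ranges.contains() check it relies on can be modeled like this (an assumption about sugar_network.toolkit.ranges, which appears to treat a range set as sorted inclusive [start, end] pairs with end=None meaning unbounded):

    def contains(r, value):
        # True when `value` falls into any [start, end] pair of the range set
        for start, end in r:
            if value >= start and (end is None or value <= end):
                return True
        return False

    assert contains([[1, 5], [10, None]], 42)      # inside the open tail
    assert not contains([[1, 5], [10, None]], 7)   # in the hole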
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index 5ec5683..5d9bac1 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -76,8 +76,15 @@ class Volume(dict):
for __ in cls.populate():
coroutine.dispatch()
- def diff(self, r, files=None, one_way=False):
+ def diff(self, r, exclude=None, files=None, one_way=False):
+ if exclude:
+ include = deepcopy(r)
+ ranges.exclude(include, exclude)
+ else:
+ include = r
last_seqno = None
+ found = False
+
try:
for resource, directory in self.items():
if one_way and directory.resource.one_way:
@@ -90,33 +97,34 @@ class Volume(dict):
query += str(end)
docs, __ = directory.find(query=query, order_by='seqno')
for doc in docs:
- seqno, patch = doc.diff(r)
- if not patch:
- continue
- yield {'guid': doc.guid, 'patch': patch}
- last_seqno = max(last_seqno, seqno)
- for blob in self.blobs.diff(r):
+ patch = doc.diff(include)
+ if patch:
+ yield {'guid': doc.guid, 'patch': patch}
+ found = True
+ last_seqno = max(last_seqno, doc['seqno'])
+ for blob in self.blobs.diff(include):
seqno = int(blob.pop('x-seqno'))
yield blob
+ found = True
last_seqno = max(last_seqno, seqno)
for dirpath in files or []:
- for blob in self.blobs.diff(r, dirpath):
+ for blob in self.blobs.diff(include, dirpath):
seqno = int(blob.pop('x-seqno'))
yield blob
+ found = True
last_seqno = max(last_seqno, seqno)
except StopIteration:
pass
- if last_seqno:
- commit_r = deepcopy(r)
+ if found:
+ commit_r = include if exclude else deepcopy(r)
ranges.exclude(commit_r, last_seqno + 1, None)
ranges.exclude(r, None, last_seqno)
yield {'commit': commit_r}
def patch(self, records):
directory = None
- commit_r = []
- merged_r = []
+ committed = []
seqno = None
for record in records:
@@ -137,12 +145,10 @@ class Volume(dict):
commit = record.get('commit')
if commit is not None:
- ranges.include(commit_r, commit)
+ ranges.include(committed, commit)
continue
- if seqno is not None:
- ranges.include(merged_r, seqno, seqno)
- return commit_r, merged_r
+ return seqno, committed
def __enter__(self):
return self
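The new `exclude` argument lets Volume.diff() subtract already-acknowledged ranges before walking documents and blobs. A toy model of that subtraction, assuming ranges.exclude(r, start, end) removes the inclusive span in place (the real helper also accepts a whole range list to subtract, as the `include` computation above shows):

    def exclude_span(r, start, end):
        # remove inclusive [start, end] from a sorted range set, in place
        result = []
        for s, e in r:
            if s < start:                                   # keep the part below
                result.append([s, start - 1 if e is None else min(e, start - 1)])
            if end is not None and (e is None or e > end):  # keep the part above
                result.append([max(s, end + 1), e])
        r[:] = result

    r = [[1, None]]
    exclude_span(r, 1, 5)     # like ranges.exclude(commit_r, ...) above
    print(r)                  # [[6, None]]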
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index a4360b4..14d675c 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -34,28 +34,17 @@ certfile = Option(
name='certfile')
data_root = Option(
- 'path to a directory to place server data',
+ 'path to a directory to place node data',
default='/var/lib/sugar-network', name='data_root')
find_limit = Option(
'limit the resulting list for search requests',
default=64, type_cast=int, name='find-limit')
-stats_root = Option(
- 'path to the root directory for placing stats',
- default='/var/lib/sugar-network/stats', name='stats_root')
+mode = Option(
+        'node running mode, should be one of "slave", "proxy", or "master"',
+ default='slave')
-files_root = Option(
- 'path to a directory to keep files synchronized between nodes',
- default='/var/lib/sugar-network/files', name='files_root')
-
-pull_timeout = Option(
- 'delay in seconds to return to sync-pull requester to wait until '
- 'pull request will be ready',
- default=30, type_cast=int)
-
-sync_layers = Option(
- 'comma separated list of layers to restrict Sugar Network '
- 'synchronization content',
- default=[], type_cast=Option.list_cast,
- type_repr=Option.list_repr, name='sync-layers')
+master_api = Option(
+        'master API url either to connect to (for slave or proxy nodes), '
+        'or to provide from (for master nodes)')
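For reference, the replaced and added options follow the Option pattern used throughout the tree; a minimal stand-in showing how they are declared and consumed (assumption: Option exposes the parsed command-line value via .value, as data_root.value and find_limit.value do elsewhere):

    class Option(object):
        # simplified stand-in for sugar_network.toolkit.Option
        def __init__(self, description, default=None, name=None, **kwargs):
            self.description = description
            self.value = default
            self.name = name

    mode = Option('node running mode', default='slave')
    master_api = Option('master API url', name='master-api')

    master_api.value = 'http://example.org/api'   # hypothetical CLI result
    assert mode.value in ('slave', 'proxy', 'master')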
diff --git a/sugar_network/node/downloads.py b/sugar_network/node/downloads.py
deleted file mode 100644
index b7156bc..0000000
--- a/sugar_network/node/downloads.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# Copyright (C) 2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""Persistent pool with temporary files prepared to download."""
-
-import os
-import json
-import hashlib
-import logging
-from glob import glob
-from os.path import join, splitext, exists
-
-from sugar_network.toolkit import pylru, coroutine, exception
-
-
-# Maximum number of postponed pulls master can handle at the same time
-_POOL_SIZE = 256
-_TAG_SUFFIX = '.tag'
-
-_logger = logging.getLogger('node.downloads')
-
-
-class Pool(object):
-
- def __init__(self, root):
- self._pool = pylru.lrucache(_POOL_SIZE, lambda __, dl: dl.pop())
- if not exists(root):
- os.makedirs(root)
- self._root = root
-
- for tag_path in glob(join(root, '*.tag')):
- path, __ = splitext(tag_path)
- if exists(path):
- try:
- with file(tag_path) as f:
- key, tag = json.load(f)
- pool_key = json.dumps(key)
- self._pool[pool_key] = _Download(key, tag, path)
- continue
- except Exception:
- exception('Cannot open %r download, recreate', tag_path)
- os.unlink(path)
- os.unlink(tag_path)
-
- def get(self, key):
- key = json.dumps(key)
- if key in self._pool:
- return self._pool[key]
-
- def set(self, key, tag, fetcher, *args, **kwargs):
- pool_key = json.dumps(key)
- path = join(self._root, hashlib.md5(pool_key).hexdigest())
-
- def do_fetch():
- try:
- complete = fetcher(*args, path=path, **kwargs)
- except Exception:
- exception('Error while fetching %r', self)
- if exists(path):
- os.unlink(path)
- return True
- with file(path + _TAG_SUFFIX, 'w') as f:
- json.dump([key, tag], f)
- return complete
-
- job = coroutine.spawn(do_fetch)
- dl = self._pool[pool_key] = _Download(key, tag, path, job)
- return dl
-
- def remove(self, key):
- key = json.dumps(key)
- if key in self._pool:
- self._pool.peek(key).pop()
- del self._pool[key]
-
-
-class _Download(dict):
-
- def __init__(self, key, tag, path, job=None):
- self.tag = tag
- self._key = key
- self._path = path
- self._job = job
-
- def __repr__(self):
- return '<Download %r path=%r>' % (self._key, self._path)
-
- @property
- def ready(self):
- # pylint: disable-msg=E1101
- return self._job is None or self._job.dead
-
- @property
- def complete(self):
- return self._job is not None and self._job.value
-
- @property
- def length(self):
- if exists(self._path):
- return os.stat(self._path).st_size
-
- def open(self):
- if exists(self._path):
- return file(self._path, 'rb')
-
- def pop(self):
- if self._job is not None and not self._job.dead:
- _logger.debug('Abort fetching %r', self)
- self._job.kill()
-
- if exists(self._path):
- os.unlink(self._path)
- if exists(self._path + _TAG_SUFFIX):
- os.unlink(self._path + _TAG_SUFFIX)
-
- _logger.debug('Throw out %r from the pool', self)
diff --git a/sugar_network/node/files.py b/sugar_network/node/files.py
deleted file mode 100644
index 4fb64ca..0000000
--- a/sugar_network/node/files.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import json
-import logging
-from bisect import bisect_left
-from shutil import copyfileobj
-from os.path import join, exists, relpath, lexists, dirname
-
-from sugar_network import toolkit
-from sugar_network.toolkit import coroutine
-
-
-_logger = logging.getLogger('node.sync_files')
-
-
-def merge(files_path, packet):
- files_path = files_path.rstrip(os.sep)
- if not exists(files_path):
- os.makedirs(files_path)
- commit_seq = None
-
- for record in packet:
- op = record.get('op')
- if op == 'update':
- path = join(files_path, record['path'])
- if not exists(dirname(path)):
- os.makedirs(dirname(path))
- with toolkit.new_file(path) as f:
- copyfileobj(record['blob'], f)
- elif op == 'delete':
- path = join(files_path, record['path'])
- if lexists(path):
- os.unlink(path)
- elif op == 'commit':
- commit_seq = record['sequence']
-
- return commit_seq
-
-
-class Index(object):
-
- def __init__(self, files_path, index_path, seqno):
- self._files_path = files_path.rstrip(os.sep)
- self._index_path = index_path
- self._seqno = seqno
- self._index = []
- self._stamp = 0
- self._mutex = coroutine.Lock()
-
- if exists(self._index_path):
- with file(self._index_path) as f:
- self._index, self._stamp = json.load(f)
-
- if not exists(self._files_path):
- os.makedirs(self._files_path)
-
- def sync(self):
- with self._mutex:
- return self._sync()
-
- def diff(self, in_seq, out_seq=None, **kwargs):
- if out_seq is None:
- out_seq = toolkit.Sequence([])
- is_initial_diff = not out_seq
-
- # Below calls will trigger coroutine switches, thus,
- # avoid changing `self._index` by different coroutines.
- with self._mutex:
- self._sync()
-
- _logger.debug('Start sync: in_seq=%r', in_seq)
-
- files = 0
- deleted = 0
- pos = 0
-
- try:
- for start, end in in_seq:
- pos = bisect_left(self._index, [start, None, None], pos)
- for pos, (seqno, path, mtime) in \
- enumerate(self._index[pos:]):
- if end is not None and seqno > end:
- break
- coroutine.dispatch()
- if mtime < 0:
- yield {'op': 'delete', 'path': path}
- deleted += 1
- else:
- blob_path = join(self._files_path, path)
- yield {'op': 'update',
- 'path': path,
- 'blob_size': os.stat(blob_path).st_size,
- 'blob': toolkit.iter_file(blob_path),
- }
- out_seq.include(start, seqno)
- start = seqno
- files += 1
- except StopIteration:
- pass
-
- if is_initial_diff:
- # There is only one diff, so, we can stretch it to remove holes
- out_seq.stretch()
- yield {'op': 'commit', 'sequence': out_seq}
-
- _logger.debug('Stop sync: in_seq=%r out_seq=%r updates=%r deletes=%r',
- in_seq, out_seq, files, deleted)
-
- def _sync(self):
- if os.stat(self._files_path).st_mtime <= self._stamp:
- return False
-
- new_files = set()
- updates = 0
- deletes = 0
-
- # Populate list of new files at first
- for root, __, files in os.walk(self._files_path):
- coroutine.dispatch()
- rel_root = relpath(root, self._files_path)
- if rel_root == '.':
- rel_root = ''
- else:
- rel_root += os.sep
- for filename in files:
- coroutine.dispatch()
- path = join(root, filename)
- if os.lstat(path).st_mtime > self._stamp:
- new_files.add(rel_root + filename)
-
- # Check for updates for already tracked files
- tail = []
- for pos, (__, rel_path, mtime) in enumerate(self._index[:]):
- coroutine.dispatch()
- path = join(self._files_path, rel_path)
- existing = lexists(path)
- if existing == (mtime >= 0) and \
- (not existing or os.lstat(path).st_mtime == mtime):
- continue
- if existing:
- new_files.discard(rel_path)
- pos -= len(tail)
- self._index = self._index[:pos] + self._index[pos + 1:]
- tail.append([
- self._seqno.next(),
- rel_path,
- int(os.lstat(path).st_mtime) if existing else -1,
- ])
- if existing:
- updates += 1
- else:
- deletes += 1
- self._index.extend(tail)
-
- _logger.debug('Updated %r index: new=%r updates=%r deletes=%r',
- self._files_path, len(self._files_path), updates, deletes)
-
- # Finally, add new files
- for rel_path in sorted(new_files):
- coroutine.dispatch()
- mtime = os.lstat(join(self._files_path, rel_path)).st_mtime
- self._index.append([self._seqno.next(), rel_path, mtime])
-
- self._stamp = os.stat(self._files_path).st_mtime
- if self._seqno.commit():
- with toolkit.new_file(self._index_path) as f:
- json.dump((self._index, self._stamp), f)
-
- return True
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index c7c22e0..61d32fb 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2013 Aleksey Lim
+# Copyright (C) 2013-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -13,17 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import json
-import base64
import logging
-from Cookie import SimpleCookie
-from os.path import join
+from urlparse import urlsplit
-from sugar_network import node, toolkit
-from sugar_network.node import sync, stats_user, files, model, downloads, obs
+from sugar_network import toolkit
+from sugar_network.node import obs, master_api
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
-from sugar_network.toolkit import http, enforce
+from sugar_network.toolkit.coroutine import this
+from sugar_network.toolkit import http, parcel, pylru, ranges, enforce
RESOURCES = (
@@ -33,215 +31,107 @@ RESOURCES = (
'sugar_network.model.user',
)
-_ONE_WAY_DOCUMENTS = ['report']
-
_logger = logging.getLogger('node.master')
class MasterRoutes(NodeRoutes):
- def __init__(self, guid, volume, **kwargs):
- NodeRoutes.__init__(self, guid, volume=volume, **kwargs)
-
- self._pulls = {
- 'pull': lambda **kwargs:
- ('diff', None, model.diff(self.volume,
- ignore_documents=_ONE_WAY_DOCUMENTS, **kwargs)),
- 'files_pull': lambda **kwargs:
- ('files_diff', None, self._files.diff(**kwargs)),
- }
-
- self._pull_queue = downloads.Pool(
- join(toolkit.cachedir.value, 'pulls'))
- self._files = None
-
- if node.files_root.value:
- self._files = files.Index(node.files_root.value,
- join(volume.root, 'files.index'), volume.seqno)
-
- @route('POST', cmd='sync',
- acl=ACL.AUTH)
- def sync(self, request):
- reply, cookie = self._push(sync.decode(request.content_stream))
- exclude_seq = None
- if len(cookie.sent) == 1:
- exclude_seq = cookie.sent.values()[0]
- for op, layer, seq in cookie:
- reply.append(self._pulls[op](in_seq=seq,
- exclude_seq=exclude_seq, layer=layer))
- return sync.encode(reply, src=self.guid)
+ def __init__(self, **kwargs):
+ NodeRoutes.__init__(self, urlsplit(master_api.value).netloc, **kwargs)
+ self._pulls = pylru.lrucache(1024)
+
+ @route('POST', cmd='sync', arguments={'accept_length': int})
+ def sync(self, accept_length):
+ return parcel.encode(self._push() + (self._pull() or []),
+ limit=accept_length, header={'from': self.guid},
+ on_complete=this.cookie.clear)
@route('POST', cmd='push')
- def push(self, request, response):
- reply, cookie = self._push(sync.package_decode(request.content_stream))
- # Read passed cookie only after excluding `merged_seq`.
- # If there is `pull` out of currently pushed packet, excluding
- # `merged_seq` should not affect it.
- cookie.update(_Cookie(request))
- cookie.store(response)
- return sync.package_encode(reply, src=self.guid)
-
- @route('GET', cmd='pull',
- mime_type='application/octet-stream',
- arguments={'accept_length': int})
- def pull(self, request, response, accept_length=None):
- cookie = _Cookie(request)
- if not cookie:
- _logger.warning('Requested full dump in pull command')
- cookie.append(('pull', None, toolkit.Sequence([[1, None]])))
- cookie.append(('files_pull', None, toolkit.Sequence([[1, None]])))
-
- exclude_seq = None
- if len(cookie.sent) == 1:
- exclude_seq = toolkit.Sequence(cookie.sent.values()[0])
-
- reply = None
- for pull_key in cookie:
- op, layer, seq = pull_key
-
- pull = self._pull_queue.get(pull_key)
- if pull is not None:
- if not pull.ready:
- continue
- if not pull.tag:
- self._pull_queue.remove(pull_key)
- cookie.remove(pull_key)
- continue
- if accept_length is None or pull.length <= accept_length:
- _logger.debug('Found ready to use %r', pull)
- if pull.complete:
- cookie.remove(pull_key)
- else:
- seq.exclude(pull.tag)
- reply = pull.open()
- break
- _logger.debug('Existing %r is too big, will recreate', pull)
- self._pull_queue.remove(pull_key)
-
- out_seq = toolkit.Sequence()
- pull = self._pull_queue.set(pull_key, out_seq,
- sync.sneakernet_encode,
- [self._pulls[op](in_seq=seq, out_seq=out_seq,
- exclude_seq=exclude_seq, layer=layer,
- fetch_blobs=True)],
- limit=accept_length, src=self.guid)
- _logger.debug('Start new %r', pull)
+ def push(self):
+ return parcel.encode(self._push(), header={'from': self.guid})
+ @route('GET', cmd='pull', arguments={'accept_length': int})
+ def pull(self, accept_length):
+ reply = self._pull()
if reply is None:
- if cookie:
- _logger.debug('No ready pulls')
- # TODO Might be useful to set meaningful value here
- cookie.delay = node.pull_timeout.value
- else:
- _logger.debug('Nothing to pull')
-
- cookie.store(response)
- return reply
+ return None
+ return parcel.encode(reply, limit=accept_length,
+ header={'from': self.guid}, on_complete=this.cookie.clear)
@route('PUT', ['context', None], cmd='presolve',
acl=ACL.AUTH, mime_type='application/json')
def presolve(self, request):
- enforce(node.files_root.value, http.BadRequest, 'Disabled')
- aliases = self.volume['context'].get(request.guid)['aliases']
+ aliases = this.volume['context'].get(request.guid)['aliases']
enforce(aliases, http.BadRequest, 'Nothing to presolve')
- return obs.presolve(None, aliases, node.files_root.value)
+ return obs.presolve(None, aliases, this.volume.blobs.path('packages'))
def status(self):
result = NodeRoutes.status(self)
- result['level'] = 'master'
+ result['mode'] = 'master'
return result
- def _push(self, stream):
+ def _push(self):
+ cookie = this.cookie
reply = []
- cookie = _Cookie()
-
- for packet in stream:
- src = packet['src']
- enforce(packet['dst'] == self.guid, 'Misaddressed packet')
-
- if packet.name == 'pull':
- pull_seq = cookie['pull', packet['layer'] or None]
- pull_seq.include(packet['sequence'])
- cookie.sent.setdefault(src, toolkit.Sequence())
- elif packet.name == 'files_pull':
- if self._files is not None:
- cookie['files_pull'].include(packet['sequence'])
- elif packet.name == 'diff':
- seq, ack_seq = model.merge(self.volume, packet)
- reply.append(('ack', {
- 'ack': ack_seq,
- 'sequence': seq,
- 'dst': src,
- }, None))
- sent_seq = cookie.sent.setdefault(src, toolkit.Sequence())
- sent_seq.include(ack_seq)
- elif packet.name == 'stats_diff':
- reply.append(('stats_ack', {
- 'sequence': stats_user.merge(packet),
- 'dst': src,
- }, None))
-
- return reply, cookie
-
-
-class _Cookie(list):
-
- def __init__(self, request=None):
- list.__init__(self)
-
- self.sent = {}
- self.delay = 0
-
- if request is not None:
- self.update(self._get_cookie(request, 'sugar_network_pull') or [])
- self.sent = self._get_cookie(request, 'sugar_network_sent') or {}
-
- def __repr__(self):
- return '<Cookie pull=%s sent=%r>' % (list.__repr__(self), self.sent)
-
- def update(self, that):
- for op, layer, seq in that:
- self[op, layer].include(seq)
-
- def store(self, response):
- response.set('set-cookie', [])
- if self:
- _logger.debug('Postpone %r in cookie', self)
- self._set_cookie(response, 'sugar_network_pull',
- base64.b64encode(json.dumps(self)))
- self._set_cookie(response, 'sugar_network_sent',
- base64.b64encode(json.dumps(self.sent)))
- self._set_cookie(response, 'sugar_network_delay', self.delay)
+
+ for packet in parcel.decode(
+ this.request.content_stream, this.request.content_length):
+ sender = packet['from']
+ enforce(packet['to'] == self.guid, http.BadRequest,
+ 'Misaddressed packet')
+ if packet.name == 'push':
+ seqno, push_r = this.volume.patch(packet)
+ ack_r = [] if seqno is None else [[seqno, seqno]]
+ ack = {'ack': ack_r, 'ranges': push_r, 'to': sender}
+ reply.append(('ack', ack, None))
+ cookie.setdefault('ack', {}) \
+ .setdefault(sender, []) \
+ .append((push_r, ack_r))
+ elif packet.name == 'pull':
+ cookie.setdefault('ack', {}).setdefault(sender, [])
+ ranges.include(cookie.setdefault('pull', []), packet['ranges'])
+ elif packet.name == 'request':
+ cookie.setdefault('request', []).append(packet.header)
+
+ return reply
+
+ def _pull(self):
+ processed = this.cookie.get('id')
+ if processed in self._pulls:
+ cookie = this.cookie = self._pulls[processed]
+ if not cookie:
+ return None
else:
- self._unset_cookie(response, 'sugar_network_pull')
- self._unset_cookie(response, 'sugar_network_sent')
- self._unset_cookie(response, 'sugar_network_delay')
-
- def __getitem__(self, key):
- if not isinstance(key, tuple):
- key = (key, None)
- for op, layer, seq in self:
- if (op, layer) == key:
- return seq
- seq = toolkit.Sequence()
- self.append(key + (seq,))
- return seq
-
- def _get_cookie(self, request, name):
- cookie_str = request.environ.get('HTTP_COOKIE')
- if not cookie_str:
- return
- cookie = SimpleCookie()
- cookie.load(cookie_str)
- if name not in cookie:
- return
- value = cookie.get(name).value
- if value != 'unset_%s' % name:
- return json.loads(base64.b64decode(value))
-
- def _set_cookie(self, response, name, value, age=3600):
- cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age)
- response.get('set-cookie').append(cookie)
-
- def _unset_cookie(self, response, name):
- self._set_cookie(response, name, 'unset_%s' % name, 0)
+ cookie = this.cookie
+ cookie['id'] = toolkit.uuid()
+ self._pulls[cookie['id']] = cookie
+
+ pull_r = cookie.get('pull')
+ if not pull_r:
+ return []
+
+ reply = []
+ exclude = []
+ acks = cookie.get('ack')
+ if acks:
+ acked = {}
+ for req in cookie.get('request') or []:
+ ack_r = None
+ for push_r, ack_r in acks.get(req['origin']) or []:
+ if req['ranges'] == push_r:
+ break
+ else:
+ continue
+ ranges.include(acked.setdefault(req['from'], []), ack_r)
+ reply.append(('ack', {'to': req['from'], 'ack': ack_r}, []))
+ for node, ack_ranges in acks.items():
+ acked_r = acked.setdefault(node, [])
+ for __, i in ack_ranges:
+ ranges.include(acked_r, i)
+ r = reduce(lambda x, y: ranges.intersect(x, y), acked.values())
+ ranges.include(exclude, r)
+
+ push = this.volume.diff(pull_r, exclude, one_way=True, files=[''])
+ reply.append(('push', None, push))
+
+ return reply
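The interesting part of the new _pull() is how `exclude` is computed: a seqno span leaves the outgoing push only once every node that pushed has acknowledged it, i.e. the exclusion is the intersection of all per-node acked ranges. A self-contained toy of that reduce step (intersect() here is an assumed simplification of sugar_network.toolkit.ranges.intersect):

    from functools import reduce

    def intersect(a, b):
        # pairwise intersection of two sorted range sets; end=None is unbounded
        result = []
        for s1, e1 in a:
            for s2, e2 in b:
                s = max(s1, s2)
                if e1 is None:
                    e = e2
                elif e2 is None:
                    e = e1
                else:
                    e = min(e1, e2)
                if e is None or s <= e:
                    result.append([s, e])
        return result

    acked = {'node-a': [[1, 10]], 'node-b': [[1, 7], [9, 12]]}
    print(reduce(intersect, acked.values()))   # [[1, 7], [9, 10]]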
diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py
index 9ead01c..90e50c2 100644
--- a/sugar_network/node/model.py
+++ b/sugar_network/node/model.py
@@ -16,7 +16,7 @@
import bisect
import logging
-from sugar_network import db, toolkit
+from sugar_network import db
from sugar_network.model import Release, context as base_context
from sugar_network.node import obs
from sugar_network.toolkit.router import ACL
@@ -105,71 +105,6 @@ class Context(base_context.Context):
return value
-def diff(volume, in_seq, out_seq=None, exclude_seq=None, layer=None,
- ignore_documents=None, **kwargs):
- if out_seq is None:
- out_seq = toolkit.Sequence([])
- is_the_only_seq = not out_seq
- if layer:
- if isinstance(layer, basestring):
- layer = [layer]
- layer.append('common')
- try:
- for resource, directory in volume.items():
- if ignore_documents and resource in ignore_documents:
- continue
- coroutine.dispatch()
- directory.commit()
- yield {'resource': resource}
- for guid, patch in directory.diff(in_seq, exclude_seq,
- layer=layer if resource == 'context' else None):
- adiff = {}
- adiff_seq = toolkit.Sequence()
- for prop, meta, seqno in patch:
- adiff[prop] = meta
- adiff_seq.include(seqno, seqno)
- if adiff:
- yield {'guid': guid, 'diff': adiff}
- out_seq.include(adiff_seq)
- if is_the_only_seq:
- # There is only one diff, so, we can stretch it to remove all holes
- out_seq.stretch()
- except StopIteration:
- pass
-
- yield {'commit': out_seq}
-
-
-def merge(volume, records):
- directory = None
- commit_seq = toolkit.Sequence()
- merged_seq = toolkit.Sequence()
- synced = False
-
- for record in records:
- resource_ = record.get('resource')
- if resource_:
- directory = volume[resource_]
- continue
-
- if 'guid' in record:
- seqno, merged = directory.merge(**record)
- synced = synced or merged
- if seqno is not None:
- merged_seq.include(seqno, seqno)
- continue
-
- commit = record.get('commit')
- if commit is not None:
- commit_seq.include(commit)
- continue
-
- if synced:
- this.broadcast({'event': 'sync'})
-
- return commit_seq, merged_seq
-
-
def solve(volume, top_context, lsb_id=None, lsb_release=None,
stability=None, requires=None):
top_context = volume['context'][top_context]
diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py
index 6d5d200..86e4ce1 100644
--- a/sugar_network/node/routes.py
+++ b/sugar_network/node/routes.py
@@ -18,14 +18,13 @@ import time
import logging
import hashlib
from ConfigParser import ConfigParser
-from os.path import join, isdir, exists
+from os.path import join, exists
-from sugar_network import db, node, toolkit
-from sugar_network.db import blobs
+from sugar_network import db, node
from sugar_network.model import FrontRoutes, load_bundle
-from sugar_network.node import stats_user, model
+from sugar_network.node import model
# pylint: disable-msg=W0611
-from sugar_network.toolkit.router import route, preroute, postroute, ACL
+from sugar_network.toolkit.router import route, preroute, postroute, ACL, File
from sugar_network.toolkit.router import Unauthorized, Request, fallbackroute
from sugar_network.toolkit.spec import parse_requires, parse_version
from sugar_network.toolkit.bundle import Bundle
@@ -53,10 +52,6 @@ class NodeRoutes(db.Routes, FrontRoutes):
def guid(self):
return self._guid
- @route('GET', cmd='logon', acl=ACL.AUTH)
- def logon(self):
- pass
-
@route('GET', cmd='whoami', mime_type='application/json')
def whoami(self, request, response):
roles = []
@@ -66,7 +61,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
@route('GET', cmd='status', mime_type='application/json')
def status(self):
- return {'guid': self._guid,
+ return {'guid': self.guid,
'seqno': {
'db': self.volume.seqno.value,
'releases': self.volume.releases_seqno.value,
@@ -80,49 +75,39 @@ class NodeRoutes(db.Routes, FrontRoutes):
@fallbackroute('GET', ['packages'])
def route_packages(self, request, response):
- enforce(node.files_root.value, http.BadRequest, 'Disabled')
-
- if request.path and request.path[-1] == 'updates':
- root = join(node.files_root.value, *request.path[:-1])
- enforce(isdir(root), http.NotFound, 'Directory was not found')
+ path = this.request.path
+ if path and path[-1] == 'updates':
result = []
last_modified = 0
- for filename in os.listdir(root):
- if '.' in filename:
+ for blob in this.volume.blobs.diff(
+ [[this.request.if_modified_since + 1, None]],
+ join(*path[:-1]), recursive=False):
+ if '.' in blob.name:
continue
- path = join(root, filename)
- mtime = int(os.stat(path).st_mtime)
- if mtime > request.if_modified_since:
- result.append(filename)
- last_modified = max(last_modified, mtime)
+ result.append(blob.name)
+ last_modified = max(last_modified, blob.mtime)
response.content_type = 'application/json'
if last_modified:
response.last_modified = last_modified
return result
- path = join(node.files_root.value, *request.path)
- enforce(exists(path), http.NotFound, 'File was not found')
- if not isdir(path):
- return toolkit.iter_file(path)
-
- result = []
- for filename in os.listdir(path):
- if filename.endswith('.rpm') or filename.endswith('.deb'):
- continue
- result.append(filename)
-
- response.content_type = 'application/json'
- return result
+ blob = this.volume.blobs.get(join(*path))
+ if isinstance(blob, File):
+ return blob
+ else:
+ response.content_type = 'application/json'
+ return [i.name for i in blob if '.' not in i.name]
@route('POST', ['context'], cmd='submit',
arguments={'initial': False},
mime_type='application/json', acl=ACL.AUTH)
- def submit_release(self, request, initial):
- blob = blobs.post(request.content_stream, request.content_type)
+ def submit_release(self, initial):
+ blob = this.volume.blobs.post(
+ this.request.content_stream, this.request.content_type)
try:
context, release = load_bundle(blob, initial=initial)
except Exception:
- blobs.delete(blob.digest)
+ this.volume.blobs.delete(blob.digest)
raise
this.call(method='POST', path=['context', context, 'releases'],
content_type='application/json', content=release)
@@ -156,29 +141,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
arguments={'requires': list})
def get_clone(self, request, response):
solution = self.solve(request)
- return blobs.get(solution[request.guid]['blob'])
-
- @route('GET', ['user', None], cmd='stats-info',
- mime_type='application/json', acl=ACL.AUTH)
- def user_stats_info(self, request):
- status = {}
- for rdb in stats_user.get_rrd(request.guid):
- status[rdb.name] = rdb.last + stats_user.stats_user_step.value
-
- # TODO Process client configuration in more general manner
- return {'enable': True,
- 'step': stats_user.stats_user_step.value,
- 'rras': ['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
- 'status': status,
- }
-
- @route('POST', ['user', None], cmd='stats-upload', acl=ACL.AUTH)
- def user_stats_upload(self, request):
- name = request.content['name']
- values = request.content['values']
- rrd = stats_user.get_rrd(request.guid)
- for timestamp, values in values:
- rrd[name].put(values, timestamp)
+ return this.volume.blobs.get(solution[request.guid]['blob'])
@preroute
def preroute(self, op, request, response):
@@ -204,7 +167,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
def on_create(self, request, props):
if request.resource == 'user':
- with file(blobs.get(props['pubkey']).path) as f:
+ with file(this.volume.blobs.get(props['pubkey']).path) as f:
props['guid'] = str(hashlib.sha1(f.read()).hexdigest())
db.Routes.on_create(self, request, props)
@@ -223,7 +186,7 @@ class NodeRoutes(db.Routes, FrontRoutes):
from M2Crypto import RSA
pubkey = self.volume['user'][auth.login]['pubkey']
- key = RSA.load_pub_key(blobs.get(pubkey).path)
+ key = RSA.load_pub_key(this.volume.blobs.get(pubkey).path)
data = hashlib.sha1('%s:%s' % (auth.login, auth.nonce)).digest()
enforce(key.verify(data, auth.signature.decode('hex')),
http.Forbidden, 'Bad credentials')
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index 2d60ea8..333e6ea 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012-2013 Aleksey Lim
+# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -21,13 +21,12 @@ from urlparse import urlsplit
from os.path import join, dirname, exists, isabs
from gettext import gettext as _
-from sugar_network import node, toolkit
-from sugar_network.client import api_url
-from sugar_network.node import sync, stats_user, files, model
+from sugar_network import toolkit
+from sugar_network.node import master_api
from sugar_network.node.routes import NodeRoutes
from sugar_network.toolkit.router import route, ACL
from sugar_network.toolkit.coroutine import this
-from sugar_network.toolkit import http, enforce
+from sugar_network.toolkit import http, parcel, ranges, enforce
_logger = logging.getLogger('node.slave')
@@ -35,148 +34,94 @@ _logger = logging.getLogger('node.slave')
class SlaveRoutes(NodeRoutes):
- def __init__(self, key_path, volume_):
- self._auth = http.SugarAuth(key_path)
- NodeRoutes.__init__(self, self._auth.login, volume_)
-
- self._push_seq = toolkit.PersistentSequence(
- join(volume_.root, 'push.sequence'), [1, None])
- self._pull_seq = toolkit.PersistentSequence(
- join(volume_.root, 'pull.sequence'), [1, None])
- self._files_seq = toolkit.PersistentSequence(
- join(volume_.root, 'files.sequence'), [1, None])
- self._master_guid = urlsplit(api_url.value).netloc
- self._offline_session = None
-
- @route('POST', cmd='online-sync', acl=ACL.LOCAL)
+ def __init__(self, volume, **kwargs):
+ self._creds = http.SugarAuth(
+ join(volume.root, 'etc', 'private', 'node'))
+ NodeRoutes.__init__(self, self._creds.login, volume=volume, **kwargs)
+ vardir = join(volume.root, 'var')
+ self._push_r = toolkit.Bin(join(vardir, 'push.ranges'), [[1, None]])
+ self._pull_r = toolkit.Bin(join(vardir, 'pull.ranges'), [[1, None]])
+ self._master_guid = urlsplit(master_api.value).netloc
+
+ @route('POST', cmd='online_sync', acl=ACL.LOCAL,
+ arguments={'no_pull': bool})
def online_sync(self, no_pull=False):
- conn = http.Connection(api_url.value, auth=self._auth)
-
- # TODO `http.Connection` should handle re-POSTing without
- # losing payload after authentication
- conn.get(cmd='logon')
-
- push = [('diff', None, model.diff(self.volume, self._push_seq))]
- if not no_pull:
- push.extend([
- ('pull', {
- 'sequence': self._pull_seq,
- 'layer': node.sync_layers.value,
- }, None),
- ('files_pull', {'sequence': self._files_seq}, None),
- ])
- if stats_user.stats_user.value:
- push.append(('stats_diff', None, stats_user.diff()))
+ self._export(not no_pull)
+ conn = http.Connection(master_api.value)
response = conn.request('POST',
- data=sync.encode(push, src=self.guid, dst=self._master_guid),
+ data=parcel.encode(self._export(not no_pull), header={
+ 'from': self.guid,
+ 'to': self._master_guid,
+ }),
params={'cmd': 'sync'},
headers={'Transfer-Encoding': 'chunked'})
- self._import(sync.decode(response.raw), None)
+ self._import(parcel.decode(response.raw))
- @route('POST', cmd='offline-sync', acl=ACL.LOCAL)
+ @route('POST', cmd='offline_sync', acl=ACL.LOCAL)
def offline_sync(self, path):
- enforce(node.sync_layers.value,
- '--sync-layers is not specified, the full master dump '
- 'might be too big and should be limited')
- enforce(isabs(path), 'Argument \'path\' should be an absolute path')
-
- _logger.debug('Start %r synchronization session in %r',
- self._offline_session, path)
+ enforce(isabs(path), "Argument 'path' is not an absolute path")
+ _logger.debug('Start offline synchronization in %r', path)
if not exists(path):
os.makedirs(path)
- try:
- self._offline_session = self._offline_sync(path,
- **(self._offline_session or {}))
- except Exception:
- toolkit.exception(_logger, 'Failed to complete synchronization')
- self._offline_session = None
- raise
-
- if self._offline_session is None:
- _logger.debug('Synchronization completed')
- else:
- _logger.debug('Postpone synchronization with %r session',
- self._offline_session)
-
- def status(self):
- result = NodeRoutes.status(self)
- result['level'] = 'slave'
- return result
-
- def _offline_sync(self, path, push_seq=None, stats_seq=None, session=None):
- push = []
-
- if push_seq is None:
- push_seq = toolkit.Sequence(self._push_seq)
- if stats_seq is None:
- stats_seq = {}
- if session is None:
- session = toolkit.uuid()
- push.append(('pull', {
- 'sequence': self._pull_seq,
- 'layer': node.sync_layers.value,
- }, None))
- push.append(('files_pull', {'sequence': self._files_seq}, None))
-
this.broadcast({
'event': 'sync_progress',
'progress': _('Reading sneakernet packages'),
})
- self._import(sync.sneakernet_decode(path), push_seq)
-
- offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
- if exists(offline_script):
- shutil.copy(offline_script, path)
+ requests = self._import(parcel.decode_dir(path))
this.broadcast({
'event': 'sync_progress',
'progress': _('Generating new sneakernet package'),
})
+ offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
+ if exists(offline_script):
+ shutil.copy(offline_script, path)
+ parcel.encode_dir(requests + self._export(True), root=path, header={
+ 'from': self.guid,
+ 'to': self._master_guid,
+ })
+
+ _logger.debug('Synchronization completed')
+
+ def status(self):
+ result = NodeRoutes.status(self)
+ result['mode'] = 'slave'
+ return result
+
+ def _import(self, package):
+ requests = []
- diff_seq = toolkit.Sequence([])
- push.append(('diff', None,
- model.diff(self.volume, push_seq, diff_seq)))
- if stats_user.stats_user.value:
- push.append(('stats_diff', None, stats_user.diff(stats_seq)))
- complete = sync.sneakernet_encode(push, root=path,
- src=self.guid, dst=self._master_guid, api_url=api_url.value,
- session=session)
- if not complete:
- push_seq.exclude(diff_seq)
- return {'push_seq': push_seq,
- 'stats_seq': stats_seq,
- 'session': session,
- }
-
- def _import(self, package, push_seq):
for packet in package:
- from_master = (packet['src'] == self._master_guid)
- addressed = (packet['dst'] == self.guid)
-
- if packet.name == 'diff':
- _logger.debug('Processing %r', packet)
- seq, __ = model.merge(self.volume, packet, shift_seqno=False)
- if from_master and seq:
- self._pull_seq.exclude(seq)
- self._pull_seq.commit()
-
- elif from_master:
- if packet.name == 'ack' and addressed:
- _logger.debug('Processing %r', packet)
- if push_seq:
- push_seq.exclude(packet['sequence'])
- self._pull_seq.exclude(packet['ack'])
- self._pull_seq.commit()
- self._push_seq.exclude(packet['sequence'])
- self._push_seq.commit()
- elif packet.name == 'stats_ack' and addressed:
- _logger.debug('Processing %r', packet)
- stats_user.commit(packet['sequence'])
- elif packet.name == 'files_diff':
- _logger.debug('Processing %r', packet)
- seq = files.merge(node.files_root.value, packet)
- if seq:
- self._files_seq.exclude(seq)
- self._files_seq.commit()
+ sender = packet['from']
+ from_master = (sender == self._master_guid)
+ if packet.name == 'push':
+ seqno, committed = this.volume.patch(packet)
+ if seqno is not None:
+ if from_master:
+ ranges.exclude(self._pull_r.value, committed)
+ self._pull_r.commit()
+ else:
+ requests.append(('request', {
+ 'origin': sender,
+ 'ranges': committed,
+ }, []))
+ ranges.exclude(self._push_r.value, seqno, seqno)
+ self._push_r.commit()
+ elif packet.name == 'ack' and from_master and \
+ packet['to'] == self.guid:
+ ranges.exclude(self._pull_r.value, packet['ack'])
+ self._pull_r.commit()
+ if packet['ranges']:
+ ranges.exclude(self._push_r.value, packet['ranges'])
+ self._push_r.commit()
+
+ return requests
+
+ def _export(self, pull):
+ export = []
+ if pull:
+ export.append(('pull', {'ranges': self._pull_r.value}, None))
+ export.append(('push', None, self.volume.diff(self._push_r.value)))
+ return export
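After the rewrite, the slave's whole sync state is two persistent range sets: push.ranges (local seqnos not yet accepted upstream) and pull.ranges (master seqnos not yet merged locally). Both start as [[1, None]] and shrink from the front as acks arrive; a self-contained toy of one round (the ack payload is hypothetical and simplified to plain prefixes):

    def ack_prefix(r, n):
        # acknowledge the inclusive prefix [start..n] of an open range set
        start, end = r[0]
        if n >= start:
            r[0] = [n + 1, end]

    push_r = [[1, None]]               # like toolkit.Bin('push.ranges', ...)
    pull_r = [[1, None]]               # like toolkit.Bin('pull.ranges', ...)

    ack = {'ack': 42, 'ranges': 17}    # hypothetical master reply
    ack_prefix(pull_r, ack['ack'])     # master's 1..42 merged locally
    ack_prefix(push_r, ack['ranges'])  # our 1..17 accepted upstream
    print(push_r, pull_r)              # [[18, None]] [[43, None]]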
diff --git a/sugar_network/node/stats_user.py b/sugar_network/node/stats_user.py
deleted file mode 100644
index 0b96449..0000000
--- a/sugar_network/node/stats_user.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import logging
-from os.path import join, exists, isdir
-
-from sugar_network import node, toolkit
-from sugar_network.toolkit.rrd import Rrd
-from sugar_network.toolkit import Option, pylru, enforce
-
-
-stats_user = Option(
- 'accept personalized users statistics',
- default=False, type_cast=Option.bool_cast, action='store_true')
-
-stats_user_step = Option(
- 'step interval in seconds for users\' RRD databases',
- default=60, type_cast=int)
-
-stats_user_rras = Option(
- 'comma separated list of RRAs for users\' RRD databases',
- default=[
- 'RRA:AVERAGE:0.5:1:4320', # one day with 60s step
- 'RRA:AVERAGE:0.5:5:2016', # one week with 5min step
- ],
- type_cast=Option.list_cast, type_repr=Option.list_repr)
-
-_logger = logging.getLogger('node.stats_user')
-_user_cache = pylru.lrucache(32)
-
-
-def get_rrd(user):
- if user in _user_cache:
- return _user_cache[user]
- else:
- rrd = _user_cache[user] = Rrd(_rrd_path(user),
- stats_user_step.value, stats_user_rras.value)
- return rrd
-
-
-def diff(in_info=None):
- if in_info is None:
- in_info = {}
- out_info = {}
-
- try:
- for user, rrd in _walk_rrd(join(node.stats_root.value, 'user')):
- in_info.setdefault(user, {})
- out_info.setdefault(user, {})
-
- for db in rrd:
- yield {'db': db.name, 'user': user}
-
- in_seq = in_info[user].get(db.name)
- if in_seq is None:
- in_seq = toolkit.PersistentSequence(
- join(rrd.root, db.name + '.push'), [1, None])
- in_info[user][db.name] = in_seq
- elif in_seq is not toolkit.Sequence:
- in_seq = in_info[user][db.name] = toolkit.Sequence(in_seq)
- out_seq = out_info[user].setdefault(db.name,
- toolkit.Sequence())
-
- for start, end in in_seq:
- for timestamp, values in \
- db.get(max(start, db.first), end or db.last):
- yield {'timestamp': timestamp, 'values': values}
- in_seq.exclude(start, timestamp)
- out_seq.include(start, timestamp)
- start = timestamp
- except StopIteration:
- pass
-
- yield {'commit': out_info}
-
-
-def merge(packet):
- db = None
- seq = None
-
- for record in packet:
- if 'db' in record:
- db = get_rrd(record['user'])[record['db']]
- elif 'commit' in record:
- seq = record['commit']
- else:
- enforce(db is not None, 'Malformed user stats diff')
- db.put(record['values'], record['timestamp'])
-
- return seq
-
-
-def commit(info):
- for user, dbs in info.items():
- for db_name, merged in dbs.items():
- seq = toolkit.PersistentSequence(
- _rrd_path(user, db_name + '.push'), [1, None])
- seq.exclude(merged)
- seq.commit()
-
-
-def _walk_rrd(root):
- if not exists(root):
- return
- for users_dirname in os.listdir(root):
- users_dir = join(root, users_dirname)
- if not isdir(users_dir):
- continue
- for user in os.listdir(users_dir):
- yield user, Rrd(join(users_dir, user), stats_user_step.value)
-
-
-def _rrd_path(user, *args):
- return join(node.stats_root.value, 'user', user[:2], user, *args)
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index d3d9b88..89a9d0f 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -523,7 +523,7 @@ class Bin(object):
json.dump(self.value, f)
f.flush()
os.fsync(f.fileno())
- self._orig_value = self.value
+ self._orig_value = deepcopy(self.value)
return True
def __enter__(self):
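The one-line Bin fix above matters because the stored value is mutated in place (e.g. by ranges.exclude in the sync code): without deepcopy, _orig_value aliases the same list, so the "changed since last commit?" comparison can never fire again. A runnable illustration:

    from copy import deepcopy

    value = [[1, None]]
    orig_alias = value            # old behaviour: both names share one list
    orig_copy = deepcopy(value)   # fixed behaviour: independent snapshot

    value[0][0] = 18              # in-place edit, as ranges.exclude() does
    print(value != orig_alias)    # False -- the change is invisible
    print(value != orig_copy)     # True  -- the change is detected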
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index d280035..254e6f3 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -112,8 +112,8 @@ class Connection(object):
_Session = None
- def __init__(self, api_url='', auth=None, max_retries=0, **session_args):
- self.api_url = api_url
+ def __init__(self, api='', auth=None, max_retries=0, **session_args):
+ self.api = api
self.auth = auth
self._max_retries = max_retries
self._session_args = session_args
@@ -121,7 +121,7 @@ class Connection(object):
self._nonce = None
def __repr__(self):
- return '<Connection api_url=%s>' % self.api_url
+ return '<Connection api=%s>' % self.api
def __enter__(self):
return self
@@ -203,7 +203,7 @@ class Connection(object):
if not path:
path = ['']
if not isinstance(path, basestring):
- path = '/'.join([i.strip('/') for i in [self.api_url] + path])
+ path = '/'.join([i.strip('/') for i in [self.api] + path])
try_ = 0
while True:
@@ -283,7 +283,7 @@ class Connection(object):
break
path = reply.headers['location']
if path.startswith('/'):
- path = self.api_url + path
+ path = self.api + path
if request.method != 'HEAD':
if reply.headers.get('Content-Type') == 'application/json':
@@ -380,7 +380,7 @@ class SugarAuth(object):
key_dir = dirname(self._key_path)
if exists(self._key_path):
- if os.stat(key_dir) & 077:
+ if os.stat(key_dir).st_mode & 077:
os.chmod(key_dir, 0700)
self._key = RSA.load_key(self._key_path)
return
@@ -432,7 +432,7 @@ class _Subscription(object):
if try_ == 0:
raise
toolkit.exception('Failed to read from %r subscription, '
- 'will resubscribe', self._client.api_url)
+ 'will resubscribe', self._client.api)
self._content = None
return _parse_event(line)
@@ -441,7 +441,7 @@ class _Subscription(object):
return self._content
params.update(self._condition)
params['cmd'] = 'subscribe'
- _logger.debug('Subscribe to %r, %r', self._client.api_url, params)
+ _logger.debug('Subscribe to %r, %r', self._client.api, params)
response = self._client.request('GET', params=params)
self._content = response.raw
return self._content
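The SugarAuth hunk fixes a real crasher: os.stat() returns a stat_result, which does not support `& 077`; the permission bits live in st_mode. Checked in isolation (0o77 is the modern spelling of the octal literal 077 used in this Python 2 tree):

    import os
    import stat

    st = os.stat('.')
    print(bool(st.st_mode & 0o77))         # any group/other bits set?
    print(oct(stat.S_IMODE(st.st_mode)))   # full permission bits, e.g. 0o755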
diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py
deleted file mode 100644
index bafe6d1..0000000
--- a/sugar_network/toolkit/rrd.py
+++ /dev/null
@@ -1,296 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""Convenient access to RRD databases."""
-
-import re
-import os
-import time
-import json
-import bisect
-import logging
-from os.path import exists, join, splitext
-
-
-_DB_FILENAME_RE = re.compile('(.*?)(-[0-9]+){0,1}\\.rrd$')
-_INFO_RE = re.compile('([^[]+)\\[([^]]+)\\]\\.(.*)$')
-
-_FETCH_PAGE = 256
-
-_logger = logging.getLogger('rrd')
-_rrdtool = None
-
-
-class Rrd(object):
-
- def __init__(self, root, step, rras=None):
- global _rrdtool
-
- import rrdtool
- _rrdtool = rrdtool
-
- self._root = root
- self._step = step
- # rrdtool knows nothing about `unicode`
- self._rras = [i.encode('utf8') for i in rras or []]
- self._dbsets = {}
-
- if not exists(self._root):
- os.makedirs(self._root)
-
- for filename in os.listdir(self._root):
- match = _DB_FILENAME_RE.match(filename)
- if match is not None:
- name, revision = match.groups()
- self.get(name).load(filename, int(revision or 0))
-
- def __iter__(self):
- for i in self._dbsets.values():
- yield i
-
- def __getitem__(self, name):
- return self.get(name)
-
- def __contains__(self, name):
- return name in self._dbsets
-
- @property
- def root(self):
- return self._root
-
- @property
- def step(self):
- return self._step
-
- def get(self, name):
- db = self._dbsets.get(name)
- if db is None:
- db = _DbSet(self._root, name, self._step, self._rras)
- self._dbsets[name] = db
- return db
-
-
-class _DbSet(object):
-
- def __init__(self, root, name, step, rras):
- self._root = root
- self.name = name
- self._step = step
- self._rras = rras
- self._revisions = []
- self._fields = None
- self._field_names = None
- self.__db = None
-
- @property
- def fields(self):
- return self._field_names
-
- @fields.setter
- def fields(self, fields):
- self._field_names = fields.keys()
- self._field_names.sort()
- self._fields = [str(fields[i]) for i in self._field_names]
- _logger.debug('Set %r fields for %r', self._fields, self.name)
-
- @property
- def first(self):
- if not self._revisions:
- return
- return self._revisions[0].first
-
- @property
- def last(self):
- if not self._revisions:
- return
- return self._revisions[-1].last
-
- @property
- def last_ds(self):
- if not self._revisions or not self._field_names:
- return {}
- info = _rrdtool.info(self._revisions[-1].path)
- result = {}
- for field in self._field_names:
- result[field] = float(info.get('ds[%s].last_ds' % field) or 0)
- return result
-
- def load(self, filename, revision):
- _logger.debug('Load %s database from %s with revision %s',
- filename, self._root, revision)
- db = _Db(join(self._root, filename), revision)
- bisect.insort(self._revisions, db)
- return db
-
- def put(self, values, timestamp=None):
- if not self.fields:
- _logger.debug('Parse fields from the first put')
- self.fields = dict([
- (i, 'DS:%s:GAUGE:%s:U:U' % (i, self._step * 2))
- for i in values])
-
- if not timestamp:
- timestamp = int(time.time())
- timestamp = timestamp / self._step * self._step
-
- db = self._get_db(timestamp)
- if db is None:
- return
-
- if timestamp <= db.last:
- _logger.warning('Database %s updated at %s, %s in the past',
- db.path, db.last, timestamp)
- return
-
- value = [str(timestamp)]
- for name in self._field_names:
- value.append(str(values[name]))
-
- _logger.debug('Put %r to %s', value, db.path)
-
- db.put(':'.join(value), timestamp)
-
- def get(self, start=None, end=None, resolution=None):
- if not self._revisions:
- return
-
- if not resolution:
- resolution = self._step
-
- if start is None:
- start = self._revisions[0].first
- if end is None:
- end = self._revisions[-1].last
-
- revisions = []
- for db in reversed(self._revisions):
- revisions.append(db)
- if db.last <= start:
- break
-
- start = start - start % self._step - self._step
- last = min(end, start + _FETCH_PAGE * resolution)
- last -= last % self._step + self._step
-
- for db in reversed(revisions):
- db_end = min(last, db.last - self._step)
- if start > db_end:
- break
- (row_start, start, row_step), __, rows = _rrdtool.fetch(
- str(db.path),
- 'AVERAGE',
- '--start', str(start),
- '--end', str(db_end),
- '--resolution', str(resolution))
- for raw_row in rows:
- row_start += row_step
- if row_start > end:
- break
- row = {}
- for i, value in enumerate(raw_row):
- row[db.field_names[i]] = value or .0
- yield row_start, row
- start = db_end + 1
-
- def _get_db(self, timestamp):
- if self.__db is None and self._fields:
- if self._revisions:
- db = self._revisions[-1]
- if db.last >= timestamp:
- _logger.warning(
- 'Database %s updated at %s, %s in the past',
- db.path, db.last, timestamp)
- return None
- if db.step != self._step or db.rras != self._rras or \
- db.field_names != self._field_names:
- db = self._create_db(db.revision + 1, db.last)
- else:
- db = self._create_db(0, timestamp)
- self.__db = db
- return self.__db
-
- def _create_db(self, revision, timestamp):
- filename = self.name
- if revision:
- filename += '-%s' % revision
- filename += '.rrd'
-
- _logger.debug('Create %s database in %s start=%s step=%s',
- filename, self._root, timestamp, self._step)
-
- _rrdtool.create(
- str(join(self._root, filename)),
- '--start', str(timestamp - self._step),
- '--step', str(self._step),
- *(self._fields + self._rras))
-
- return self.load(filename, revision)
-
-
-class _Db(object):
-
- def __init__(self, path, revision=0):
- self.path = str(path)
- self._meta_path = splitext(path)[0] + '.meta'
- self.revision = revision
- self.fields = []
- self.field_names = []
- self.rras = []
-
- info = _rrdtool.info(self.path)
- self.step = info['step']
- self.first = 0
- self.last = info['last_update']
-
- fields = {}
- rras = {}
-
- for key, value in info.items():
- match = _INFO_RE.match(key)
- if match is None:
- continue
- prefix, key, prop = match.groups()
- if prefix == 'ds':
- fields.setdefault(key, {})
- fields[key][prop] = value
- if prefix == 'rra':
- rras.setdefault(key, {})
- rras[key][prop] = value
-
- for index in sorted([int(i) for i in rras.keys()]):
- rra = rras[str(index)]
- self.rras.append(
- 'RRA:%(cf)s:%(xff)s:%(pdp_per_row)s:%(rows)s' % rra)
-
- for name in sorted(fields.keys()):
- props = fields[name]
- props['name'] = name
- self.fields.append(props)
- self.field_names.append(name)
-
- if exists(self._meta_path):
- with file(self._meta_path) as f:
- self.first = json.load(f).get('first')
-
- def put(self, value, timestamp):
- if not self.first:
- with file(self._meta_path, 'w') as f:
- json.dump({'first': timestamp}, f)
- self.first = timestamp
- _rrdtool.update(self.path, str(value))
- self.last = _rrdtool.info(self.path)['last_update']
-
- def __cmp__(self, other):
- return cmp(self.revision, other.revision)