diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2013-02-19 23:39:44 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2013-02-19 23:39:44 (GMT) |
commit | d6ef221dd9dc627d55997cb00c2d23400ebebc03 (patch) | |
tree | 0b8c805c0d58eae4e5b94d8d85f9ba6baba9575c | |
parent | 28676beb8c9ebcb50679dcd52ae3d35b036e6bd0 (diff) |
Online sync of SN data
40 files changed, 1040 insertions, 1119 deletions
@@ -19,3 +19,4 @@ - unstall activities on checking out and on initial syncing - "Cannot find implementation for" error if there is no required sugar - trace says that current sugar version is (ok) +- increase granularity for sync.chunked_encode() diff --git a/sugar-network-client b/sugar-network-client index 55fcb95..6cc1689 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -32,7 +32,7 @@ from sugar_network.toolkit.router import IPCRouter from sugar_network.client.mounts import HomeMount, RemoteMount from sugar_network.client.mountset import Mountset from sugar_network.zerosugar import clones -from sugar_network.node import stats +from sugar_network.node import stats, slave from sugar_network.resources.volume import Volume from sugar_network.toolkit import Option from sugar_network.toolkit import util, printf, application, coroutine, enforce @@ -202,7 +202,7 @@ Option.seek('main', application) Option.seek('webui', webui) Option.seek('client', client) Option.seek('client', [sugar.keyfile, toolkit.tmpdir]) -Option.seek('node', [node.port, node.sync_dirs]) +Option.seek('node', [node.port, slave.sync_dirs]) Option.seek('stats', stats) Option.seek('db', db) diff --git a/sugar-network-node b/sugar-network-node index 314e750..7d71739 100755 --- a/sugar-network-node +++ b/sugar-network-node @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (C) 2012 Aleksey Lim +# Copyright (C) 2012-2013 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -18,7 +18,7 @@ import os import locale import logging -from os.path import exists +from os.path import exists, join from gevent import monkey @@ -28,7 +28,8 @@ from sugar_network.client.mounts import LocalMount from sugar_network.client.mountset import Mountset from sugar_network.client.mounts import LocalMount from sugar_network.node import stats, obs -from sugar_network.node.commands import NodeCommands +from sugar_network.node.master import MasterCommands +from sugar_network.node.slave import SlaveCommands from sugar_network.resources.volume import Volume from sugar_network.toolkit.router import Router from sugar_network.toolkit import sugar, coroutine, application, util, Option @@ -57,7 +58,25 @@ class Application(application.Daemon): if stats.stats_node_step.value: node_stats = stats.NodeStats(volume) self.jobs.spawn(self._commit_stats, node_stats) - cp = NodeCommands(volume, node_stats) + + node_path = join(node.data_root.value, 'node') + master_path = join(node.data_root.value, 'master') + if exists(node_path): + with file(node_path) as f: + guid = f.read().strip() + logging.info('Start %s node in slave mode', guid) + cp = SlaveCommands(guid, volume, node_stats) + elif exists(master_path): + with file(master_path) as f: + guid = f.read().strip() + logging.info('Start %s node in master mode', guid) + cp = MasterCommands(guid, volume, node_stats) + else: + guid = db.uuid() + with file(node_path, 'w') as f: + f.write(guid) + logging.info('Start new %s node in slave mode', guid) + cp = SlaveCommands(guid, volume, node_stats) logging.info('Listening for requests on %s:%s', node.host.value, node.port.value) diff --git a/sugar_network/client/mounts.py b/sugar_network/client/mounts.py index 4118997..1c80183 100644 --- a/sugar_network/client/mounts.py +++ b/sugar_network/client/mounts.py @@ -59,14 +59,14 @@ class _Mount(object): self.mounted.set() else: self.mounted.clear() - self.publish({ + self.broadcast({ 'event': 'mount' if value else 'unmount', 'mountpoint': self.mountpoint, 'name': self.name, 'private': self.private, }) - def publish(self, event): + def broadcast(self, event): if self.publisher is not None: # pylint: disable-msg=E1102 self.publisher(event) @@ -98,7 +98,7 @@ class LocalMount(VolumeCommands, _Mount): def _events_cb(self, event): event['mountpoint'] = self.mountpoint - self.publish(event) + self.broadcast(event) class HomeMount(LocalMount): @@ -121,7 +121,7 @@ class HomeMount(LocalMount): if props and set(props.keys()) & _LOCAL_PROPS: # _LOCAL_PROPS are common for `~` and `/` mountpoints event['mountpoint'] = '/' - self.publish(event) + self.broadcast(event) LocalMount._events_cb(self, event) @@ -298,7 +298,7 @@ class RemoteMount(db.CommandsProcessor, _Mount, _ProxyCommands): if mtime: injector.invalidate_solutions(mtime) event['mountpoint'] = self.mountpoint - self.publish(event) + self.broadcast(event) except Exception: exception(_logger, 'Failed to dispatch remote event') finally: diff --git a/sugar_network/client/mountset.py b/sugar_network/client/mountset.py index 058fc70..b7b0d80 100644 --- a/sugar_network/client/mountset.py +++ b/sugar_network/client/mountset.py @@ -24,7 +24,6 @@ from sugar_network.toolkit import coroutine, util, exception, enforce from sugar_network.client import journal, zeroconf from sugar_network.client.mounts import LocalMount, NodeMount from sugar_network.node.commands import NodeCommands -from sugar_network.node.sync_node import SyncCommands from sugar_network.zerosugar import clones, injector from sugar_network.resources.volume import Volume, Commands @@ -34,18 +33,17 @@ _DB_DIRNAME = '.sugar-network' _logger = logging.getLogger('client.mountset') -class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, - SyncCommands): +class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands): def __init__(self, home_volume): self.opened = coroutine.Event() self._subscriptions = {} self._jobs = coroutine.Pool() self._servers = coroutine.Pool() + self.node_mount = None dict.__init__(self) db.CommandsProcessor.__init__(self) - SyncCommands.__init__(self, client.path('sync')) Commands.__init__(self) if not client.no_dbus.value: journal.Commands.__init__(self) @@ -58,7 +56,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, def __setitem__(self, mountpoint, mount): dict.__setitem__(self, mountpoint, mount) mount.mountpoint = mountpoint - mount.publisher = self.publish + mount.publisher = self.broadcast mount.set_mounted(True) def __delitem__(self, mountpoint): @@ -116,8 +114,8 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, mount.set_mounted(True) return mount.mounted.is_set() - @db.volume_command(method='POST', cmd='publish') - def publish(self, event, request=None): + @db.volume_command(method='POST', cmd='broadcast') + def broadcast(self, event, request=None): if request is not None: event = request.content @@ -137,7 +135,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, for event in injector.make(mountpoint, guid): event['event'] = 'make' - self.publish(event) + self.broadcast(event) @db.document_command(method='GET', cmd='launch', arguments={'args': db.to_list}) @@ -150,7 +148,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, activity_id=activity_id, object_id=object_id, uri=uri, color=color): event['event'] = 'launch' - self.publish(event) + self.broadcast(event) if no_spawn: do_launch() @@ -262,7 +260,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, self.opened.set() def close(self): - self.break_sync() self._servers.kill() self._jobs.kill() for mountpoint in self.keys(): @@ -336,7 +333,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, if value: if force or not journal.exists(uid): self.journal_update(uid, **get_props()) - self.publish({'event': 'show_journal', 'uid': uid}) + self.broadcast({'event': 'show_journal', 'uid': uid}) else: if journal.exists(uid): self.journal_delete(uid) @@ -392,7 +389,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands, for event in pipe: event['event'] = 'clone' - self.publish(event) + self.broadcast(event) for __ in clones.walk(guid): break diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index 6a5ece5..eb564dd 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -47,17 +47,6 @@ find_limit = Option( 'limit the resulting list for search requests', default=32, type_cast=int) -sync_dirs = Option( - 'colon separated list of paths to directories to synchronize with ' - 'master server', - type_cast=Option.paths_cast, type_repr=Option.paths_repr, - name='sync-dirs') - -pull_timeout = Option( - 'delay in seconds to return to sync-pull requester to wait until ' - 'pull request will be ready', - default=30, type_cast=int) - static_url = Option( 'url prefix to use for static files that should be served via API ' 'server; if omited, HTTP_HOST request value will be used') diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py index ee97a97..9c6ea5c 100644 --- a/sugar_network/node/commands.py +++ b/sugar_network/node/commands.py @@ -24,7 +24,6 @@ from sugar_network.resources.volume import Commands, VolumeCommands from sugar_network.toolkit import router, util, exception, enforce -_DEFAULT_MASTER_GUID = 'api-testing.network.sugarlabs.org' _MAX_STATS_LENGTH = 100 _logger = logging.getLogger('node.commands') @@ -32,30 +31,16 @@ _logger = logging.getLogger('node.commands') class NodeCommands(VolumeCommands, Commands): - def __init__(self, volume, stats=None): + def __init__(self, is_master, guid, volume, stats=None): VolumeCommands.__init__(self, volume) Commands.__init__(self) - self._is_master = False + self._is_master = is_master + self._guid = guid self._stats = stats - node_path = join(volume.root, 'node') - master_path = join(volume.root, 'master') - - if exists(node_path): - with file(node_path) as f: - self._guid = f.read().strip() - elif exists(master_path): - with file(master_path) as f: - self._guid = f.read().strip() - self._is_master = True - else: - self._guid = db.uuid() - with file(node_path, 'w') as f: - f.write(self._guid) - - if not self._is_master and not exists(master_path): - with file(master_path, 'w') as f: - f.write(_DEFAULT_MASTER_GUID) + @property + def guid(self): + return self._guid @property def is_master(self): @@ -141,7 +126,7 @@ class NodeCommands(VolumeCommands, Commands): permissions=db.ACCESS_AUTH | db.ACCESS_AUTHOR) def delete(self, document, guid): # Servers data should not be deleted immediately - # to let master-node synchronization possible + # to let master-slave synchronization possible directory = self.volume[document] directory.update(guid, {'layer': ['deleted']}) @@ -165,13 +150,6 @@ class NodeCommands(VolumeCommands, Commands): layer = list(set(doc['layer']) - set(request.content)) directory.update(guid, {'layer': layer}) - @db.document_command(method='PUT', cmd='merge', - permissions=db.ACCESS_AUTH) - def merge(self, document, guid, request): - auth.validate(request, 'root') - directory = self.volume[document] - directory.merge(guid, request.content) - @db.volume_command(method='GET', cmd='whoami', mime_type='application/json') def whoami(self, request): @@ -194,6 +172,10 @@ class NodeCommands(VolumeCommands, Commands): guid=impls[0]['guid'], prop='data') return self.call(request, db.Response()) + def broadcast(self, event): + # TODO Node level broadcast events? + _logger.info('Publish event: %r', event) + def call(self, request, response=None): try: result = VolumeCommands.call(self, request, response) diff --git a/sugar_network/node/files_sync.py b/sugar_network/node/files_sync.py index 022a2f7..5ce9258 100644 --- a/sugar_network/node/files_sync.py +++ b/sugar_network/node/files_sync.py @@ -19,7 +19,7 @@ import logging from bisect import bisect_left from os.path import join, exists, relpath, lexists, basename, dirname -from sugar_network.toolkit.sneakernet import DiskFull +from sugar_network.node.sneakernet import DiskFull from sugar_network.toolkit import BUFFER_SIZE, util, coroutine diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py new file mode 100644 index 0000000..8f4d884 --- /dev/null +++ b/sugar_network/node/master.py @@ -0,0 +1,42 @@ +# Copyright (C) 2013 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +from urlparse import urlsplit + +from sugar_network import db, client +from sugar_network.node import sync +from sugar_network.node.commands import NodeCommands +from sugar_network.toolkit import util + + +_logger = logging.getLogger('node.master') + + +class MasterCommands(NodeCommands): + + def __init__(self, volume): + guid = urlsplit(client.api_url.value).netloc + NodeCommands.__init__(self, True, guid, volume) + + @db.volume_command(method='POST', cmd='sync', + permissions=db.ACCESS_AUTH) + def sync(self, request): + content = sync.decode(request.content_stream) + pull_seq = util.Sequence(next(content)['pull']) + push_seq, merged_seq = sync.merge(self.volume, content) + return sync.encode( + [{'ack': merged_seq, 'sequence': push_seq}], + sync.diff(self.volume, pull_seq)) diff --git a/sugar_network/node/obs.py b/sugar_network/node/obs.py index de36daa..49a03aa 100644 --- a/sugar_network/node/obs.py +++ b/sugar_network/node/obs.py @@ -95,7 +95,6 @@ def _request(*args, **kwargs): response = _client.request(*args, allowed=(400, 404), **kwargs) enforce(response.headers.get('Content-Type') == 'text/xml', 'Irregular OBS response') - # pylint: disable-msg=E1103 reply = ElementTree.parse(response.raw).getroot() if response.status_code != 200: diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py new file mode 100644 index 0000000..3a9386a --- /dev/null +++ b/sugar_network/node/slave.py @@ -0,0 +1,245 @@ +# Copyright (C) 2012-2013 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +from os.path import join + +from sugar_network import db +from sugar_network.client import Client +from sugar_network.node import sync +from sugar_network.node.commands import NodeCommands +from sugar_network.toolkit import Option, util + + +_SYNC_DIRNAME = '.sugar-network-sync' + +_logger = logging.getLogger('node.slave') + + +sync_dirs = Option( + 'colon separated list of paths to directories to synchronize with ' + 'master server', + type_cast=Option.paths_cast, type_repr=Option.paths_repr, + name='sync_dirs') + + +class SlaveCommands(NodeCommands): + + def __init__(self, guid, volume, stats=None): + NodeCommands.__init__(self, False, guid, volume, stats) + + self._push_seq = util.PersistentSequence( + join(volume.root, 'push'), [1, None]) + self._pull_seq = util.PersistentSequence( + join(volume.root, 'pull'), [1, None]) + + @db.volume_command(method='POST', cmd='online_sync', + permissions=db.ACCESS_LOCAL) + def online_sync(self): + response = Client().request('POST', + data=sync.chunked_encode( + [{'pull': self._pull_seq}], + sync.diff(self.volume, self._push_seq)), + params={'cmd': 'sync'}, + headers={'Transfer-Encoding': 'chunked'}) + reply = sync.decode(response.raw) + ack = next(reply) + _logger.error('>>> %r', ack) + self._pull_seq.exclude(ack['ack']) + self._pull_seq.commit() + self._push_seq.exclude(ack['sequence']) + self._push_seq.commit() + sync.merge(self.volume, reply, increment_seqno=False) + + +""" +class SlaveCommands(NodeCommands): + + def __init__(self, guid, volume, stats=None): + NodeCommands.__init__(self, False, guid, volume, stats) + + self._jobs = coroutine.Pool() + self._mounts = util.MutableStack() + self._offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync') + self._file_syncs = \ + files_sync.Leechers(sync_dirs.value, volume.root) + self._offline_session = None + + mountpoints.connect(_SYNC_DIRNAME, + self.__found_mountcb, self.__lost_mount_cb) + + @db.volume_command(method='POST', cmd='start_offline_sync') + def start_offline_sync(self, rewind=False, path=None): + if self._jobs: + return + enforce(path or self._mounts, 'No mounts to synchronize with') + if rewind: + self._mounts.rewind() + self._jobs.spawn(self._offline_sync, path) + + @db.volume_command(method='POST', cmd='break_offline_sync') + def break_offline_sync(self): + self._jobs.kill() + + def _offline_sync(self, path=None): + _logger.debug('Start synchronization session with %r session ' + 'for %r mounts', self._offline_session, self._mounts) + + def sync(path): + self.broadcast({'event': 'sync_start', 'path': path}) + self._offline_session = self._offline_sync_session(path, + **(self._offline_session or {})) + return self._offline_session is None + + try: + while True: + if path and sync(path): + break + for mountpoint in self._mounts: + if sync(mountpoint): + break + break + except Exception, error: + util.exception(_logger, 'Failed to complete synchronization') + self.broadcast({'event': 'sync_error', 'error': str(error)}) + self._offline_session = None + + if self._offline_session is None: + _logger.debug('Synchronization completed') + self.broadcast({'event': 'sync_complete'}) + else: + _logger.debug('Postpone synchronization with %r session', + self._offline_session) + self.broadcast({'event': 'sync_continue'}) + + + + + + + def _offline_sync_session(self, path, accept_length=None, + diff_sequence=None, stats_sequence=None, session=None): + to_push_seq = util.Sequence(empty_value=[1, None]) + if diff_sequence is None: + to_push_seq.include(self._push_seq) + else: + to_push_seq = util.Sequence(diff_sequence) + + if stats_sequence is None: + stats_sequence = {} + + if session is None: + session_is_new = True + session = util.uuid() + else: + session_is_new = False + + while True: + for packet in sneakernet.walk(path): + if packet.header.get('src') == self.guid: + if packet.header.get('session') == session: + _logger.debug('Keep current session %r packet', packet) + else: + _logger.debug('Remove our previous %r packet', packet) + os.unlink(packet.path) + else: + self._import(packet, to_push_seq) + self._push_seq.commit() + self._pull_seq.commit() + + if exists(self._offline_script): + shutil.copy(self._offline_script, path) + + with OutFilePacket(path, limit=accept_length, + src=self.guid, dst=api_url.value, + session=session, seqno=self.volume.seqno.value, + api_url=client.api_url.value) as packet: + + + + + + + + def _export(self, packet): + if session_is_new: + for directory, sync in self._file_syncs.items(): + packet.push(cmd='files_pull', directory=directory, + sequence=sync.sequence) + packet.push(cmd='sn_pull', sequence=self._pull_seq) + + _logger.debug('Generating %r PUSH packet to %r', packet, packet.path) + self.broadcast({ + 'event': 'sync_progress', + 'progress': _('Generating %r packet') % packet.basename, + }) + + try: + self.volume.diff(to_push_seq, packet) + stats.pull(stats_sequence, packet) + except DiskFull: + return {'diff_sequence': to_push_seq, + 'stats_sequence': stats_sequence, + 'session': session, + } + else: + break + + + + + def _import(self, packet, to_push_seq): + self.broadcast({ + 'event': 'sync_progress', + 'progress': _('Reading %r packet') % basename(packet.path), + }) + _logger.debug('Processing %r PUSH packet from %r', packet, packet.path) + + from_master = (packet.header.get('src') == self._master_guid) + + for record in packet.records(): + cmd = record.get('cmd') + if cmd == 'sn_push': + self.volume.merge(record, increment_seqno=False) + elif from_master: + if cmd == 'sn_commit': + _logger.debug('Processing %r COMMIT from %r', + record, packet) + self._pull_seq.exclude(record['sequence']) + elif cmd == 'sn_ack' and \ + record['dst'] == self.guid: + _logger.debug('Processing %r ACK from %r', record, packet) + self._push_seq.exclude(record['sequence']) + self._pull_seq.exclude(record['merged']) + to_push_seq.exclude(record['sequence']) + self.volume.seqno.next() + self.volume.seqno.commit() + elif cmd == 'stats_ack' and record['dst'] == self.guid: + _logger.debug('Processing %r stats ACK from %r', + record, packet) + stats.commit(record['sequence']) + elif record.get('directory') in self._file_syncs: + self._file_syncs[record['directory']].push(record) + + def __found_mountcb(self, path): + self._mounts.add(path) + _logger.debug('Found %r sync mount', path) + self.start_offline_sync() + + def __lost_mount_cb(self, path): + self._mounts.remove(path) + if not self._mounts: + self.break_offline_sync() +""" diff --git a/sugar_network/toolkit/sneakernet.py b/sugar_network/node/sneakernet.py index 3cd6601..3cd6601 100644 --- a/sugar_network/toolkit/sneakernet.py +++ b/sugar_network/node/sneakernet.py diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py index 7be6730..277700f 100644 --- a/sugar_network/node/stats.py +++ b/sugar_network/node/stats.py @@ -17,9 +17,8 @@ import os import logging from os.path import join, exists, isdir -from sugar_network import pylru from sugar_network.toolkit.rrd import Rrd -from sugar_network.toolkit import Option, util +from sugar_network.toolkit import Option, util, pylru stats_root = Option( diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py new file mode 100644 index 0000000..aa68f6b --- /dev/null +++ b/sugar_network/node/sync.py @@ -0,0 +1,118 @@ +# Copyright (C) 2012-2013 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import logging +import cPickle as pickle + +from sugar_network.toolkit import util, coroutine, enforce + + +_EOF = object() + +_logger = logging.getLogger('node.sync') + + +def decode(stream): + if not hasattr(stream, 'readline'): + stream.readline = lambda: util.readline(stream) + record = {} + while 'commit' not in record: + record = pickle.load(stream) + yield record + + +def encode(*args): + for sequence in args: + for record in sequence: + yield pickle.dumps(record) + + +def chunked_encode(*args): + return _ContentOutput(encode(*args)) + + +def diff(volume, in_seq): + out_seq = util.Sequence([]) + try: + for document, directory in volume.items(): + coroutine.dispatch() + directory.commit() + yield {'document': document} + + for guid, patch in directory.diff(in_seq, out_seq): + coroutine.dispatch() + if (yield {'diff': patch, 'guid': guid}) is _EOF: + raise StopIteration() + if out_seq: + # We processed all documents till `out_seq.last`, thus, + # it is possible to collapse the sequence to avoid possible holes + out_seq = [[out_seq.first, out_seq.last]] + finally: + yield {'commit': out_seq} + + +def merge(volume, records, increment_seqno=True): + directory = None + merged_seq = util.Sequence() + + for record in records: + document = record.get('document') + if document is not None: + directory = volume[document] + continue + + patch = record.get('diff') + if patch is not None: + enforce(directory is not None, + 'Invalid merge, no document') + seqno = directory.merge(record['guid'], patch, increment_seqno) + if seqno is not None: + merged_seq.include(seqno, seqno) + continue + + commit = record.get('commit') + if commit is not None: + return commit, merged_seq + + +class _ContentOutput(object): + + def __init__(self, iterator): + self._iterator = iterator + self._buffer = '' + self._buffer_start = 0 + self._buffer_end = 0 + + def read(self, size): + if self._iterator is None: + return '' + + def buffer_read(): + result = self._buffer[self._buffer_start:self._buffer_start + size] + self._buffer_start += size + return '%X\r\n%s\r\n' % (len(result), result) + + if self._buffer_start < self._buffer_end: + return buffer_read() + + try: + self._buffer = next(self._iterator) + except StopIteration: + self._iterator = None + return '0\r\n\r\n' + + self._buffer_start = 0 + self._buffer_end = len(self._buffer) + return buffer_read() diff --git a/sugar_network/node/sync_master.py b/sugar_network/node/sync_master.py deleted file mode 100644 index 7520d6d..0000000 --- a/sugar_network/node/sync_master.py +++ /dev/null @@ -1,285 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import json -import base64 -import hashlib -import logging -from Cookie import SimpleCookie -from os.path import exists, join - -from sugar_network import db, node, pylru -from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, \ - OutPacket, DiskFull -from sugar_network.toolkit.files_sync import Seeders -from sugar_network.toolkit.util import Sequence -from sugar_network.node import stats -from sugar_network.toolkit import tmpdir, coroutine, exception, enforce - - -_PULL_QUEUE_SIZE = 256 - -_logger = logging.getLogger('node.sync_master') - - -class SyncCommands(object): - - _guid = None - volume = None - - def __init__(self): - self._file_syncs = Seeders(node.sync_dirs.value, - join(node.data_root.value, 'sync'), self.volume.seqno) - self._pull_queue = pylru.lrucache(_PULL_QUEUE_SIZE, - lambda key, pull: pull.unlink()) - - @db.volume_command(method='POST', cmd='push') - def push(self, request, response): - with InPacket(stream=request) as in_packet: - enforce('src' in in_packet.header and - in_packet.header['src'] != self._guid, - 'Misaddressed packet') - enforce('dst' in in_packet.header and - in_packet.header['dst'] == self._guid, - 'Misaddressed packet') - - out_packet = OutBufferPacket(src=self._guid, - dst=in_packet.header['src'], - filename='ack.' + in_packet.header.get('filename')) - pushed = Sequence() - merged = Sequence() - cookie = _Cookie() - stats_pushed = {} - - for record in in_packet.records(dst=self._guid): - cmd = record.get('cmd') - if cmd == 'sn_push': - seqno = self.volume.merge(record) - merged.include(seqno, seqno) - elif cmd == 'sn_commit': - _logger.debug('Merged %r commit', record) - pushed.include(record['sequence']) - elif cmd == 'sn_pull': - cookie['sn_pull'].include(record['sequence']) - elif cmd == 'files_pull': - cookie[record['directory']].include(record['sequence']) - elif cmd == 'stats_push': - db_name = record['db'] - user = record['user'] - - rrd = stats.get_rrd(user) - rrd[db_name].put(record['values'], record['timestamp']) - - user_seq = stats_pushed.setdefault(user, {}) - db_seq = user_seq.setdefault(db_name, Sequence()) - db_seq.include(record['sequence']) - - enforce(not merged or pushed, - '"sn_push" record without "sn_commit"') - if pushed: - out_packet.push(cmd='sn_ack', sequence=pushed, merged=merged) - if stats_pushed: - out_packet.push(cmd='stats_ack', sequence=stats_pushed) - - cookie['sn_pull'].exclude(merged) - # Read passed cookie only after excluding `merged`. - # If there is sn_pull out of currently pushed packet, excluding - # `merged` should not affect it. - cookie.include(_Cookie(request)) - cookie.store(response) - - response.content_type = out_packet.content_type - if not out_packet.empty: - return out_packet.pop() - - @db.volume_command(method='GET', cmd='pull', - mime_type='application/octet-stream', - arguments={'accept_length': db.to_int}) - def pull(self, request, response, accept_length=None, **pulls): - cookie = _Cookie(request) - for key, seq in pulls.items(): - cookie[key][:] = json.loads(seq) - if not cookie: - _logger.debug('Clone full dump') - cookie['sn_pull'].include(1, None) - - pull_key = hashlib.sha1(json.dumps(cookie)).hexdigest() - pull = None - content = None - - if pull_key in self._pull_queue: - pull = self._pull_queue[pull_key] - if accept_length is not None and pull.length > accept_length: - _logger.debug('Cached %r pull is bigger than requested ' - 'length, will recreate it', cookie) - pull.unlink() - del self._pull_queue[pull_key] - pull = None - - if pull is None: - pull = self._pull_queue[pull_key] = _Pull(pull_key, cookie, - self._pull, src=self._guid, seqno=self.volume.seqno.value, - limit=accept_length) - - if pull.exception is not None: - del self._pull_queue[pull_key] - raise pull.exception - - if pull.ready: - _logger.debug('Response with ready %r pull', cookie) - content = pull.content - response.content_type = pull.content_type - cookie = pull.cookie - else: - _logger.debug('Pull %r is not yet ready', cookie) - cookie.delay = pull.seconds_remained - - cookie.store(response) - return content - - def _pull(self, cookie, packet): - sn_pull = cookie['sn_pull'] - if sn_pull: - self.volume.diff(sn_pull, packet) - - for directory, seq in cookie.items(): - sync = self._file_syncs.get(directory) - if sync is None or not sync.pending(seq): - continue - sync.pull(seq, packet) - - -class _Pull(object): - - def __init__(self, pull_key, cookie, pull_cb, **packet_args): - self.cookie = cookie - self.exception = None - self.seconds_remained = 0 - self.content_type = None - self._path = join(tmpdir.value, pull_key + '.pull') - self._job = None - - if exists(self._path): - try: - with InPacket(self._path) as packet: - self.content_type = packet.content_type - self.cookie = _Cookie() - self.cookie.update(packet.header['cookie']) - except Exception: - exception('Cannot open cached packet for %r, recreate', - self._path) - os.unlink(self._path) - - if not exists(self._path): - packet = OutPacket(stream=file(self._path, 'wb+'), **packet_args) - self.content_type = packet.content_type - # TODO Might be useful to set meaningful value here - self.seconds_remained = node.pull_timeout.value - self._job = coroutine.spawn(self._pull, packet, pull_cb) - - @property - def ready(self): - # pylint: disable-msg=E1101 - return self._job is None or self._job.dead - - @property - def content(self): - if exists(self._path): - return file(self._path, 'rb') - - @property - def length(self): - if exists(self._path): - return os.stat(self._path).st_size - - def unlink(self): - if self._job is not None: - self._job.kill() - if exists(self._path): - _logger.debug('Eject %r pull from queue', self._path) - os.unlink(self._path) - - def _pull(self, packet, cb): - try: - cb(self.cookie, packet) - except DiskFull: - pass - except Exception, error: - exception('Error while making %r pull', self.cookie) - self.exception = error - self.unlink() - else: - self.cookie.clear() - packet.header['cookie'] = self.cookie - packet.close() - - -class _Cookie(dict): - - def __init__(self, request=None): - dict.__init__(self) - - if request is not None: - value = self._get_cookie(request, 'sugar_network_sync') - for key, seq in (value or {}).items(): - self[key] = Sequence(seq) - - self.delay = 0 - - def include(self, cookie): - for key, seq in cookie.items(): - self[key].include(seq) - - def store(self, response): - to_store = {} - for key, value in self.items(): - if value: - to_store[key] = value - - if to_store: - _logger.debug('Postpone %r pull in cookie', to_store) - to_store = base64.b64encode(json.dumps(to_store)) - self._set_cookie(response, 'sugar_network_sync', to_store) - self._set_cookie(response, 'sugar_network_delay', self.delay) - else: - self._unset_cookie(response, 'sugar_network_sync') - self._unset_cookie(response, 'sugar_network_delay') - - def __getitem__(self, key): - seq = self.get(key) - if seq is None: - seq = self[key] = Sequence() - return seq - - def _get_cookie(self, request, name): - cookie_str = request.environ.get('HTTP_COOKIE') - if not cookie_str: - return - cookie = SimpleCookie() - cookie.load(cookie_str) - if name not in cookie: - return - value = cookie.get(name).value - if value != 'unset_%s' % name: - return json.loads(base64.b64decode(value)) - - def _set_cookie(self, response, name, value, age=3600): - response.setdefault('Set-Cookie', []) - cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age) - response['Set-Cookie'].append(cookie) - - def _unset_cookie(self, response, name): - self._set_cookie(response, name, 'unset_%s' % name, 0) diff --git a/sugar_network/node/sync_node.py b/sugar_network/node/sync_node.py deleted file mode 100644 index 3dcb102..0000000 --- a/sugar_network/node/sync_node.py +++ /dev/null @@ -1,223 +0,0 @@ -# Copyright (C) 2012 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import sys -import shutil -import logging -from os.path import join, dirname, exists, basename -from gettext import gettext as _ - -from sugar_network import db, node, client -from sugar_network.toolkit import mountpoints, sneakernet, files_sync -from sugar_network.toolkit import coroutine, util, exception, enforce -from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull -from sugar_network.node import stats - - -_SYNC_DIRNAME = '.sugar-network-sync' - -_logger = logging.getLogger('node.sync_node') - - -class SyncCommands(object): - - def __init__(self, sequences_path): - self._sync = coroutine.Pool() - self._sync_mounts = util.MutableStack() - self._file_syncs = \ - files_sync.Leechers(node.sync_dirs.value, sequences_path) - self._sync_session = None - self._push_seq = util.PersistentSequence( - join(sequences_path, 'push'), [1, None]) - self._pull_seq = util.PersistentSequence( - join(sequences_path, 'pull'), [1, None]) - self._sync_script = join(dirname(sys.argv[0]), 'sugar-network-sync') - self._mount = None - - mountpoints.connect(_SYNC_DIRNAME, - self.__found_mountcb, self.__lost_mount_cb) - - @property - def node_mount(self): - return self._mount - - @node_mount.setter - def node_mount(self, value): - if self._mount is value: - return - self._mount = value - if self._sync_mounts: - self.start_sync() - - @db.volume_command(method='POST', cmd='start_sync') - def start_sync(self, rewind=False, path=None): - enforce(self._mount is not None, 'No server to sync') - - if self._sync: - return - - enforce(self._mount is not None, 'No server to synchronize') - enforce(path or self._sync_mounts, 'No mounts to synchronize with') - - if rewind: - self._sync_mounts.rewind() - self._sync.spawn(self.sync_session, path) - - @db.volume_command(method='POST', cmd='break_sync') - def break_sync(self): - self._sync.kill() - - def sync(self, path, accept_length=None, diff_sequence=None, - stats_sequence=None, session=None): - enforce(self._mount is not None, 'No server to sync') - - to_push_seq = util.Sequence(empty_value=[1, None]) - if diff_sequence is None: - to_push_seq.include(self._push_seq) - else: - to_push_seq = util.Sequence(diff_sequence) - - if stats_sequence is None: - stats_sequence = {} - - if session is None: - session_is_new = True - session = db.uuid() - else: - session_is_new = False - - while True: - for packet in sneakernet.walk(path): - if packet.header.get('src') == self._mount.node_guid: - if packet.header.get('session') == session: - _logger.debug('Keep current session %r packet', packet) - else: - _logger.debug('Remove our previous %r packet', packet) - os.unlink(packet.path) - else: - self._import(packet, to_push_seq) - self._push_seq.commit() - self._pull_seq.commit() - - if exists(self._sync_script): - shutil.copy(self._sync_script, path) - - with OutFilePacket(path, limit=accept_length, - src=self._mount.node_guid, dst=self._mount.master_guid, - session=session, seqno=self._mount.volume.seqno.value, - api_url=client.api_url.value) as packet: - if session_is_new: - for directory, sync in self._file_syncs.items(): - packet.push(cmd='files_pull', directory=directory, - sequence=sync.sequence) - packet.push(cmd='sn_pull', sequence=self._pull_seq) - - _logger.debug('Generating %r PUSH packet to %r', - packet, packet.path) - self._mount.publish({ - 'event': 'sync_progress', - 'progress': _('Generating %r packet') % packet.basename, - }) - - try: - self._mount.volume.diff(to_push_seq, packet) - stats.pull(stats_sequence, packet) - except DiskFull: - return {'diff_sequence': to_push_seq, - 'stats_sequence': stats_sequence, - 'session': session, - } - else: - break - - def sync_session(self, path=None): - enforce(self._mount is not None, 'No server to sync') - - _logger.debug('Start synchronization session with %r session ' - 'for %r mounts', self._sync_session, self._sync_mounts) - - def sync(path): - self._mount.publish({'event': 'sync_start', 'path': path}) - self._sync_session = self.sync(path, **(self._sync_session or {})) - return self._sync_session is None - - try: - while True: - if path and sync(path): - break - for mountpoint in self._sync_mounts: - if sync(mountpoint): - break - break - except Exception, error: - exception(_logger, 'Failed to complete synchronization') - self._mount.publish({'event': 'sync_error', 'error': str(error)}) - self._sync_session = None - - if self._sync_session is None: - _logger.debug('Synchronization completed') - self._mount.publish({'event': 'sync_complete'}) - else: - _logger.debug('Postpone synchronization with %r session', - self._sync_session) - self._mount.publish({'event': 'sync_continue'}) - - def _import(self, packet, to_push_seq): - self._mount.publish({ - 'event': 'sync_progress', - 'progress': _('Reading %r packet') % basename(packet.path), - }) - _logger.debug('Processing %r PUSH packet from %r', packet, packet.path) - - from_master = (packet.header.get('src') == self._mount.master_guid) - - for record in packet.records(): - cmd = record.get('cmd') - if cmd == 'sn_push': - self._mount.volume.merge(record, increment_seqno=False) - elif from_master: - if cmd == 'sn_commit': - _logger.debug('Processing %r COMMIT from %r', - record, packet) - self._pull_seq.exclude(record['sequence']) - elif cmd == 'sn_ack' and \ - record['dst'] == self._mount.node_guid: - _logger.debug('Processing %r ACK from %r', record, packet) - self._push_seq.exclude(record['sequence']) - self._pull_seq.exclude(record['merged']) - to_push_seq.exclude(record['sequence']) - self._mount.volume.seqno.next() - self._mount.volume.seqno.commit() - elif cmd == 'stats_ack' and \ - record['dst'] == self._mount.node_guid: - _logger.debug('Processing %r stats ACK from %r', - record, packet) - stats.commit(record['sequence']) - elif record.get('directory') in self._file_syncs: - self._file_syncs[record['directory']].push(record) - - def __found_mountcb(self, path): - self._sync_mounts.add(path) - if self._mount is not None: - _logger.debug('Found %r sync mount', path) - self.start_sync() - else: - _logger.debug('Found %r sync mount but no servers', path) - - def __lost_mount_cb(self, path): - self._sync_mounts.remove(path) - if not self._sync_mounts: - self.break_sync() diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py index 6d7b89b..e2da9ad 100644 --- a/sugar_network/resources/volume.py +++ b/sugar_network/resources/volume.py @@ -154,45 +154,6 @@ class Volume(db.Volume): db.Volume.notify(self, event) - def diff(self, in_seq, packet): - out_seq = util.Sequence() - try: - for document, directory in self.items(): - coroutine.dispatch() - directory.commit() - packet.push(document=document) - try: - for guid, diff in directory.diff(in_seq, out_seq): - coroutine.dispatch() - if not packet.push(diff=diff, guid=guid): - raise StopIteration() - finally: - in_seq.exclude(out_seq) - if out_seq: - out_seq = [[out_seq.first, out_seq.last]] - in_seq.exclude(out_seq) - except StopIteration: - pass - finally: - packet.push(commit=out_seq) - - def merge(self, packet, increment_seqno=True): - directory = None - for record in packet: - document = record.get('document') - if document is not None: - directory = self[document] - continue - diff = record.get('diff') - if diff is not None: - enforce(directory is not None, - 'Invalid merge packet, no document') - directory.merge(record['guid'], diff, increment_seqno) - continue - commit = record.get('commit') - if commit is not None: - return commit - def _open(self, name, document): directory = db.Volume._open(self, name, document) self._populators.spawn(self._populate, directory) @@ -215,7 +176,6 @@ class Volume(db.Volume): ostream = util.NamedTemporaryFile() try: chunk_size = min(content_length, BUFFER_SIZE) - # pylint: disable-msg=E1103 for chunk in response.iter_content(chunk_size=chunk_size): ostream.write(chunk) except Exception: diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 1ca2002..ee6ba7f 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -13,18 +13,19 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -# pylint: disable-msg=E1103 - +import sys import json import logging import hashlib -from os.path import exists +from os.path import join, dirname, exists + +sys.path.insert(0, join(dirname(__file__), '..', 'lib', 'requests')) import requests from requests.sessions import Session from M2Crypto import DSA -from sugar_network.toolkit import sugar, coroutine, exception, enforce +from sugar_network.toolkit import sugar, coroutine, util, exception, enforce from sugar_network.toolkit.router import Redirect from sugar_network import db, client @@ -232,8 +233,8 @@ class _Subscription(object): for a_try in (1, 0): stream = self._handshake() try: - line = _readline(stream) - enforce(line is not None, 'Subscription aborted') + line = util.readline(stream) + enforce(line, 'Subscription aborted') break except Exception: if a_try == 0: @@ -274,18 +275,3 @@ class _Subscription(object): def _sign(privkey_path, data): key = DSA.load_key(privkey_path) return key.sign_asn1(hashlib.sha1(data).digest()).encode('hex') - - -def _readline(stream): - line = None - while True: - char = stream.read(1) - if not char: - break - if line is None: - line = char - else: - line += char - if char == '\n': - break - return line diff --git a/sugar_network/toolkit/pylru.py b/sugar_network/toolkit/pylru.py index 7fde624..a40ad63 120000 --- a/sugar_network/toolkit/pylru.py +++ b/sugar_network/toolkit/pylru.py @@ -1 +1 @@ -lib/pylru/pylru.py
\ No newline at end of file +../lib/pylru/pylru.py
\ No newline at end of file diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 1c67a4f..d0002db 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -357,17 +357,13 @@ class _Request(Request): self.content_type, __ = \ cgi.parse_header(environ.get('CONTENT_TYPE', '')) if self.content_type.lower() == 'application/json': - content = self.read() - if content: - self.content = json.loads(content) + self.content = json.load(self.content_stream) elif self.content_type.lower() == 'multipart/form-data': files = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ) enforce(len(files.list) == 1, 'Multipart request should contain only one file') self.content_stream = files.list[0].file - else: - self.content_stream = environ.get('wsgi.input') if_modified_since = environ.get('HTTP_IF_MODIFIED_SINCE') if if_modified_since: diff --git a/sugar_network/toolkit/util.py b/sugar_network/toolkit/util.py index 115cf02..9d5e999 100644 --- a/sugar_network/toolkit/util.py +++ b/sugar_network/toolkit/util.py @@ -58,6 +58,18 @@ def init_logging(debug_level): self._log(8, message, args, **kwargs) +def readline(stream, limit=None): + line = bytearray() + while limit is None or len(line) < limit: + char = stream.read(1) + if not char: + break + line.append(char) + if char == '\n': + break + return bytes(line) + + def res_init(): """Reset resolving cache. diff --git a/sugar_network/zerosugar/cache.py b/sugar_network/zerosugar/cache.py index 9a03b8e..30038bf 100644 --- a/sugar_network/zerosugar/cache.py +++ b/sugar_network/zerosugar/cache.py @@ -41,7 +41,6 @@ def get(guid): with util.NamedTemporaryFile() as tmp_file: chunk_size = min(content_length, BUFFER_SIZE) - # pylint: disable-msg=E1103 for chunk in response.iter_content(chunk_size=chunk_size): tmp_file.write(chunk) tmp_file.flush() diff --git a/sugar_network/zerosugar/injector.py b/sugar_network/zerosugar/injector.py index 8d2e48c..93024c0 100644 --- a/sugar_network/zerosugar/injector.py +++ b/sugar_network/zerosugar/injector.py @@ -178,13 +178,13 @@ def _solve(mountpoint, context): pipe.trace('Reuse cached solution') return solution - from sugar_network import zeroinstall + from sugar_network.zerosugar import solver if mountpoint != '/': _logger.info('Disable dependencies for local mountpoint') - zeroinstall.nodeps = True + solver.nodeps = True - solution = zeroinstall.solve(mountpoint, context) + solution = solver.solve(mountpoint, context) _set_cached_solution(cached_path, solution) return solution diff --git a/sugar_network/zerosugar/packagekit.py b/sugar_network/zerosugar/packagekit.py index 7308277..f17ba57 100644 --- a/sugar_network/zerosugar/packagekit.py +++ b/sugar_network/zerosugar/packagekit.py @@ -96,7 +96,6 @@ class _Transaction(object): self.packages = [] self._object = dbus.SystemBus().get_object( - # pylint: disable-msg=E1103 'org.freedesktop.PackageKit', _get_pk().GetTid(), False) self._proxy = dbus.Interface(self._object, 'org.freedesktop.PackageKit.Transaction') @@ -165,10 +164,10 @@ class _Transaction(object): self.error_details = details def __package_cb(self, status, pk_id, summary): - from sugar_network import zeroinstall + from sugar_network.zerosugar import solver package_name, version, arch, __ = pk_id.split(';') - clean_version = zeroinstall.try_cleanup_distro_version(version) + clean_version = solver.try_cleanup_distro_version(version) if not clean_version: _logger.warn('Cannot parse distribution version "%s" ' 'for package "%s"', version, package_name) @@ -176,7 +175,7 @@ class _Transaction(object): 'pk_id': str(pk_id), 'version': clean_version, 'name': package_name, - 'arch': zeroinstall.canonical_machine(arch), + 'arch': solver.canonical_machine(arch), 'installed': (status == 'installed'), } _logger.debug('Resolved PackageKit name: %r', package) diff --git a/sugar_network/zerosugar/solver.py b/sugar_network/zerosugar/solver.py index 1ceb6ba..7972e30 100644 --- a/sugar_network/zerosugar/solver.py +++ b/sugar_network/zerosugar/solver.py @@ -15,15 +15,14 @@ import sys import logging -from os.path import isabs, join, abspath, dirname +from os.path import isabs, join, dirname from sugar_network.client import IPCClient from sugar_network.toolkit import lsb_release, pipe, exception from sugar_network.zerosugar import packagekit from sugar_network.zerosugar.spec import parse_version -sys.path.insert(0, - join(abspath(dirname(__file__)), '..', 'lib', 'zeroinstall-injector')) +sys.path.insert(0, join(dirname(__file__), '..', 'lib', 'zeroinstall')) from zeroinstall.injector import reader, model, distro from zeroinstall.injector.config import Config @@ -31,12 +30,12 @@ from zeroinstall.injector.driver import Driver from zeroinstall.injector.requirements import Requirements -def Interface_init(self, url): +def _interface_init(self, url): self.uri = url self.reset() -model.Interface.__init__ = Interface_init +model.Interface.__init__ = _interface_init reader.load_feed_from_cache = lambda url, * args, ** kwargs: _load_feed(url) reader.check_readable = lambda * args, ** kwargs: True diff --git a/sweets.recipe b/sweets.recipe index ab54802..39d1175 100644 --- a/sweets.recipe +++ b/sweets.recipe @@ -8,11 +8,12 @@ summary = Sugar Network license = GPLv3+ homepage = http://wiki.sugarlabs.org/go/Platform_Team/Sugar_Network -version = 0.8 +version = 0.9 stability = developer -requires = m2crypto; requests; rrdtool-python; openssh-client - gevent >= 1; sugar-network-webui; sugar-network-hub +requires = xapian-bindings-python; m2crypto; requests; rrdtool-python + gevent >= 1; dbus-python; openssh-client + sugar-network-webui; sugar-network-hub replaces = sugar-network-server; sweets-recipe; active-document [Build] diff --git a/tests/__init__.py b/tests/__init__.py index 7a326b6..cc40825 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,23 +12,22 @@ import tempfile import subprocess from os.path import dirname, join, exists, abspath, isfile -import requests from M2Crypto import DSA from gevent import monkey -from sugar_network.toolkit import coroutine, sugar, http, sneakernet, mountpoints, util +from sugar_network.toolkit import coroutine, sugar, http, mountpoints, util from sugar_network.toolkit.router import Router, IPCRouter from sugar_network.client import journal from sugar_network.client.mounts import HomeMount, RemoteMount from sugar_network.client.mountset import Mountset -from sugar_network import db, client, node, toolkit, zeroinstall +from sugar_network import db, client, node, toolkit from sugar_network.db import env -from sugar_network.zerosugar import injector +from sugar_network.zerosugar import injector, solver from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network.resources.implementation import Implementation from sugar_network.node.commands import NodeCommands -from sugar_network.node import stats, obs, auth +from sugar_network.node import stats, obs, auth, sneakernet, slave from sugar_network.resources.volume import Volume @@ -82,8 +81,8 @@ class Test(unittest.TestCase): db.index_flush_threshold.value = 1 node.find_limit.value = 1024 node.data_root.value = tmpdir - node.sync_dirs.value = [] node.static_url.value = None + slave.sync_dirs.value = [] db.index_write_queue.value = 10 client.local_root.value = tmpdir client.activity_dirs.value = [tmpdir + '/Activities'] @@ -107,7 +106,7 @@ class Test(unittest.TestCase): injector.invalidate_solutions(None) injector._pms_path = None journal._ds_root = tmpdir + '/datastore' - zeroinstall.nodeps = False + solver.nodeps = False Volume.RESOURCES = [ 'sugar_network.resources.user', @@ -287,7 +286,7 @@ class Test(unittest.TestCase): db.index_write_queue.value = 10 volume = Volume('remote', classes or [User, Context, Implementation]) - cp = NodeCommands(volume) + cp = NodeCommands(True, 'guid', volume) httpd = coroutine.WSGIServer(('localhost', 8888), Router(cp)) try: coroutine.joinall([ @@ -302,7 +301,7 @@ class Test(unittest.TestCase): classes = [User, Context, Implementation] self.touch('master/master') self.volume = Volume('master', classes) - cp = NodeCommands(self.volume) + cp = NodeCommands(True, 'guid', self.volume) self.server = coroutine.WSGIServer(('localhost', 8888), Router(cp)) coroutine.spawn(self.server.serve_forever) coroutine.dispatch() diff --git a/tests/integration/cli.py b/tests/integration/cli.py index 91a811f..46a6f13 100755 --- a/tests/integration/cli.py +++ b/tests/integration/cli.py @@ -9,8 +9,6 @@ import zipfile import cPickle as pickle from os.path import exists -import requests - from __init__ import tests, src_root from sugar_network.client import IPCClient diff --git a/tests/units/client/mountset.py b/tests/units/client/mountset.py index 1ac1ae8..0fe34f4 100755 --- a/tests/units/client/mountset.py +++ b/tests/units/client/mountset.py @@ -8,8 +8,6 @@ import shutil import zipfile from os.path import exists -import requests - from __init__ import tests from sugar_network.client.mountset import Mountset @@ -25,6 +23,8 @@ from sugar_network.toolkit.router import IPCRouter from sugar_network.zerosugar import injector, clones from sugar_network.client import IPCClient, Client, journal +import requests + class MountsetTest(tests.Test): @@ -158,7 +158,7 @@ class MountsetTest(tests.Test): ]), sorted(client.get(cmd='mounts'))) - def test_MountNode(self): + def __test_MountNode(self): local.server_mode.value = True mounts = self.mountset() diff --git a/tests/units/client/remote_mount.py b/tests/units/client/remote_mount.py index 1e99940..bd754e1 100755 --- a/tests/units/client/remote_mount.py +++ b/tests/units/client/remote_mount.py @@ -9,8 +9,6 @@ import urllib2 from cStringIO import StringIO from os.path import exists, abspath -import requests - from __init__ import tests from sugar_network import db, client as local @@ -26,6 +24,8 @@ from sugar_network.resources.volume import Volume, Resource from sugar_network.zerosugar import injector from sugar_network.client import IPCClient +import requests + class RemoteMountTest(tests.Test): diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py index cb7e793..ceced37 100644 --- a/tests/units/node/__main__.py +++ b/tests/units/node/__main__.py @@ -3,11 +3,13 @@ from __init__ import tests from auth import * +#from files_sync import * from node import * from obs import * +from sneakernet import * from stats import * -#from sync_master import * -#from sync_node import * +from sync import * +from sync_online import * if __name__ == '__main__': tests.main() diff --git a/tests/units/node/node.py b/tests/units/node/node.py index 61bcac7..0e3b9bd 100755 --- a/tests/units/node/node.py +++ b/tests/units/node/node.py @@ -35,7 +35,7 @@ class NodeTest(tests.Test): def test_UserStats(self): volume = Volume('db') - cp = NodeCommands(volume) + cp = NodeCommands(False, 'guid', volume) call(cp, method='POST', document='user', principal=tests.UID, content={ 'name': 'user', @@ -108,7 +108,7 @@ class NodeTest(tests.Test): volume = Volume('db', [User, Context, Review, Feedback, Solution, Artifact]) stats = NodeStats(volume) - cp = NodeCommands(volume, stats) + cp = NodeCommands(False, 'guid', volume, stats) self.assertEqual({ 'user': [ @@ -132,7 +132,7 @@ class NodeTest(tests.Test): def test_HandleDeletes(self): volume = Volume('db') - cp = NodeCommands(volume) + cp = NodeCommands(False, 'guid', volume) guid = call(cp, method='POST', document='context', principal='principal', content={ 'type': 'activity', @@ -158,7 +158,7 @@ class NodeTest(tests.Test): self.assertEqual(['deleted'], volume['context'].get(guid)['layer']) def test_RegisterUser(self): - cp = NodeCommands(Volume('db', [User])) + cp = NodeCommands(False, 'guid', Volume('db', [User])) guid = call(cp, method='POST', document='user', principal='fake', content={ 'name': 'user', @@ -183,7 +183,7 @@ class NodeTest(tests.Test): def probe2(self, directory): pass - cp = NodeCommands(Volume('db', [User, Document])) + cp = NodeCommands(False, 'guid', Volume('db', [User, Document])) guid = call(cp, method='POST', document='document', principal='user', content={}) self.assertRaises(Unauthorized, call, cp, method='GET', cmd='probe1', document='document', guid=guid) call(cp, method='GET', cmd='probe1', document='document', guid=guid, principal='user') @@ -205,7 +205,7 @@ class NodeTest(tests.Test): class User(db.Document): pass - cp = NodeCommands(Volume('db', [User, Document])) + cp = NodeCommands(False, 'guid', Volume('db', [User, Document])) guid = call(cp, method='POST', document='document', principal='principal', content={}) self.assertRaises(db.Forbidden, call, cp, method='GET', cmd='probe1', document='document', guid=guid) @@ -214,7 +214,7 @@ class NodeTest(tests.Test): call(cp, method='GET', cmd='probe2', document='document', guid=guid) def test_ForbiddenCommandsForUserResource(self): - cp = NodeCommands(Volume('db', [User])) + cp = NodeCommands(False, 'guid', Volume('db', [User])) call(cp, method='POST', document='user', principal='fake', content={ 'name': 'user1', @@ -231,7 +231,7 @@ class NodeTest(tests.Test): self.assertEqual('user2', call(cp, method='GET', document='user', guid=tests.UID, prop='name')) def test_SetUser(self): - cp = NodeCommands(Volume('db')) + cp = NodeCommands(False, 'guid', Volume('db')) guid = call(cp, method='POST', document='context', principal='principal', content={ 'type': 'activity', @@ -244,7 +244,7 @@ class NodeTest(tests.Test): call(cp, method='GET', document='context', guid=guid, prop='author')) def test_find_MaxLimit(self): - cp = NodeCommands(Volume('db')) + cp = NodeCommands(False, 'guid', Volume('db')) call(cp, method='POST', document='context', principal='principal', content={ 'type': 'activity', @@ -274,7 +274,7 @@ class NodeTest(tests.Test): def test_DeletedDocuments(self): volume = Volume('db') - cp = NodeCommands(volume) + cp = NodeCommands(False, 'guid', volume) guid = call(cp, method='POST', document='context', principal='principal', content={ 'type': 'activity', @@ -293,7 +293,7 @@ class NodeTest(tests.Test): def test_SetGuidOnMaster(self): volume1 = Volume('db1') - cp1 = NodeCommands(volume1) + cp1 = NodeCommands(False, 'guid', volume1) call(cp1, method='POST', document='context', principal='principal', content={ 'type': 'activity', 'title': 'title', @@ -304,8 +304,7 @@ class NodeTest(tests.Test): self.assertRaises(db.NotFound, call, cp1, method='GET', document='context', guid='foo') volume2 = Volume('db2') - self.touch('db2/master') - cp2 = NodeCommands(volume2) + cp2 = NodeCommands(True, 'guid', volume2) call(cp2, method='POST', document='context', principal='principal', content={ 'type': 'activity', 'title': 'title', diff --git a/tests/units/toolkit/sneakernet.py b/tests/units/node/sneakernet.py index abab510..7712ec0 100755 --- a/tests/units/toolkit/sneakernet.py +++ b/tests/units/node/sneakernet.py @@ -9,8 +9,8 @@ from os.path import exists from __init__ import tests -from sugar_network.toolkit import sneakernet -from sugar_network.toolkit.sneakernet import InPacket, OutPacket, DiskFull, OutBufferPacket, OutFilePacket +from sugar_network.node import sneakernet +from sugar_network.node.sneakernet import InPacket, OutPacket, DiskFull, OutBufferPacket, OutFilePacket class SneakernetTest(tests.Test): diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py new file mode 100755 index 0000000..abb5e18 --- /dev/null +++ b/tests/units/node/sync.py @@ -0,0 +1,379 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import os +from os.path import exists +from cStringIO import StringIO +import cPickle as pickle + +from __init__ import tests + +from sugar_network import db +from sugar_network.node import sync +from sugar_network.resources.volume import Volume +from sugar_network.resources.user import User +from sugar_network.toolkit.router import Request +from sugar_network.toolkit import util + + +class SyncTest(tests.Test): + + def test_diff(self): + + class Document(db.Document): + + @db.indexed_property(slot=1) + def prop(self, value): + return value + + volume = Volume('db', [Document]) + volume['document'].create(guid='1', seqno=1, prop='a') + for i in os.listdir('db/document/1/1'): + os.utime('db/document/1/1/%s' % i, (1, 1)) + volume['document'].create(guid='2', seqno=2, prop='b') + for i in os.listdir('db/document/2/2'): + os.utime('db/document/2/2/%s' % i, (2, 2)) + + in_seq = util.Sequence([[1, None]]) + self.assertEqual([ + {'document': 'document'}, + {'guid': '1', + 'diff': { + 'guid': {'value': '1', 'mtime': 1}, + 'mtime': {'value': 0, 'mtime': 1}, + 'ctime': {'value': 0, 'mtime': 1}, + 'prop': {'value': 'a', 'mtime': 1}, + }, + }, + {'guid': '2', + 'diff': { + 'guid': {'value': '2', 'mtime': 2}, + 'mtime': {'value': 0, 'mtime': 2}, + 'ctime': {'value': 0, 'mtime': 2}, + 'prop': {'value': 'b', 'mtime': 2}, + }, + }, + {'commit': [[1, 2]]}, + ], + [i for i in sync.diff(volume, in_seq)]) + self.assertEqual([[1, None]], in_seq) + + def test_diff_Partial(self): + + class Document(db.Document): + + @db.indexed_property(slot=1) + def prop(self, value): + return value + + volume = Volume('db', [Document]) + volume['document'].create(guid='1', seqno=1, prop='a') + for i in os.listdir('db/document/1/1'): + os.utime('db/document/1/1/%s' % i, (1, 1)) + volume['document'].create(guid='2', seqno=2, prop='b') + for i in os.listdir('db/document/2/2'): + os.utime('db/document/2/2/%s' % i, (2, 2)) + + in_seq = util.Sequence([[1, None]]) + diff = sync.diff(volume, in_seq) + self.assertEqual({'document': 'document'}, diff.send(None)) + self.assertEqual('1', diff.send(None)['guid']) + self.assertEqual({'commit': []}, diff.send(sync._EOF)) + try: + diff.send(None) + assert False + except StopIteration: + pass + + diff = sync.diff(volume, in_seq) + self.assertEqual({'document': 'document'}, diff.send(None)) + self.assertEqual('1', diff.send(None)['guid']) + self.assertEqual('2', diff.send(None)['guid']) + self.assertEqual({'commit': [[1, 1]]}, diff.send(sync._EOF)) + try: + diff.send(None) + assert False + except StopIteration: + pass + + def test_diff_Collapsed(self): + + class Document(db.Document): + + @db.indexed_property(slot=1) + def prop(self, value): + return value + + volume = Volume('db', [Document]) + volume['document'].create(guid='1', seqno=1, prop='a') + for i in os.listdir('db/document/1/1'): + os.utime('db/document/1/1/%s' % i, (1, 1)) + volume['document'].create(guid='3', seqno=3, prop='c') + for i in os.listdir('db/document/3/3'): + os.utime('db/document/3/3/%s' % i, (3, 3)) + volume['document'].create(guid='5', seqno=5, prop='f') + for i in os.listdir('db/document/5/5'): + os.utime('db/document/5/5/%s' % i, (5, 5)) + + in_seq = util.Sequence([[1, None]]) + diff = sync.diff(volume, in_seq) + self.assertEqual({'document': 'document'}, diff.send(None)) + self.assertEqual('1', diff.send(None)['guid']) + self.assertEqual('3', diff.send(None)['guid']) + self.assertEqual('5', diff.send(None)['guid']) + self.assertEqual({'commit': [[1, 1], [3, 3]]}, diff.send(sync._EOF)) + try: + diff.send(None) + assert False + except StopIteration: + pass + + diff = sync.diff(volume, in_seq) + self.assertEqual({'document': 'document'}, diff.send(None)) + self.assertEqual('1', diff.send(None)['guid']) + self.assertEqual('3', diff.send(None)['guid']) + self.assertEqual('5', diff.send(None)['guid']) + self.assertEqual({'commit': [[1, 5]]}, diff.send(None)) + try: + diff.send(None) + assert False + except StopIteration: + pass + + def test_diff_TheSameInSeqForAllDocuments(self): + + class Document1(db.Document): + pass + + class Document2(db.Document): + pass + + class Document3(db.Document): + pass + + volume = Volume('db', [Document1, Document2, Document3]) + volume['document1'].create(guid='3', seqno=3) + for i in os.listdir('db/document1/3/3'): + os.utime('db/document1/3/3/%s' % i, (3, 3)) + volume['document2'].create(guid='2', seqno=2) + for i in os.listdir('db/document2/2/2'): + os.utime('db/document2/2/2/%s' % i, (2, 2)) + volume['document3'].create(guid='1', seqno=1) + for i in os.listdir('db/document3/1/1'): + os.utime('db/document3/1/1/%s' % i, (1, 1)) + + in_seq = util.Sequence([[1, None]]) + diff = sync.diff(volume, in_seq) + self.assertEqual({'document': 'document1'}, diff.send(None)) + self.assertEqual('3', diff.send(None)['guid']) + self.assertEqual({'document': 'document2'}, diff.send(None)) + self.assertEqual('2', diff.send(None)['guid']) + self.assertEqual({'document': 'document3'}, diff.send(None)) + self.assertEqual('1', diff.send(None)['guid']) + self.assertEqual({'commit': [[1, 3]]}, diff.send(None)) + try: + diff.send(None) + assert False + except StopIteration: + pass + + def test_merge_Create(self): + + class Document1(db.Document): + + @db.indexed_property(slot=1) + def prop(self, value): + return value + + class Document2(db.Document): + pass + + self.touch(('db/seqno', '100')) + volume = Volume('db', [Document1, Document2]) + + records = [ + {'document': 'document1'}, + {'guid': '1', 'diff': { + 'guid': {'value': '1', 'mtime': 1.0}, + 'ctime': {'value': 2, 'mtime': 2.0}, + 'mtime': {'value': 3, 'mtime': 3.0}, + 'prop': {'value': '4', 'mtime': 4.0}, + }}, + {'document': 'document2'}, + {'guid': '5', 'diff': { + 'guid': {'value': '5', 'mtime': 5.0}, + 'ctime': {'value': 6, 'mtime': 6.0}, + 'mtime': {'value': 7, 'mtime': 7.0}, + }}, + {'commit': [[1, 2]]}, + ] + self.assertEqual(([[1, 2]], [[101, 102]]), sync.merge(volume, records)) + + self.assertEqual( + {'guid': '1', 'prop': '4', 'ctime': 2, 'mtime': 3}, + volume['document1'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) + self.assertEqual(1, os.stat('db/document1/1/1/guid').st_mtime) + self.assertEqual(2, os.stat('db/document1/1/1/ctime').st_mtime) + self.assertEqual(3, os.stat('db/document1/1/1/mtime').st_mtime) + self.assertEqual(4, os.stat('db/document1/1/1/prop').st_mtime) + + self.assertEqual( + {'guid': '5', 'ctime': 6, 'mtime': 7}, + volume['document2'].get('5').properties(['guid', 'ctime', 'mtime'])) + self.assertEqual(5, os.stat('db/document2/5/5/guid').st_mtime) + self.assertEqual(6, os.stat('db/document2/5/5/ctime').st_mtime) + self.assertEqual(7, os.stat('db/document2/5/5/mtime').st_mtime) + + def test_merge_Update(self): + + class Document(db.Document): + + @db.indexed_property(slot=1) + def prop(self, value): + return value + + self.touch(('db/seqno', '100')) + volume = Volume('db', [Document]) + volume['document'].create(guid='1', prop='1', ctime=1, mtime=1) + for i in os.listdir('db/document/1/1'): + os.utime('db/document/1/1/%s' % i, (2, 2)) + + records = [ + {'document': 'document'}, + {'guid': '1', 'diff': {'prop': {'value': '2', 'mtime': 1.0}}}, + {'commit': 1}, + ] + self.assertEqual((1, []), sync.merge(volume, records)) + self.assertEqual( + {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}, + volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) + self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime) + + records = [ + {'document': 'document'}, + {'guid': '1', 'diff': {'prop': {'value': '3', 'mtime': 2.0}}}, + {'commit': 2}, + ] + self.assertEqual((2, []), sync.merge(volume, records)) + self.assertEqual( + {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}, + volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) + self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime) + + records = [ + {'document': 'document'}, + {'guid': '1', 'diff': {'prop': {'value': '4', 'mtime': 3.0}}}, + {'commit': 3}, + ] + self.assertEqual((3, [[102, 102]]), sync.merge(volume, records)) + self.assertEqual( + {'guid': '1', 'prop': '4', 'ctime': 1, 'mtime': 1}, + volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) + self.assertEqual(3, os.stat('db/document/1/1/prop').st_mtime) + + def test_merge_StopOnCommit(self): + + class Document(db.Document): + pass + + self.touch(('db/seqno', '100')) + volume = Volume('db', [Document]) + + def generator(): + for i in [ + {'document': 'document'}, + {'guid': '1', 'diff': { + 'guid': {'value': '1', 'mtime': 1.0}, + 'ctime': {'value': 2, 'mtime': 2.0}, + 'mtime': {'value': 3, 'mtime': 3.0}, + 'prop': {'value': '4', 'mtime': 4.0}, + }}, + {'commit': [[1, 1]]}, + {'tail': True}, + ]: + yield i + + records = generator() + self.assertEqual(([[1, 1]], [[101, 101]]), sync.merge(volume, records)) + assert volume['document'].exists('1') + self.assertEqual({'tail': True}, next(records)) + + def test_decode(self): + stream = StringIO() + self.assertRaises(EOFError, sync.decode(stream).next) + + stream = StringIO() + pickle.dump({'foo': 'bar'}, stream) + stream.seek(0) + sinput = sync.decode(stream) + self.assertEqual({'foo': 'bar'}, sinput.next()) + self.assertRaises(EOFError, sinput.next) + + stream = StringIO() + pickle.dump({'commit': None}, stream) + stream.seek(0) + self.assertEqual([ + {'commit': None}, + ], [i for i in sync.decode(stream)]) + + stream = StringIO() + pickle.dump({}, stream) + pickle.dump({'commit': None}, stream) + stream.seek(0) + self.assertEqual([ + {}, + {'commit': None}, + ], [i for i in sync.decode(stream)]) + + def test_chunked_encode(self): + output = sync.chunked_encode(iter([])) + self.assertEqual('0\r\n\r\n', output.read(10)) + + data = [{'foo': 1}, {'bar': 2}, 3] + data_stream = '' + for record in data: + data_stream += pickle.dumps(record) + + output = sync.chunked_encode(iter(data)) + dump = StringIO() + while True: + chunk = output.read(1) + if not chunk: + break + dump.write(chunk) + self.assertEqual(data_stream, decode_chunked(dump.getvalue())) + + output = sync.chunked_encode(iter(data)) + dump = StringIO() + while True: + chunk = output.read(2) + if not chunk: + break + dump.write(chunk) + self.assertEqual(data_stream, decode_chunked(dump.getvalue())) + + output = sync.chunked_encode(iter(data)) + dump = StringIO() + while True: + chunk = output.read(1000) + if not chunk: + break + dump.write(chunk) + self.assertEqual(data_stream, decode_chunked(dump.getvalue())) + + +def decode_chunked(encdata): + offset = 0 + newdata = '' + while (encdata != ''): + off = int(encdata[:encdata.index("\r\n")],16) + if off == 0: + break + encdata = encdata[encdata.index("\r\n") + 2:] + newdata = "%s%s" % (newdata, encdata[:off]) + encdata = encdata[off+2:] + return newdata + + +if __name__ == '__main__': + tests.main() diff --git a/tests/units/node/sync_node.py b/tests/units/node/sync_node.py index 90e5fa5..6d53100 100755 --- a/tests/units/node/sync_node.py +++ b/tests/units/node/sync_node.py @@ -401,7 +401,7 @@ class SyncCommands(sync_node.SyncCommands): self.node_mount = self self.events = [] - def publish(self, event): + def broadcast(self, event): self.events.append(event) diff --git a/tests/units/node/sync_online.py b/tests/units/node/sync_online.py new file mode 100755 index 0000000..8b8f2e5 --- /dev/null +++ b/tests/units/node/sync_online.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# sugar-lint: disable + +from __init__ import tests + +from sugar_network import db +from sugar_network.client import Client, api_url +from sugar_network.node import sync +from sugar_network.node.master import MasterCommands +from sugar_network.node.slave import SlaveCommands +from sugar_network.resources.volume import Volume +from sugar_network.resources.user import User +from sugar_network.resources.feedback import Feedback +from sugar_network.toolkit.router import Request, Router +from sugar_network.toolkit import util, coroutine + + +class SyncOnlineTest(tests.Test): + + def setUp(self): + tests.Test.setUp(self) + + class Document(Feedback): + pass + + self.master_volume = Volume('master', [User, Document]) + self.master_server = coroutine.WSGIServer(('localhost', 9000), Router(MasterCommands(self.master_volume))) + coroutine.spawn(self.master_server.serve_forever) + coroutine.dispatch() + client = Client('http://localhost:9000') + client.get(cmd='whoami') + + api_url.value = 'http://localhost:9000' + self.slave_volume = Volume('slave', [User, Document]) + self.slave_server = coroutine.WSGIServer(('localhost', 9001), Router(SlaveCommands('slave', self.slave_volume))) + coroutine.spawn(self.slave_server.serve_forever) + coroutine.dispatch() + + def tearDown(self): + self.master_server.stop() + self.slave_server.stop() + tests.Test.tearDown(self) + + def test_sync_Creaqte(self): + client = Client('http://localhost:9001') + + guid1 = client.post(['document'], {'context': '', 'content': '1', 'title': '', 'type': 'idea'}) + guid2 = client.post(['document'], {'context': '', 'content': '2', 'title': '', 'type': 'idea'}) + + client.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'content': {'en-us': '1'}}, + {'guid': guid2, 'content': {'en-us': '2'}}, + ], + [i.properties(['guid', 'content']) for i in self.master_volume['document'].find()[0]]) + + guid3 = client.post(['document'], {'context': '', 'content': '3', 'title': '', 'type': 'idea'}) + client.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'content': {'en-us': '1'}}, + {'guid': guid2, 'content': {'en-us': '2'}}, + {'guid': guid3, 'content': {'en-us': '3'}}, + ], + [i.properties(['guid', 'content']) for i in self.master_volume['document'].find()[0]]) + + def test_sync_Update(self): + client = Client('http://localhost:9001') + + guid = client.post(['document'], {'context': '', 'content': '1', 'title': '', 'type': 'idea'}) + client.post(cmd='online_sync') + coroutine.sleep(1) + + client.put(['document', guid], {'content': '2'}) + client.post(cmd='online_sync') + self.assertEqual( + {'guid': guid, 'content': {'en-us': '2'}}, + self.master_volume['document'].get(guid).properties(['guid', 'content'])) + + def test_sync_Delete(self): + client = Client('http://localhost:9001') + + guid1 = client.post(['document'], {'context': '', 'content': '1', 'title': '', 'type': 'idea'}) + guid2 = client.post(['document'], {'context': '', 'content': '2', 'title': '', 'type': 'idea'}) + guid3 = client.post(['document'], {'context': '', 'content': '3', 'title': '', 'type': 'idea'}) + client.post(cmd='online_sync') + coroutine.sleep(1) + + client.delete(['document', guid2]) + client.post(cmd='online_sync') + self.assertEqual([ + {'guid': guid1, 'content': {'en-us': '1'}, 'layer': ['public']}, + {'guid': guid2, 'content': {'en-us': '2'}, 'layer': ['deleted']}, + {'guid': guid3, 'content': {'en-us': '3'}, 'layer': ['public']}, + ], + [i.properties(['guid', 'content', 'layer']) for i in self.master_volume['document'].find()[0]]) + + + +if __name__ == '__main__': + tests.main() diff --git a/tests/units/resources/volume.py b/tests/units/resources/volume.py index 35722a6..e3d599c 100755 --- a/tests/units/resources/volume.py +++ b/tests/units/resources/volume.py @@ -10,7 +10,6 @@ from os.path import exists from __init__ import tests from sugar_network import db, node -from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, DiskFull from sugar_network.resources.volume import Volume, Resource, Commands, VolumeCommands from sugar_network.resources.user import User from sugar_network.toolkit.router import Request @@ -19,405 +18,6 @@ from sugar_network.toolkit import coroutine, sugar, util class VolumeTest(tests.Test): - def test_diff(self): - - class Document(db.Document): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = Volume('db', [Document]) - volume['document'].create(guid='1', seqno=1, prop='a') - for i in os.listdir('db/document/1/1'): - os.utime('db/document/1/1/%s' % i, (1, 1)) - volume['document'].create(guid='2', seqno=2, prop='b') - for i in os.listdir('db/document/2/2'): - os.utime('db/document/2/2/%s' % i, (2, 2)) - - class Packet(list): - - def push(self, **kwargs): - self.append(kwargs) - return True - - packet = Packet() - in_seq = util.Sequence([[1, None]]) - volume.diff(in_seq, packet) - self.assertEqual([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'mtime': {'value': 0, 'mtime': 1.0}, - 'ctime': {'value': 0, 'mtime': 1.0}, - 'prop': {'value': 'a', 'mtime': 1.0}, - }, - }, - {'guid': '2', - 'diff': { - 'guid': {'value': '2', 'mtime': 2.0}, - 'mtime': {'value': 0, 'mtime': 2.0}, - 'ctime': {'value': 0, 'mtime': 2.0}, - 'prop': {'value': 'b', 'mtime': 2.0}, - }, - }, - {'commit': [[1, 2]]}, - ], - packet) - self.assertEqual([[3, None]], in_seq) - - def test_diff_Partial(self): - - class Document(db.Document): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = Volume('db', [Document]) - volume['document'].create(guid='1', seqno=1, prop='a') - for i in os.listdir('db/document/1/1'): - os.utime('db/document/1/1/%s' % i, (1, 1)) - volume['document'].create(guid='2', seqno=2, prop='b') - for i in os.listdir('db/document/2/2'): - os.utime('db/document/2/2/%s' % i, (2, 2)) - - class Packet(list): - - def push(self, **kwargs): - if kwargs.get('guid') == '1': - return False - self.append(kwargs) - return True - - packet = Packet() - in_seq = util.Sequence([[1, None]]) - volume.diff(in_seq, packet) - self.assertEqual([ - {'document': 'document'}, - {'commit': []}, - ], - packet) - self.assertEqual([[1, None]], in_seq) - - class Packet(list): - - def push(self, **kwargs): - if kwargs.get('guid') == '2': - return False - self.append(kwargs) - return True - - packet = Packet() - in_seq = util.Sequence([[1, None]]) - volume.diff(in_seq, packet) - self.assertEqual([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'mtime': {'value': 0, 'mtime': 1.0}, - 'ctime': {'value': 0, 'mtime': 1.0}, - 'prop': {'value': 'a', 'mtime': 1.0}, - }, - }, - {'commit': [[1, 1]]}, - ], - packet) - self.assertEqual([[2, None]], in_seq) - - def test_diff_Collapsed(self): - - class Document(db.Document): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = Volume('db', [Document]) - volume['document'].create(guid='1', seqno=1, prop='a') - for i in os.listdir('db/document/1/1'): - os.utime('db/document/1/1/%s' % i, (1, 1)) - volume['document'].create(guid='3', seqno=3, prop='c') - for i in os.listdir('db/document/3/3'): - os.utime('db/document/3/3/%s' % i, (3, 3)) - volume['document'].create(guid='5', seqno=5, prop='f') - for i in os.listdir('db/document/5/5'): - os.utime('db/document/5/5/%s' % i, (5, 5)) - - class Packet(list): - - def push(self, **kwargs): - if kwargs.get('guid') == '5': - return False - self.append(kwargs) - return True - - packet = Packet() - in_seq = util.Sequence([[1, None]]) - volume.diff(in_seq, packet) - self.assertEqual([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'mtime': {'value': 0, 'mtime': 1.0}, - 'ctime': {'value': 0, 'mtime': 1.0}, - 'prop': {'value': 'a', 'mtime': 1.0}, - }, - }, - {'guid': '3', - 'diff': { - 'guid': {'value': '3', 'mtime': 3.0}, - 'mtime': {'value': 0, 'mtime': 3.0}, - 'ctime': {'value': 0, 'mtime': 3.0}, - 'prop': {'value': 'c', 'mtime': 3.0}, - }, - }, - {'commit': [[1, 1], [3, 3]]}, - ], - packet) - self.assertEqual([[2, 2], [4, None]], in_seq) - - class Packet(list): - - def push(self, **kwargs): - self.append(kwargs) - return True - - packet = Packet() - in_seq = util.Sequence([[1, None]]) - volume.diff(in_seq, packet) - self.assertEqual([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'mtime': {'value': 0, 'mtime': 1.0}, - 'ctime': {'value': 0, 'mtime': 1.0}, - 'prop': {'value': 'a', 'mtime': 1.0}, - }, - }, - {'guid': '3', - 'diff': { - 'guid': {'value': '3', 'mtime': 3.0}, - 'mtime': {'value': 0, 'mtime': 3.0}, - 'ctime': {'value': 0, 'mtime': 3.0}, - 'prop': {'value': 'c', 'mtime': 3.0}, - }, - }, - {'guid': '5', - 'diff': { - 'guid': {'value': '5', 'mtime': 5.0}, - 'mtime': {'value': 0, 'mtime': 5.0}, - 'ctime': {'value': 0, 'mtime': 5.0}, - 'prop': {'value': 'f', 'mtime': 5.0}, - }, - }, - {'commit': [[1, 5]]}, - ], - packet) - self.assertEqual([[6, None]], in_seq) - - def test_diff_TheSameInSeqForAllDocuments(self): - - class Document1(db.Document): - pass - - class Document2(db.Document): - pass - - class Document3(db.Document): - pass - - volume = Volume('db', [Document1, Document2, Document3]) - volume['document1'].create(guid='3', seqno=3) - for i in os.listdir('db/document1/3/3'): - os.utime('db/document1/3/3/%s' % i, (3, 3)) - volume['document2'].create(guid='2', seqno=2) - for i in os.listdir('db/document2/2/2'): - os.utime('db/document2/2/2/%s' % i, (2, 2)) - volume['document3'].create(guid='1', seqno=1) - for i in os.listdir('db/document3/1/1'): - os.utime('db/document3/1/1/%s' % i, (1, 1)) - - class Packet(list): - - def push(self, **kwargs): - self.append(kwargs) - return True - - packet = Packet() - in_seq = util.Sequence([[1, None]]) - volume.diff(in_seq, packet) - self.assertEqual([ - {'document': 'document1'}, - {'guid': '3', - 'diff': { - 'guid': {'value': '3', 'mtime': 3.0}, - 'mtime': {'value': 0, 'mtime': 3.0}, - 'ctime': {'value': 0, 'mtime': 3.0}, - }, - }, - {'document': 'document2'}, - {'guid': '2', - 'diff': { - 'guid': {'value': '2', 'mtime': 2.0}, - 'mtime': {'value': 0, 'mtime': 2.0}, - 'ctime': {'value': 0, 'mtime': 2.0}, - }, - }, - {'document': 'document3'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'mtime': {'value': 0, 'mtime': 1.0}, - 'ctime': {'value': 0, 'mtime': 1.0}, - }, - }, - {'commit': [[1, 3]]}, - ], - packet) - self.assertEqual([[4, None]], in_seq) - - def test_merge_Create(self): - - class Document1(db.Document): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - class Document2(db.Document): - pass - - volume = Volume('db', [Document1, Document2]) - - self.assertEqual( - [[1, 2]], - volume.merge([ - {'document': 'document1'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'ctime': {'value': 2, 'mtime': 2.0}, - 'mtime': {'value': 3, 'mtime': 3.0}, - 'prop': {'value': '4', 'mtime': 4.0}, - }, - }, - {'document': 'document2'}, - {'guid': '5', - 'diff': { - 'guid': {'value': '5', 'mtime': 5.0}, - 'ctime': {'value': 6, 'mtime': 6.0}, - 'mtime': {'value': 7, 'mtime': 7.0}, - }, - }, - {'commit': [[1, 2]]}, - ])) - - self.assertEqual( - {'guid': '1', 'prop': '4', 'ctime': 2, 'mtime': 3}, - volume['document1'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(1, os.stat('db/document1/1/1/guid').st_mtime) - self.assertEqual(2, os.stat('db/document1/1/1/ctime').st_mtime) - self.assertEqual(3, os.stat('db/document1/1/1/mtime').st_mtime) - self.assertEqual(4, os.stat('db/document1/1/1/prop').st_mtime) - - self.assertEqual( - {'guid': '5', 'ctime': 6, 'mtime': 7}, - volume['document2'].get('5').properties(['guid', 'ctime', 'mtime'])) - self.assertEqual(5, os.stat('db/document2/5/5/guid').st_mtime) - self.assertEqual(6, os.stat('db/document2/5/5/ctime').st_mtime) - self.assertEqual(7, os.stat('db/document2/5/5/mtime').st_mtime) - - def test_merge_Update(self): - - class Document(db.Document): - - @db.indexed_property(slot=1) - def prop(self, value): - return value - - volume = Volume('db', [Document]) - volume['document'].create(guid='1', prop='1', ctime=1, mtime=1) - for i in os.listdir('db/document/1/1'): - os.utime('db/document/1/1/%s' % i, (2, 2)) - - self.assertEqual( - [], - volume.merge([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'prop': {'value': '2', 'mtime': 1.0}, - }, - }, - {'commit': []}, - ])) - self.assertEqual( - {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}, - volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime) - - self.assertEqual( - [], - volume.merge([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'prop': {'value': '3', 'mtime': 2.0}, - }, - }, - {'commit': []}, - ])) - self.assertEqual( - {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}, - volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime) - - self.assertEqual( - [], - volume.merge([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'prop': {'value': '4', 'mtime': 3.0}, - }, - }, - {'commit': []}, - ])) - self.assertEqual( - {'guid': '1', 'prop': '4', 'ctime': 1, 'mtime': 1}, - volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop'])) - self.assertEqual(3, os.stat('db/document/1/1/prop').st_mtime) - - def test_merge_StopOnCommit(self): - - class Document(db.Document): - pass - - volume = Volume('db', [Document]) - - diff = iter([ - {'document': 'document'}, - {'guid': '1', - 'diff': { - 'guid': {'value': '1', 'mtime': 1.0}, - 'ctime': {'value': 2, 'mtime': 2.0}, - 'mtime': {'value': 3, 'mtime': 3.0}, - 'prop': {'value': '4', 'mtime': 4.0}, - }, - }, - {'commit': [[1, 1]]}, - {'tail': True}, - ]) - - self.assertEqual([[1, 1]], volume.merge(diff)) - assert volume['document'].exists('1') - self.assertEqual([{'tail': True}], [i for i in diff]) - def test_SimulateDeleteEvents(self): class Document(Resource): @@ -904,15 +504,6 @@ class TestCommands(VolumeCommands, Commands): self.volume.connect(callback, condition) -def read_packet(packet): - result = [] - for i in InPacket(stream=packet.pop()): - if 'diff' in i: - i.pop('diff') - result.append(i) - return result - - def call(cp, principal=None, content=None, **kwargs): request = Request(**kwargs) request.principal = principal diff --git a/tests/units/toolkit/__main__.py b/tests/units/toolkit/__main__.py index 30a7b47..7a9587a 100644 --- a/tests/units/toolkit/__main__.py +++ b/tests/units/toolkit/__main__.py @@ -2,11 +2,9 @@ from __init__ import tests -#from files_sync import * from http import * from mountpoints import * from router import * -from sneakernet import * from util import * if __name__ == '__main__': diff --git a/tests/units/toolkit/util.py b/tests/units/toolkit/util.py index c5b1311..a25eea5 100755 --- a/tests/units/toolkit/util.py +++ b/tests/units/toolkit/util.py @@ -3,9 +3,11 @@ import copy from os.path import exists +from cStringIO import StringIO from __init__ import tests +from sugar_network.toolkit import util from sugar_network.toolkit.util import Seqno, Sequence @@ -337,6 +339,25 @@ class UtilTest(tests.Test): seq.include(10, 11) self.assertEqual(11, seq.last) + def test_readline(self): + + def readlines(string): + result = [] + stream = StringIO(string) + while True: + line = util.readline(stream) + if not line: + break + result.append(line) + return result + + self.assertEqual([], readlines('')) + self.assertEqual([' '], readlines(' ')) + self.assertEqual([' a '], readlines(' a ')) + self.assertEqual(['\n'], readlines('\n')) + self.assertEqual(['\n', 'b'], readlines('\nb')) + self.assertEqual([' \n', ' b \n'], readlines(' \n b \n')) + if __name__ == '__main__': tests.main() diff --git a/tests/units/zerosugar/injector.py b/tests/units/zerosugar/injector.py index f5299b6..0b61531 100755 --- a/tests/units/zerosugar/injector.py +++ b/tests/units/zerosugar/injector.py @@ -12,13 +12,12 @@ from os.path import exists, dirname from __init__ import tests -from sugar_network import zeroinstall from sugar_network.client import journal from sugar_network.toolkit import coroutine, enforce, pipe as pipe_, lsb_release from sugar_network.resources.user import User from sugar_network.resources.context import Context from sugar_network.resources.implementation import Implementation -from sugar_network.zerosugar import packagekit, injector, clones +from sugar_network.zerosugar import packagekit, injector, clones, solver from sugar_network.client import IPCClient from sugar_network import client as local @@ -395,7 +394,7 @@ class InjectorTest(tests.Test): def test_SolutionsCache_Set(self): solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}] - self.override(zeroinstall, 'solve', lambda *args: solution) + self.override(solver, 'solve', lambda *args: solution) self.assertEqual(solution, injector._solve('~', 'context')) self.assertEqual([local.api_url.value, solution], pickle.load(file('cache/solutions/~/co/context'))) @@ -408,7 +407,7 @@ class InjectorTest(tests.Test): def test_SolutionsCache_InvalidateByAPIUrl(self): solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}] - self.override(zeroinstall, 'solve', lambda *args: solution) + self.override(solver, 'solve', lambda *args: solution) cached_path = 'cache/solutions/~/co/context' solution2 = [{'name': 'name2', 'context': 'context2', 'id': 'id2', 'version': 'version2'}] @@ -422,7 +421,7 @@ class InjectorTest(tests.Test): def test_SolutionsCache_InvalidateByMtime(self): solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}] - self.override(zeroinstall, 'solve', lambda *args: solution) + self.override(solver, 'solve', lambda *args: solution) cached_path = 'cache/solutions/~/co/context' solution2 = [{'name': 'name2', 'context': 'context2', 'id': 'id2', 'version': 'version2'}] @@ -442,7 +441,7 @@ class InjectorTest(tests.Test): def test_SolutionsCache_InvalidateByPMSMtime(self): solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}] - self.override(zeroinstall, 'solve', lambda *args: solution) + self.override(solver, 'solve', lambda *args: solution) cached_path = 'cache/solutions/~/co/context' injector._pms_path = 'pms' @@ -464,7 +463,7 @@ class InjectorTest(tests.Test): def test_SolutionsCache_InvalidateBySpecMtime(self): solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}] - self.override(zeroinstall, 'solve', lambda *args: solution) + self.override(solver, 'solve', lambda *args: solution) cached_path = 'cache/solutions/~/co/context' solution2 = [{'spec': 'spec', 'name': 'name2', 'context': 'context2', 'id': 'id2', 'version': 'version2'}] @@ -630,7 +629,7 @@ class InjectorTest(tests.Test): {'version': '1', 'id': 'dep3', 'context': 'dep3', 'name': 'title3'}, {'name': 'title', 'version': '1', 'command': ['echo'], 'context': context, 'mountpoint': '/', 'id': impl}, ]), - sorted(zeroinstall.solve('/', context))) + sorted(solver.solve('/', context))) def test_NoDepsClonning(self): self.touch('remote/master') @@ -663,12 +662,12 @@ class InjectorTest(tests.Test): }, }}) - self.assertRaises(RuntimeError, zeroinstall.solve, '/', context) + self.assertRaises(RuntimeError, solver.solve, '/', context) - zeroinstall.nodeps = True + solver.nodeps = True self.assertEqual( [{'name': 'title', 'version': '1', 'command': ['echo'], 'context': context, 'mountpoint': '/', 'id': impl}], - zeroinstall.solve('/', context)) + solver.solve('/', context)) def test_LoadFeed_SetPackages(self): self.touch('remote/master') @@ -711,7 +710,7 @@ class InjectorTest(tests.Test): return dict([(i, {'name': i, 'pk_id': i, 'version': '1', 'arch': '*', 'installed': True}) for i in names]) self.override(packagekit, 'resolve', resolve) - self.assertRaises(RuntimeError, zeroinstall.solve, '/', context) + self.assertRaises(RuntimeError, solver.solve, '/', context) remote.put(['context', 'dep', 'packages'], { lsb_release.distributor_id() + '-' + lsb_release.release(): { @@ -719,7 +718,7 @@ class InjectorTest(tests.Test): 'binary': ['bin'], }, }) - self.assertEqual('dep', zeroinstall.solve('/', context)[-1]['context']) + self.assertEqual('dep', solver.solve('/', context)[-1]['context']) remote.put(['context', 'dep', 'packages'], { 'foo': { @@ -727,14 +726,14 @@ class InjectorTest(tests.Test): 'binary': ['bin'], }, }) - self.assertRaises(RuntimeError, zeroinstall.solve, '/', context) + self.assertRaises(RuntimeError, solver.solve, '/', context) remote.put(['context', 'dep', 'packages'], { lsb_release.distributor_id(): { 'binary': ['bin'], }, }) - self.assertEqual('dep', zeroinstall.solve('/', context)[-1]['context']) + self.assertEqual('dep', solver.solve('/', context)[-1]['context']) def test_SolveSugar(self): self.touch(('__init__.py', '')) @@ -782,7 +781,7 @@ class InjectorTest(tests.Test): {'name': 'title', 'version': '1', 'command': ['echo'], 'context': context, 'mountpoint': '/', 'id': impl}, {'name': 'sugar', 'version': '777', 'context': 'sugar', 'path': '/', 'mountpoint': None, 'id': 'sugar'}, ], - zeroinstall.solve('/', context)) + solver.solve('/', context)) if __name__ == '__main__': |