Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary refs log tree commit diff stats
diff options
context:
space:
mode:
author	Aleksey Lim <alsroot@sugarlabs.org>	2013-02-19 23:39:44 (GMT)
committer	Aleksey Lim <alsroot@sugarlabs.org>	2013-02-19 23:39:44 (GMT)
commit	d6ef221dd9dc627d55997cb00c2d23400ebebc03 (patch)
tree	0b8c805c0d58eae4e5b94d8d85f9ba6baba9575c
parent	28676beb8c9ebcb50679dcd52ae3d35b036e6bd0 (diff)
Online sync of SN data
-rw-r--r--	TODO	1
-rwxr-xr-x	sugar-network-client	4
-rwxr-xr-x	sugar-network-node	27
-rw-r--r--	sugar_network/client/mounts.py	10
-rw-r--r--	sugar_network/client/mountset.py	21
-rw-r--r--	sugar_network/node/__init__.py	11
-rw-r--r--	sugar_network/node/commands.py	40
-rw-r--r--	sugar_network/node/files_sync.py	2
-rw-r--r--	sugar_network/node/master.py	42
-rw-r--r--	sugar_network/node/obs.py	1
-rw-r--r--	sugar_network/node/slave.py	245
-rw-r--r--	sugar_network/node/sneakernet.py (renamed from sugar_network/toolkit/sneakernet.py)	0
-rw-r--r--	sugar_network/node/stats.py	3
-rw-r--r--	sugar_network/node/sync.py	118
-rw-r--r--	sugar_network/node/sync_master.py	285
-rw-r--r--	sugar_network/node/sync_node.py	223
-rw-r--r--	sugar_network/resources/volume.py	40
-rw-r--r--	sugar_network/toolkit/http.py	28
l---------	sugar_network/toolkit/pylru.py	2
-rw-r--r--	sugar_network/toolkit/router.py	6
-rw-r--r--	sugar_network/toolkit/util.py	12
-rw-r--r--	sugar_network/zerosugar/cache.py	1
-rw-r--r--	sugar_network/zerosugar/injector.py	6
-rw-r--r--	sugar_network/zerosugar/packagekit.py	7
-rw-r--r--	sugar_network/zerosugar/solver.py	9
-rw-r--r--	sweets.recipe	7
-rw-r--r--	tests/__init__.py	17
-rwxr-xr-x	tests/integration/cli.py	2
-rwxr-xr-x	tests/units/client/mountset.py	6
-rwxr-xr-x	tests/units/client/remote_mount.py	4
-rw-r--r--	tests/units/node/__main__.py	6
-rwxr-xr-x	tests/units/node/node.py	25
-rwxr-xr-x	tests/units/node/sneakernet.py (renamed from tests/units/toolkit/sneakernet.py)	4
-rwxr-xr-x	tests/units/node/sync.py	379
-rwxr-xr-x	tests/units/node/sync_node.py	2
-rwxr-xr-x	tests/units/node/sync_online.py	100
-rwxr-xr-x	tests/units/resources/volume.py	409
-rw-r--r--	tests/units/toolkit/__main__.py	2
-rwxr-xr-x	tests/units/toolkit/util.py	21
-rwxr-xr-x	tests/units/zerosugar/injector.py	31
40 files changed, 1040 insertions, 1119 deletions
diff --git a/TODO b/TODO
index 1d9458a..34156c4 100644
--- a/TODO
+++ b/TODO
@@ -19,3 +19,4 @@
- unstall activities on checking out and on initial syncing
- "Cannot find implementation for" error if there is no required sugar
- trace says that current sugar version is (ok)
+- increase granularity for sync.chunked_encode()
diff --git a/sugar-network-client b/sugar-network-client
index 55fcb95..6cc1689 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -32,7 +32,7 @@ from sugar_network.toolkit.router import IPCRouter
from sugar_network.client.mounts import HomeMount, RemoteMount
from sugar_network.client.mountset import Mountset
from sugar_network.zerosugar import clones
-from sugar_network.node import stats
+from sugar_network.node import stats, slave
from sugar_network.resources.volume import Volume
from sugar_network.toolkit import Option
from sugar_network.toolkit import util, printf, application, coroutine, enforce
@@ -202,7 +202,7 @@ Option.seek('main', application)
Option.seek('webui', webui)
Option.seek('client', client)
Option.seek('client', [sugar.keyfile, toolkit.tmpdir])
-Option.seek('node', [node.port, node.sync_dirs])
+Option.seek('node', [node.port, slave.sync_dirs])
Option.seek('stats', stats)
Option.seek('db', db)
diff --git a/sugar-network-node b/sugar-network-node
index 314e750..7d71739 100755
--- a/sugar-network-node
+++ b/sugar-network-node
@@ -1,6 +1,6 @@
#!/usr/bin/env python
-# Copyright (C) 2012 Aleksey Lim
+# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -18,7 +18,7 @@
import os
import locale
import logging
-from os.path import exists
+from os.path import exists, join
from gevent import monkey
@@ -28,7 +28,8 @@ from sugar_network.client.mounts import LocalMount
from sugar_network.client.mountset import Mountset
from sugar_network.client.mounts import LocalMount
from sugar_network.node import stats, obs
-from sugar_network.node.commands import NodeCommands
+from sugar_network.node.master import MasterCommands
+from sugar_network.node.slave import SlaveCommands
from sugar_network.resources.volume import Volume
from sugar_network.toolkit.router import Router
from sugar_network.toolkit import sugar, coroutine, application, util, Option
@@ -57,7 +58,25 @@ class Application(application.Daemon):
if stats.stats_node_step.value:
node_stats = stats.NodeStats(volume)
self.jobs.spawn(self._commit_stats, node_stats)
- cp = NodeCommands(volume, node_stats)
+
+ node_path = join(node.data_root.value, 'node')
+ master_path = join(node.data_root.value, 'master')
+ if exists(node_path):
+ with file(node_path) as f:
+ guid = f.read().strip()
+ logging.info('Start %s node in slave mode', guid)
+ cp = SlaveCommands(guid, volume, node_stats)
+ elif exists(master_path):
+ with file(master_path) as f:
+ guid = f.read().strip()
+ logging.info('Start %s node in master mode', guid)
+ cp = MasterCommands(guid, volume, node_stats)
+ else:
+ guid = db.uuid()
+ with file(node_path, 'w') as f:
+ f.write(guid)
+ logging.info('Start new %s node in slave mode', guid)
+ cp = SlaveCommands(guid, volume, node_stats)
logging.info('Listening for requests on %s:%s',
node.host.value, node.port.value)
diff --git a/sugar_network/client/mounts.py b/sugar_network/client/mounts.py
index 4118997..1c80183 100644
--- a/sugar_network/client/mounts.py
+++ b/sugar_network/client/mounts.py
@@ -59,14 +59,14 @@ class _Mount(object):
self.mounted.set()
else:
self.mounted.clear()
- self.publish({
+ self.broadcast({
'event': 'mount' if value else 'unmount',
'mountpoint': self.mountpoint,
'name': self.name,
'private': self.private,
})
- def publish(self, event):
+ def broadcast(self, event):
if self.publisher is not None:
# pylint: disable-msg=E1102
self.publisher(event)
@@ -98,7 +98,7 @@ class LocalMount(VolumeCommands, _Mount):
def _events_cb(self, event):
event['mountpoint'] = self.mountpoint
- self.publish(event)
+ self.broadcast(event)
class HomeMount(LocalMount):
@@ -121,7 +121,7 @@ class HomeMount(LocalMount):
if props and set(props.keys()) & _LOCAL_PROPS:
# _LOCAL_PROPS are common for `~` and `/` mountpoints
event['mountpoint'] = '/'
- self.publish(event)
+ self.broadcast(event)
LocalMount._events_cb(self, event)
@@ -298,7 +298,7 @@ class RemoteMount(db.CommandsProcessor, _Mount, _ProxyCommands):
if mtime:
injector.invalidate_solutions(mtime)
event['mountpoint'] = self.mountpoint
- self.publish(event)
+ self.broadcast(event)
except Exception:
exception(_logger, 'Failed to dispatch remote event')
finally:
diff --git a/sugar_network/client/mountset.py b/sugar_network/client/mountset.py
index 058fc70..b7b0d80 100644
--- a/sugar_network/client/mountset.py
+++ b/sugar_network/client/mountset.py
@@ -24,7 +24,6 @@ from sugar_network.toolkit import coroutine, util, exception, enforce
from sugar_network.client import journal, zeroconf
from sugar_network.client.mounts import LocalMount, NodeMount
from sugar_network.node.commands import NodeCommands
-from sugar_network.node.sync_node import SyncCommands
from sugar_network.zerosugar import clones, injector
from sugar_network.resources.volume import Volume, Commands
@@ -34,18 +33,17 @@ _DB_DIRNAME = '.sugar-network'
_logger = logging.getLogger('client.mountset')
-class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
- SyncCommands):
+class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands):
def __init__(self, home_volume):
self.opened = coroutine.Event()
self._subscriptions = {}
self._jobs = coroutine.Pool()
self._servers = coroutine.Pool()
+ self.node_mount = None
dict.__init__(self)
db.CommandsProcessor.__init__(self)
- SyncCommands.__init__(self, client.path('sync'))
Commands.__init__(self)
if not client.no_dbus.value:
journal.Commands.__init__(self)
@@ -58,7 +56,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
def __setitem__(self, mountpoint, mount):
dict.__setitem__(self, mountpoint, mount)
mount.mountpoint = mountpoint
- mount.publisher = self.publish
+ mount.publisher = self.broadcast
mount.set_mounted(True)
def __delitem__(self, mountpoint):
@@ -116,8 +114,8 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
mount.set_mounted(True)
return mount.mounted.is_set()
- @db.volume_command(method='POST', cmd='publish')
- def publish(self, event, request=None):
+ @db.volume_command(method='POST', cmd='broadcast')
+ def broadcast(self, event, request=None):
if request is not None:
event = request.content
@@ -137,7 +135,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
for event in injector.make(mountpoint, guid):
event['event'] = 'make'
- self.publish(event)
+ self.broadcast(event)
@db.document_command(method='GET', cmd='launch',
arguments={'args': db.to_list})
@@ -150,7 +148,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
activity_id=activity_id, object_id=object_id, uri=uri,
color=color):
event['event'] = 'launch'
- self.publish(event)
+ self.broadcast(event)
if no_spawn:
do_launch()
@@ -262,7 +260,6 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
self.opened.set()
def close(self):
- self.break_sync()
self._servers.kill()
self._jobs.kill()
for mountpoint in self.keys():
@@ -336,7 +333,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
if value:
if force or not journal.exists(uid):
self.journal_update(uid, **get_props())
- self.publish({'event': 'show_journal', 'uid': uid})
+ self.broadcast({'event': 'show_journal', 'uid': uid})
else:
if journal.exists(uid):
self.journal_delete(uid)
@@ -392,7 +389,7 @@ class Mountset(dict, db.CommandsProcessor, Commands, journal.Commands,
for event in pipe:
event['event'] = 'clone'
- self.publish(event)
+ self.broadcast(event)
for __ in clones.walk(guid):
break
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index 6a5ece5..eb564dd 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -47,17 +47,6 @@ find_limit = Option(
'limit the resulting list for search requests',
default=32, type_cast=int)
-sync_dirs = Option(
- 'colon separated list of paths to directories to synchronize with '
- 'master server',
- type_cast=Option.paths_cast, type_repr=Option.paths_repr,
- name='sync-dirs')
-
-pull_timeout = Option(
- 'delay in seconds to return to sync-pull requester to wait until '
- 'pull request will be ready',
- default=30, type_cast=int)
-
static_url = Option(
'url prefix to use for static files that should be served via API '
'server; if omited, HTTP_HOST request value will be used')
diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py
index ee97a97..9c6ea5c 100644
--- a/sugar_network/node/commands.py
+++ b/sugar_network/node/commands.py
@@ -24,7 +24,6 @@ from sugar_network.resources.volume import Commands, VolumeCommands
from sugar_network.toolkit import router, util, exception, enforce
-_DEFAULT_MASTER_GUID = 'api-testing.network.sugarlabs.org'
_MAX_STATS_LENGTH = 100
_logger = logging.getLogger('node.commands')
@@ -32,30 +31,16 @@ _logger = logging.getLogger('node.commands')
class NodeCommands(VolumeCommands, Commands):
- def __init__(self, volume, stats=None):
+ def __init__(self, is_master, guid, volume, stats=None):
VolumeCommands.__init__(self, volume)
Commands.__init__(self)
- self._is_master = False
+ self._is_master = is_master
+ self._guid = guid
self._stats = stats
- node_path = join(volume.root, 'node')
- master_path = join(volume.root, 'master')
-
- if exists(node_path):
- with file(node_path) as f:
- self._guid = f.read().strip()
- elif exists(master_path):
- with file(master_path) as f:
- self._guid = f.read().strip()
- self._is_master = True
- else:
- self._guid = db.uuid()
- with file(node_path, 'w') as f:
- f.write(self._guid)
-
- if not self._is_master and not exists(master_path):
- with file(master_path, 'w') as f:
- f.write(_DEFAULT_MASTER_GUID)
+ @property
+ def guid(self):
+ return self._guid
@property
def is_master(self):
@@ -141,7 +126,7 @@ class NodeCommands(VolumeCommands, Commands):
permissions=db.ACCESS_AUTH | db.ACCESS_AUTHOR)
def delete(self, document, guid):
# Servers data should not be deleted immediately
- # to let master-node synchronization possible
+ # to make master-slave synchronization possible
directory = self.volume[document]
directory.update(guid, {'layer': ['deleted']})
@@ -165,13 +150,6 @@ class NodeCommands(VolumeCommands, Commands):
layer = list(set(doc['layer']) - set(request.content))
directory.update(guid, {'layer': layer})
- @db.document_command(method='PUT', cmd='merge',
- permissions=db.ACCESS_AUTH)
- def merge(self, document, guid, request):
- auth.validate(request, 'root')
- directory = self.volume[document]
- directory.merge(guid, request.content)
-
@db.volume_command(method='GET', cmd='whoami',
mime_type='application/json')
def whoami(self, request):
@@ -194,6 +172,10 @@ class NodeCommands(VolumeCommands, Commands):
guid=impls[0]['guid'], prop='data')
return self.call(request, db.Response())
+ def broadcast(self, event):
+ # TODO Node level broadcast events?
+ _logger.info('Publish event: %r', event)
+
def call(self, request, response=None):
try:
result = VolumeCommands.call(self, request, response)
diff --git a/sugar_network/node/files_sync.py b/sugar_network/node/files_sync.py
index 022a2f7..5ce9258 100644
--- a/sugar_network/node/files_sync.py
+++ b/sugar_network/node/files_sync.py
@@ -19,7 +19,7 @@ import logging
from bisect import bisect_left
from os.path import join, exists, relpath, lexists, basename, dirname
-from sugar_network.toolkit.sneakernet import DiskFull
+from sugar_network.node.sneakernet import DiskFull
from sugar_network.toolkit import BUFFER_SIZE, util, coroutine
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
new file mode 100644
index 0000000..8f4d884
--- /dev/null
+++ b/sugar_network/node/master.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2013 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+from urlparse import urlsplit
+
+from sugar_network import db, client
+from sugar_network.node import sync
+from sugar_network.node.commands import NodeCommands
+from sugar_network.toolkit import util
+
+
+_logger = logging.getLogger('node.master')
+
+
+class MasterCommands(NodeCommands):
+
+ def __init__(self, volume):
+ guid = urlsplit(client.api_url.value).netloc
+ NodeCommands.__init__(self, True, guid, volume)
+
+ @db.volume_command(method='POST', cmd='sync',
+ permissions=db.ACCESS_AUTH)
+ def sync(self, request):
+ content = sync.decode(request.content_stream)
+ pull_seq = util.Sequence(next(content)['pull'])
+ push_seq, merged_seq = sync.merge(self.volume, content)
+ return sync.encode(
+ [{'ack': merged_seq, 'sequence': push_seq}],
+ sync.diff(self.volume, pull_seq))
diff --git a/sugar_network/node/obs.py b/sugar_network/node/obs.py
index de36daa..49a03aa 100644
--- a/sugar_network/node/obs.py
+++ b/sugar_network/node/obs.py
@@ -95,7 +95,6 @@ def _request(*args, **kwargs):
response = _client.request(*args, allowed=(400, 404), **kwargs)
enforce(response.headers.get('Content-Type') == 'text/xml',
'Irregular OBS response')
- # pylint: disable-msg=E1103
reply = ElementTree.parse(response.raw).getroot()
if response.status_code != 200:
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
new file mode 100644
index 0000000..3a9386a
--- /dev/null
+++ b/sugar_network/node/slave.py
@@ -0,0 +1,245 @@
+# Copyright (C) 2012-2013 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+from os.path import join
+
+from sugar_network import db
+from sugar_network.client import Client
+from sugar_network.node import sync
+from sugar_network.node.commands import NodeCommands
+from sugar_network.toolkit import Option, util
+
+
+_SYNC_DIRNAME = '.sugar-network-sync'
+
+_logger = logging.getLogger('node.slave')
+
+
+sync_dirs = Option(
+ 'colon separated list of paths to directories to synchronize with '
+ 'master server',
+ type_cast=Option.paths_cast, type_repr=Option.paths_repr,
+ name='sync_dirs')
+
+
+class SlaveCommands(NodeCommands):
+
+ def __init__(self, guid, volume, stats=None):
+ NodeCommands.__init__(self, False, guid, volume, stats)
+
+ self._push_seq = util.PersistentSequence(
+ join(volume.root, 'push'), [1, None])
+ self._pull_seq = util.PersistentSequence(
+ join(volume.root, 'pull'), [1, None])
+
+ @db.volume_command(method='POST', cmd='online_sync',
+ permissions=db.ACCESS_LOCAL)
+ def online_sync(self):
+ response = Client().request('POST',
+ data=sync.chunked_encode(
+ [{'pull': self._pull_seq}],
+ sync.diff(self.volume, self._push_seq)),
+ params={'cmd': 'sync'},
+ headers={'Transfer-Encoding': 'chunked'})
+ reply = sync.decode(response.raw)
+ ack = next(reply)
+ _logger.error('>>> %r', ack)
+ self._pull_seq.exclude(ack['ack'])
+ self._pull_seq.commit()
+ self._push_seq.exclude(ack['sequence'])
+ self._push_seq.commit()
+ sync.merge(self.volume, reply, increment_seqno=False)
+
+
+"""
+class SlaveCommands(NodeCommands):
+
+ def __init__(self, guid, volume, stats=None):
+ NodeCommands.__init__(self, False, guid, volume, stats)
+
+ self._jobs = coroutine.Pool()
+ self._mounts = util.MutableStack()
+ self._offline_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
+ self._file_syncs = \
+ files_sync.Leechers(sync_dirs.value, volume.root)
+ self._offline_session = None
+
+ mountpoints.connect(_SYNC_DIRNAME,
+ self.__found_mountcb, self.__lost_mount_cb)
+
+ @db.volume_command(method='POST', cmd='start_offline_sync')
+ def start_offline_sync(self, rewind=False, path=None):
+ if self._jobs:
+ return
+ enforce(path or self._mounts, 'No mounts to synchronize with')
+ if rewind:
+ self._mounts.rewind()
+ self._jobs.spawn(self._offline_sync, path)
+
+ @db.volume_command(method='POST', cmd='break_offline_sync')
+ def break_offline_sync(self):
+ self._jobs.kill()
+
+ def _offline_sync(self, path=None):
+ _logger.debug('Start synchronization session with %r session '
+ 'for %r mounts', self._offline_session, self._mounts)
+
+ def sync(path):
+ self.broadcast({'event': 'sync_start', 'path': path})
+ self._offline_session = self._offline_sync_session(path,
+ **(self._offline_session or {}))
+ return self._offline_session is None
+
+ try:
+ while True:
+ if path and sync(path):
+ break
+ for mountpoint in self._mounts:
+ if sync(mountpoint):
+ break
+ break
+ except Exception, error:
+ util.exception(_logger, 'Failed to complete synchronization')
+ self.broadcast({'event': 'sync_error', 'error': str(error)})
+ self._offline_session = None
+
+ if self._offline_session is None:
+ _logger.debug('Synchronization completed')
+ self.broadcast({'event': 'sync_complete'})
+ else:
+ _logger.debug('Postpone synchronization with %r session',
+ self._offline_session)
+ self.broadcast({'event': 'sync_continue'})
+
+
+
+
+
+
+ def _offline_sync_session(self, path, accept_length=None,
+ diff_sequence=None, stats_sequence=None, session=None):
+ to_push_seq = util.Sequence(empty_value=[1, None])
+ if diff_sequence is None:
+ to_push_seq.include(self._push_seq)
+ else:
+ to_push_seq = util.Sequence(diff_sequence)
+
+ if stats_sequence is None:
+ stats_sequence = {}
+
+ if session is None:
+ session_is_new = True
+ session = util.uuid()
+ else:
+ session_is_new = False
+
+ while True:
+ for packet in sneakernet.walk(path):
+ if packet.header.get('src') == self.guid:
+ if packet.header.get('session') == session:
+ _logger.debug('Keep current session %r packet', packet)
+ else:
+ _logger.debug('Remove our previous %r packet', packet)
+ os.unlink(packet.path)
+ else:
+ self._import(packet, to_push_seq)
+ self._push_seq.commit()
+ self._pull_seq.commit()
+
+ if exists(self._offline_script):
+ shutil.copy(self._offline_script, path)
+
+ with OutFilePacket(path, limit=accept_length,
+ src=self.guid, dst=api_url.value,
+ session=session, seqno=self.volume.seqno.value,
+ api_url=client.api_url.value) as packet:
+
+
+
+
+
+
+
+ def _export(self, packet):
+ if session_is_new:
+ for directory, sync in self._file_syncs.items():
+ packet.push(cmd='files_pull', directory=directory,
+ sequence=sync.sequence)
+ packet.push(cmd='sn_pull', sequence=self._pull_seq)
+
+ _logger.debug('Generating %r PUSH packet to %r', packet, packet.path)
+ self.broadcast({
+ 'event': 'sync_progress',
+ 'progress': _('Generating %r packet') % packet.basename,
+ })
+
+ try:
+ self.volume.diff(to_push_seq, packet)
+ stats.pull(stats_sequence, packet)
+ except DiskFull:
+ return {'diff_sequence': to_push_seq,
+ 'stats_sequence': stats_sequence,
+ 'session': session,
+ }
+ else:
+ break
+
+
+
+
+ def _import(self, packet, to_push_seq):
+ self.broadcast({
+ 'event': 'sync_progress',
+ 'progress': _('Reading %r packet') % basename(packet.path),
+ })
+ _logger.debug('Processing %r PUSH packet from %r', packet, packet.path)
+
+ from_master = (packet.header.get('src') == self._master_guid)
+
+ for record in packet.records():
+ cmd = record.get('cmd')
+ if cmd == 'sn_push':
+ self.volume.merge(record, increment_seqno=False)
+ elif from_master:
+ if cmd == 'sn_commit':
+ _logger.debug('Processing %r COMMIT from %r',
+ record, packet)
+ self._pull_seq.exclude(record['sequence'])
+ elif cmd == 'sn_ack' and \
+ record['dst'] == self.guid:
+ _logger.debug('Processing %r ACK from %r', record, packet)
+ self._push_seq.exclude(record['sequence'])
+ self._pull_seq.exclude(record['merged'])
+ to_push_seq.exclude(record['sequence'])
+ self.volume.seqno.next()
+ self.volume.seqno.commit()
+ elif cmd == 'stats_ack' and record['dst'] == self.guid:
+ _logger.debug('Processing %r stats ACK from %r',
+ record, packet)
+ stats.commit(record['sequence'])
+ elif record.get('directory') in self._file_syncs:
+ self._file_syncs[record['directory']].push(record)
+
+ def __found_mountcb(self, path):
+ self._mounts.add(path)
+ _logger.debug('Found %r sync mount', path)
+ self.start_offline_sync()
+
+ def __lost_mount_cb(self, path):
+ self._mounts.remove(path)
+ if not self._mounts:
+ self.break_offline_sync()
+"""
diff --git a/sugar_network/toolkit/sneakernet.py b/sugar_network/node/sneakernet.py
index 3cd6601..3cd6601 100644
--- a/sugar_network/toolkit/sneakernet.py
+++ b/sugar_network/node/sneakernet.py
diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py
index 7be6730..277700f 100644
--- a/sugar_network/node/stats.py
+++ b/sugar_network/node/stats.py
@@ -17,9 +17,8 @@ import os
import logging
from os.path import join, exists, isdir
-from sugar_network import pylru
from sugar_network.toolkit.rrd import Rrd
-from sugar_network.toolkit import Option, util
+from sugar_network.toolkit import Option, util, pylru
stats_root = Option(
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
new file mode 100644
index 0000000..aa68f6b
--- /dev/null
+++ b/sugar_network/node/sync.py
@@ -0,0 +1,118 @@
+# Copyright (C) 2012-2013 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import logging
+import cPickle as pickle
+
+from sugar_network.toolkit import util, coroutine, enforce
+
+
+_EOF = object()
+
+_logger = logging.getLogger('node.sync')
+
+
+def decode(stream):
+ if not hasattr(stream, 'readline'):
+ stream.readline = lambda: util.readline(stream)
+ record = {}
+ while 'commit' not in record:
+ record = pickle.load(stream)
+ yield record
+
+
+def encode(*args):
+ for sequence in args:
+ for record in sequence:
+ yield pickle.dumps(record)
+
+
+def chunked_encode(*args):
+ return _ContentOutput(encode(*args))
+
+
+def diff(volume, in_seq):
+ out_seq = util.Sequence([])
+ try:
+ for document, directory in volume.items():
+ coroutine.dispatch()
+ directory.commit()
+ yield {'document': document}
+
+ for guid, patch in directory.diff(in_seq, out_seq):
+ coroutine.dispatch()
+ if (yield {'diff': patch, 'guid': guid}) is _EOF:
+ raise StopIteration()
+ if out_seq:
+ # We processed all documents till `out_seq.last`, thus,
+ # it is possible to collapse the sequence to avoid possible holes
+ out_seq = [[out_seq.first, out_seq.last]]
+ finally:
+ yield {'commit': out_seq}
+
+
+def merge(volume, records, increment_seqno=True):
+ directory = None
+ merged_seq = util.Sequence()
+
+ for record in records:
+ document = record.get('document')
+ if document is not None:
+ directory = volume[document]
+ continue
+
+ patch = record.get('diff')
+ if patch is not None:
+ enforce(directory is not None,
+ 'Invalid merge, no document')
+ seqno = directory.merge(record['guid'], patch, increment_seqno)
+ if seqno is not None:
+ merged_seq.include(seqno, seqno)
+ continue
+
+ commit = record.get('commit')
+ if commit is not None:
+ return commit, merged_seq
+
+
+class _ContentOutput(object):
+
+ def __init__(self, iterator):
+ self._iterator = iterator
+ self._buffer = ''
+ self._buffer_start = 0
+ self._buffer_end = 0
+
+ def read(self, size):
+ if self._iterator is None:
+ return ''
+
+ def buffer_read():
+ result = self._buffer[self._buffer_start:self._buffer_start + size]
+ self._buffer_start += size
+ return '%X\r\n%s\r\n' % (len(result), result)
+
+ if self._buffer_start < self._buffer_end:
+ return buffer_read()
+
+ try:
+ self._buffer = next(self._iterator)
+ except StopIteration:
+ self._iterator = None
+ return '0\r\n\r\n'
+
+ self._buffer_start = 0
+ self._buffer_end = len(self._buffer)
+ return buffer_read()
diff --git a/sugar_network/node/sync_master.py b/sugar_network/node/sync_master.py
deleted file mode 100644
index 7520d6d..0000000
--- a/sugar_network/node/sync_master.py
+++ /dev/null
@@ -1,285 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import json
-import base64
-import hashlib
-import logging
-from Cookie import SimpleCookie
-from os.path import exists, join
-
-from sugar_network import db, node, pylru
-from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, \
- OutPacket, DiskFull
-from sugar_network.toolkit.files_sync import Seeders
-from sugar_network.toolkit.util import Sequence
-from sugar_network.node import stats
-from sugar_network.toolkit import tmpdir, coroutine, exception, enforce
-
-
-_PULL_QUEUE_SIZE = 256
-
-_logger = logging.getLogger('node.sync_master')
-
-
-class SyncCommands(object):
-
- _guid = None
- volume = None
-
- def __init__(self):
- self._file_syncs = Seeders(node.sync_dirs.value,
- join(node.data_root.value, 'sync'), self.volume.seqno)
- self._pull_queue = pylru.lrucache(_PULL_QUEUE_SIZE,
- lambda key, pull: pull.unlink())
-
- @db.volume_command(method='POST', cmd='push')
- def push(self, request, response):
- with InPacket(stream=request) as in_packet:
- enforce('src' in in_packet.header and
- in_packet.header['src'] != self._guid,
- 'Misaddressed packet')
- enforce('dst' in in_packet.header and
- in_packet.header['dst'] == self._guid,
- 'Misaddressed packet')
-
- out_packet = OutBufferPacket(src=self._guid,
- dst=in_packet.header['src'],
- filename='ack.' + in_packet.header.get('filename'))
- pushed = Sequence()
- merged = Sequence()
- cookie = _Cookie()
- stats_pushed = {}
-
- for record in in_packet.records(dst=self._guid):
- cmd = record.get('cmd')
- if cmd == 'sn_push':
- seqno = self.volume.merge(record)
- merged.include(seqno, seqno)
- elif cmd == 'sn_commit':
- _logger.debug('Merged %r commit', record)
- pushed.include(record['sequence'])
- elif cmd == 'sn_pull':
- cookie['sn_pull'].include(record['sequence'])
- elif cmd == 'files_pull':
- cookie[record['directory']].include(record['sequence'])
- elif cmd == 'stats_push':
- db_name = record['db']
- user = record['user']
-
- rrd = stats.get_rrd(user)
- rrd[db_name].put(record['values'], record['timestamp'])
-
- user_seq = stats_pushed.setdefault(user, {})
- db_seq = user_seq.setdefault(db_name, Sequence())
- db_seq.include(record['sequence'])
-
- enforce(not merged or pushed,
- '"sn_push" record without "sn_commit"')
- if pushed:
- out_packet.push(cmd='sn_ack', sequence=pushed, merged=merged)
- if stats_pushed:
- out_packet.push(cmd='stats_ack', sequence=stats_pushed)
-
- cookie['sn_pull'].exclude(merged)
- # Read passed cookie only after excluding `merged`.
- # If there is sn_pull out of currently pushed packet, excluding
- # `merged` should not affect it.
- cookie.include(_Cookie(request))
- cookie.store(response)
-
- response.content_type = out_packet.content_type
- if not out_packet.empty:
- return out_packet.pop()
-
- @db.volume_command(method='GET', cmd='pull',
- mime_type='application/octet-stream',
- arguments={'accept_length': db.to_int})
- def pull(self, request, response, accept_length=None, **pulls):
- cookie = _Cookie(request)
- for key, seq in pulls.items():
- cookie[key][:] = json.loads(seq)
- if not cookie:
- _logger.debug('Clone full dump')
- cookie['sn_pull'].include(1, None)
-
- pull_key = hashlib.sha1(json.dumps(cookie)).hexdigest()
- pull = None
- content = None
-
- if pull_key in self._pull_queue:
- pull = self._pull_queue[pull_key]
- if accept_length is not None and pull.length > accept_length:
- _logger.debug('Cached %r pull is bigger than requested '
- 'length, will recreate it', cookie)
- pull.unlink()
- del self._pull_queue[pull_key]
- pull = None
-
- if pull is None:
- pull = self._pull_queue[pull_key] = _Pull(pull_key, cookie,
- self._pull, src=self._guid, seqno=self.volume.seqno.value,
- limit=accept_length)
-
- if pull.exception is not None:
- del self._pull_queue[pull_key]
- raise pull.exception
-
- if pull.ready:
- _logger.debug('Response with ready %r pull', cookie)
- content = pull.content
- response.content_type = pull.content_type
- cookie = pull.cookie
- else:
- _logger.debug('Pull %r is not yet ready', cookie)
- cookie.delay = pull.seconds_remained
-
- cookie.store(response)
- return content
-
- def _pull(self, cookie, packet):
- sn_pull = cookie['sn_pull']
- if sn_pull:
- self.volume.diff(sn_pull, packet)
-
- for directory, seq in cookie.items():
- sync = self._file_syncs.get(directory)
- if sync is None or not sync.pending(seq):
- continue
- sync.pull(seq, packet)
-
-
-class _Pull(object):
-
- def __init__(self, pull_key, cookie, pull_cb, **packet_args):
- self.cookie = cookie
- self.exception = None
- self.seconds_remained = 0
- self.content_type = None
- self._path = join(tmpdir.value, pull_key + '.pull')
- self._job = None
-
- if exists(self._path):
- try:
- with InPacket(self._path) as packet:
- self.content_type = packet.content_type
- self.cookie = _Cookie()
- self.cookie.update(packet.header['cookie'])
- except Exception:
- exception('Cannot open cached packet for %r, recreate',
- self._path)
- os.unlink(self._path)
-
- if not exists(self._path):
- packet = OutPacket(stream=file(self._path, 'wb+'), **packet_args)
- self.content_type = packet.content_type
- # TODO Might be useful to set meaningful value here
- self.seconds_remained = node.pull_timeout.value
- self._job = coroutine.spawn(self._pull, packet, pull_cb)
-
- @property
- def ready(self):
- # pylint: disable-msg=E1101
- return self._job is None or self._job.dead
-
- @property
- def content(self):
- if exists(self._path):
- return file(self._path, 'rb')
-
- @property
- def length(self):
- if exists(self._path):
- return os.stat(self._path).st_size
-
- def unlink(self):
- if self._job is not None:
- self._job.kill()
- if exists(self._path):
- _logger.debug('Eject %r pull from queue', self._path)
- os.unlink(self._path)
-
- def _pull(self, packet, cb):
- try:
- cb(self.cookie, packet)
- except DiskFull:
- pass
- except Exception, error:
- exception('Error while making %r pull', self.cookie)
- self.exception = error
- self.unlink()
- else:
- self.cookie.clear()
- packet.header['cookie'] = self.cookie
- packet.close()
-
-
-class _Cookie(dict):
-
- def __init__(self, request=None):
- dict.__init__(self)
-
- if request is not None:
- value = self._get_cookie(request, 'sugar_network_sync')
- for key, seq in (value or {}).items():
- self[key] = Sequence(seq)
-
- self.delay = 0
-
- def include(self, cookie):
- for key, seq in cookie.items():
- self[key].include(seq)
-
- def store(self, response):
- to_store = {}
- for key, value in self.items():
- if value:
- to_store[key] = value
-
- if to_store:
- _logger.debug('Postpone %r pull in cookie', to_store)
- to_store = base64.b64encode(json.dumps(to_store))
- self._set_cookie(response, 'sugar_network_sync', to_store)
- self._set_cookie(response, 'sugar_network_delay', self.delay)
- else:
- self._unset_cookie(response, 'sugar_network_sync')
- self._unset_cookie(response, 'sugar_network_delay')
-
- def __getitem__(self, key):
- seq = self.get(key)
- if seq is None:
- seq = self[key] = Sequence()
- return seq
-
- def _get_cookie(self, request, name):
- cookie_str = request.environ.get('HTTP_COOKIE')
- if not cookie_str:
- return
- cookie = SimpleCookie()
- cookie.load(cookie_str)
- if name not in cookie:
- return
- value = cookie.get(name).value
- if value != 'unset_%s' % name:
- return json.loads(base64.b64decode(value))
-
- def _set_cookie(self, response, name, value, age=3600):
- response.setdefault('Set-Cookie', [])
- cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, value, age)
- response['Set-Cookie'].append(cookie)
-
- def _unset_cookie(self, response, name):
- self._set_cookie(response, name, 'unset_%s' % name, 0)
diff --git a/sugar_network/node/sync_node.py b/sugar_network/node/sync_node.py
deleted file mode 100644
index 3dcb102..0000000
--- a/sugar_network/node/sync_node.py
+++ /dev/null
@@ -1,223 +0,0 @@
-# Copyright (C) 2012 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import sys
-import shutil
-import logging
-from os.path import join, dirname, exists, basename
-from gettext import gettext as _
-
-from sugar_network import db, node, client
-from sugar_network.toolkit import mountpoints, sneakernet, files_sync
-from sugar_network.toolkit import coroutine, util, exception, enforce
-from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull
-from sugar_network.node import stats
-
-
-_SYNC_DIRNAME = '.sugar-network-sync'
-
-_logger = logging.getLogger('node.sync_node')
-
-
-class SyncCommands(object):
-
- def __init__(self, sequences_path):
- self._sync = coroutine.Pool()
- self._sync_mounts = util.MutableStack()
- self._file_syncs = \
- files_sync.Leechers(node.sync_dirs.value, sequences_path)
- self._sync_session = None
- self._push_seq = util.PersistentSequence(
- join(sequences_path, 'push'), [1, None])
- self._pull_seq = util.PersistentSequence(
- join(sequences_path, 'pull'), [1, None])
- self._sync_script = join(dirname(sys.argv[0]), 'sugar-network-sync')
- self._mount = None
-
- mountpoints.connect(_SYNC_DIRNAME,
- self.__found_mountcb, self.__lost_mount_cb)
-
- @property
- def node_mount(self):
- return self._mount
-
- @node_mount.setter
- def node_mount(self, value):
- if self._mount is value:
- return
- self._mount = value
- if self._sync_mounts:
- self.start_sync()
-
- @db.volume_command(method='POST', cmd='start_sync')
- def start_sync(self, rewind=False, path=None):
- enforce(self._mount is not None, 'No server to sync')
-
- if self._sync:
- return
-
- enforce(self._mount is not None, 'No server to synchronize')
- enforce(path or self._sync_mounts, 'No mounts to synchronize with')
-
- if rewind:
- self._sync_mounts.rewind()
- self._sync.spawn(self.sync_session, path)
-
- @db.volume_command(method='POST', cmd='break_sync')
- def break_sync(self):
- self._sync.kill()
-
- def sync(self, path, accept_length=None, diff_sequence=None,
- stats_sequence=None, session=None):
- enforce(self._mount is not None, 'No server to sync')
-
- to_push_seq = util.Sequence(empty_value=[1, None])
- if diff_sequence is None:
- to_push_seq.include(self._push_seq)
- else:
- to_push_seq = util.Sequence(diff_sequence)
-
- if stats_sequence is None:
- stats_sequence = {}
-
- if session is None:
- session_is_new = True
- session = db.uuid()
- else:
- session_is_new = False
-
- while True:
- for packet in sneakernet.walk(path):
- if packet.header.get('src') == self._mount.node_guid:
- if packet.header.get('session') == session:
- _logger.debug('Keep current session %r packet', packet)
- else:
- _logger.debug('Remove our previous %r packet', packet)
- os.unlink(packet.path)
- else:
- self._import(packet, to_push_seq)
- self._push_seq.commit()
- self._pull_seq.commit()
-
- if exists(self._sync_script):
- shutil.copy(self._sync_script, path)
-
- with OutFilePacket(path, limit=accept_length,
- src=self._mount.node_guid, dst=self._mount.master_guid,
- session=session, seqno=self._mount.volume.seqno.value,
- api_url=client.api_url.value) as packet:
- if session_is_new:
- for directory, sync in self._file_syncs.items():
- packet.push(cmd='files_pull', directory=directory,
- sequence=sync.sequence)
- packet.push(cmd='sn_pull', sequence=self._pull_seq)
-
- _logger.debug('Generating %r PUSH packet to %r',
- packet, packet.path)
- self._mount.publish({
- 'event': 'sync_progress',
- 'progress': _('Generating %r packet') % packet.basename,
- })
-
- try:
- self._mount.volume.diff(to_push_seq, packet)
- stats.pull(stats_sequence, packet)
- except DiskFull:
- return {'diff_sequence': to_push_seq,
- 'stats_sequence': stats_sequence,
- 'session': session,
- }
- else:
- break
-
- def sync_session(self, path=None):
- enforce(self._mount is not None, 'No server to sync')
-
- _logger.debug('Start synchronization session with %r session '
- 'for %r mounts', self._sync_session, self._sync_mounts)
-
- def sync(path):
- self._mount.publish({'event': 'sync_start', 'path': path})
- self._sync_session = self.sync(path, **(self._sync_session or {}))
- return self._sync_session is None
-
- try:
- while True:
- if path and sync(path):
- break
- for mountpoint in self._sync_mounts:
- if sync(mountpoint):
- break
- break
- except Exception, error:
- exception(_logger, 'Failed to complete synchronization')
- self._mount.publish({'event': 'sync_error', 'error': str(error)})
- self._sync_session = None
-
- if self._sync_session is None:
- _logger.debug('Synchronization completed')
- self._mount.publish({'event': 'sync_complete'})
- else:
- _logger.debug('Postpone synchronization with %r session',
- self._sync_session)
- self._mount.publish({'event': 'sync_continue'})
-
- def _import(self, packet, to_push_seq):
- self._mount.publish({
- 'event': 'sync_progress',
- 'progress': _('Reading %r packet') % basename(packet.path),
- })
- _logger.debug('Processing %r PUSH packet from %r', packet, packet.path)
-
- from_master = (packet.header.get('src') == self._mount.master_guid)
-
- for record in packet.records():
- cmd = record.get('cmd')
- if cmd == 'sn_push':
- self._mount.volume.merge(record, increment_seqno=False)
- elif from_master:
- if cmd == 'sn_commit':
- _logger.debug('Processing %r COMMIT from %r',
- record, packet)
- self._pull_seq.exclude(record['sequence'])
- elif cmd == 'sn_ack' and \
- record['dst'] == self._mount.node_guid:
- _logger.debug('Processing %r ACK from %r', record, packet)
- self._push_seq.exclude(record['sequence'])
- self._pull_seq.exclude(record['merged'])
- to_push_seq.exclude(record['sequence'])
- self._mount.volume.seqno.next()
- self._mount.volume.seqno.commit()
- elif cmd == 'stats_ack' and \
- record['dst'] == self._mount.node_guid:
- _logger.debug('Processing %r stats ACK from %r',
- record, packet)
- stats.commit(record['sequence'])
- elif record.get('directory') in self._file_syncs:
- self._file_syncs[record['directory']].push(record)
-
- def __found_mountcb(self, path):
- self._sync_mounts.add(path)
- if self._mount is not None:
- _logger.debug('Found %r sync mount', path)
- self.start_sync()
- else:
- _logger.debug('Found %r sync mount but no servers', path)
-
- def __lost_mount_cb(self, path):
- self._sync_mounts.remove(path)
- if not self._sync_mounts:
- self.break_sync()
diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py
index 6d7b89b..e2da9ad 100644
--- a/sugar_network/resources/volume.py
+++ b/sugar_network/resources/volume.py
@@ -154,45 +154,6 @@ class Volume(db.Volume):
db.Volume.notify(self, event)
- def diff(self, in_seq, packet):
- out_seq = util.Sequence()
- try:
- for document, directory in self.items():
- coroutine.dispatch()
- directory.commit()
- packet.push(document=document)
- try:
- for guid, diff in directory.diff(in_seq, out_seq):
- coroutine.dispatch()
- if not packet.push(diff=diff, guid=guid):
- raise StopIteration()
- finally:
- in_seq.exclude(out_seq)
- if out_seq:
- out_seq = [[out_seq.first, out_seq.last]]
- in_seq.exclude(out_seq)
- except StopIteration:
- pass
- finally:
- packet.push(commit=out_seq)
-
- def merge(self, packet, increment_seqno=True):
- directory = None
- for record in packet:
- document = record.get('document')
- if document is not None:
- directory = self[document]
- continue
- diff = record.get('diff')
- if diff is not None:
- enforce(directory is not None,
- 'Invalid merge packet, no document')
- directory.merge(record['guid'], diff, increment_seqno)
- continue
- commit = record.get('commit')
- if commit is not None:
- return commit
-
def _open(self, name, document):
directory = db.Volume._open(self, name, document)
self._populators.spawn(self._populate, directory)
@@ -215,7 +176,6 @@ class Volume(db.Volume):
ostream = util.NamedTemporaryFile()
try:
chunk_size = min(content_length, BUFFER_SIZE)
- # pylint: disable-msg=E1103
for chunk in response.iter_content(chunk_size=chunk_size):
ostream.write(chunk)
except Exception:
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 1ca2002..ee6ba7f 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -13,18 +13,19 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-# pylint: disable-msg=E1103
-
+import sys
import json
import logging
import hashlib
-from os.path import exists
+from os.path import join, dirname, exists
+
+sys.path.insert(0, join(dirname(__file__), '..', 'lib', 'requests'))
import requests
from requests.sessions import Session
from M2Crypto import DSA
-from sugar_network.toolkit import sugar, coroutine, exception, enforce
+from sugar_network.toolkit import sugar, coroutine, util, exception, enforce
from sugar_network.toolkit.router import Redirect
from sugar_network import db, client
@@ -232,8 +233,8 @@ class _Subscription(object):
for a_try in (1, 0):
stream = self._handshake()
try:
- line = _readline(stream)
- enforce(line is not None, 'Subscription aborted')
+ line = util.readline(stream)
+ enforce(line, 'Subscription aborted')
break
except Exception:
if a_try == 0:
@@ -274,18 +275,3 @@ class _Subscription(object):
def _sign(privkey_path, data):
key = DSA.load_key(privkey_path)
return key.sign_asn1(hashlib.sha1(data).digest()).encode('hex')
-
-
-def _readline(stream):
- line = None
- while True:
- char = stream.read(1)
- if not char:
- break
- if line is None:
- line = char
- else:
- line += char
- if char == '\n':
- break
- return line
diff --git a/sugar_network/toolkit/pylru.py b/sugar_network/toolkit/pylru.py
index 7fde624..a40ad63 120000
--- a/sugar_network/toolkit/pylru.py
+++ b/sugar_network/toolkit/pylru.py
@@ -1 +1 @@
-lib/pylru/pylru.py \ No newline at end of file
+../lib/pylru/pylru.py \ No newline at end of file
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 1c67a4f..d0002db 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -357,17 +357,13 @@ class _Request(Request):
self.content_type, __ = \
cgi.parse_header(environ.get('CONTENT_TYPE', ''))
if self.content_type.lower() == 'application/json':
- content = self.read()
- if content:
- self.content = json.loads(content)
+ self.content = json.load(self.content_stream)
elif self.content_type.lower() == 'multipart/form-data':
files = cgi.FieldStorage(fp=environ['wsgi.input'],
environ=environ)
enforce(len(files.list) == 1,
'Multipart request should contain only one file')
self.content_stream = files.list[0].file
- else:
- self.content_stream = environ.get('wsgi.input')
if_modified_since = environ.get('HTTP_IF_MODIFIED_SINCE')
if if_modified_since:
diff --git a/sugar_network/toolkit/util.py b/sugar_network/toolkit/util.py
index 115cf02..9d5e999 100644
--- a/sugar_network/toolkit/util.py
+++ b/sugar_network/toolkit/util.py
@@ -58,6 +58,18 @@ def init_logging(debug_level):
self._log(8, message, args, **kwargs)
+def readline(stream, limit=None):
+ line = bytearray()
+ while limit is None or len(line) < limit:
+ char = stream.read(1)
+ if not char:
+ break
+ line.append(char)
+ if char == '\n':
+ break
+ return bytes(line)
+
+
def res_init():
"""Reset resolving cache.
diff --git a/sugar_network/zerosugar/cache.py b/sugar_network/zerosugar/cache.py
index 9a03b8e..30038bf 100644
--- a/sugar_network/zerosugar/cache.py
+++ b/sugar_network/zerosugar/cache.py
@@ -41,7 +41,6 @@ def get(guid):
with util.NamedTemporaryFile() as tmp_file:
chunk_size = min(content_length, BUFFER_SIZE)
- # pylint: disable-msg=E1103
for chunk in response.iter_content(chunk_size=chunk_size):
tmp_file.write(chunk)
tmp_file.flush()
diff --git a/sugar_network/zerosugar/injector.py b/sugar_network/zerosugar/injector.py
index 8d2e48c..93024c0 100644
--- a/sugar_network/zerosugar/injector.py
+++ b/sugar_network/zerosugar/injector.py
@@ -178,13 +178,13 @@ def _solve(mountpoint, context):
pipe.trace('Reuse cached solution')
return solution
- from sugar_network import zeroinstall
+ from sugar_network.zerosugar import solver
if mountpoint != '/':
_logger.info('Disable dependencies for local mountpoint')
- zeroinstall.nodeps = True
+ solver.nodeps = True
- solution = zeroinstall.solve(mountpoint, context)
+ solution = solver.solve(mountpoint, context)
_set_cached_solution(cached_path, solution)
return solution
diff --git a/sugar_network/zerosugar/packagekit.py b/sugar_network/zerosugar/packagekit.py
index 7308277..f17ba57 100644
--- a/sugar_network/zerosugar/packagekit.py
+++ b/sugar_network/zerosugar/packagekit.py
@@ -96,7 +96,6 @@ class _Transaction(object):
self.packages = []
self._object = dbus.SystemBus().get_object(
- # pylint: disable-msg=E1103
'org.freedesktop.PackageKit', _get_pk().GetTid(), False)
self._proxy = dbus.Interface(self._object,
'org.freedesktop.PackageKit.Transaction')
@@ -165,10 +164,10 @@ class _Transaction(object):
self.error_details = details
def __package_cb(self, status, pk_id, summary):
- from sugar_network import zeroinstall
+ from sugar_network.zerosugar import solver
package_name, version, arch, __ = pk_id.split(';')
- clean_version = zeroinstall.try_cleanup_distro_version(version)
+ clean_version = solver.try_cleanup_distro_version(version)
if not clean_version:
_logger.warn('Cannot parse distribution version "%s" '
'for package "%s"', version, package_name)
@@ -176,7 +175,7 @@ class _Transaction(object):
'pk_id': str(pk_id),
'version': clean_version,
'name': package_name,
- 'arch': zeroinstall.canonical_machine(arch),
+ 'arch': solver.canonical_machine(arch),
'installed': (status == 'installed'),
}
_logger.debug('Resolved PackageKit name: %r', package)
diff --git a/sugar_network/zerosugar/solver.py b/sugar_network/zerosugar/solver.py
index 1ceb6ba..7972e30 100644
--- a/sugar_network/zerosugar/solver.py
+++ b/sugar_network/zerosugar/solver.py
@@ -15,15 +15,14 @@
import sys
import logging
-from os.path import isabs, join, abspath, dirname
+from os.path import isabs, join, dirname
from sugar_network.client import IPCClient
from sugar_network.toolkit import lsb_release, pipe, exception
from sugar_network.zerosugar import packagekit
from sugar_network.zerosugar.spec import parse_version
-sys.path.insert(0,
- join(abspath(dirname(__file__)), '..', 'lib', 'zeroinstall-injector'))
+sys.path.insert(0, join(dirname(__file__), '..', 'lib', 'zeroinstall'))
from zeroinstall.injector import reader, model, distro
from zeroinstall.injector.config import Config
@@ -31,12 +30,12 @@ from zeroinstall.injector.driver import Driver
from zeroinstall.injector.requirements import Requirements
-def Interface_init(self, url):
+def _interface_init(self, url):
self.uri = url
self.reset()
-model.Interface.__init__ = Interface_init
+model.Interface.__init__ = _interface_init
reader.load_feed_from_cache = lambda url, * args, ** kwargs: _load_feed(url)
reader.check_readable = lambda * args, ** kwargs: True
diff --git a/sweets.recipe b/sweets.recipe
index ab54802..39d1175 100644
--- a/sweets.recipe
+++ b/sweets.recipe
@@ -8,11 +8,12 @@ summary = Sugar Network
license = GPLv3+
homepage = http://wiki.sugarlabs.org/go/Platform_Team/Sugar_Network
-version = 0.8
+version = 0.9
stability = developer
-requires = m2crypto; requests; rrdtool-python; openssh-client
- gevent >= 1; sugar-network-webui; sugar-network-hub
+requires = xapian-bindings-python; m2crypto; requests; rrdtool-python
+ gevent >= 1; dbus-python; openssh-client
+ sugar-network-webui; sugar-network-hub
replaces = sugar-network-server; sweets-recipe; active-document
[Build]
diff --git a/tests/__init__.py b/tests/__init__.py
index 7a326b6..cc40825 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -12,23 +12,22 @@ import tempfile
import subprocess
from os.path import dirname, join, exists, abspath, isfile
-import requests
from M2Crypto import DSA
from gevent import monkey
-from sugar_network.toolkit import coroutine, sugar, http, sneakernet, mountpoints, util
+from sugar_network.toolkit import coroutine, sugar, http, mountpoints, util
from sugar_network.toolkit.router import Router, IPCRouter
from sugar_network.client import journal
from sugar_network.client.mounts import HomeMount, RemoteMount
from sugar_network.client.mountset import Mountset
-from sugar_network import db, client, node, toolkit, zeroinstall
+from sugar_network import db, client, node, toolkit
from sugar_network.db import env
-from sugar_network.zerosugar import injector
+from sugar_network.zerosugar import injector, solver
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network.resources.implementation import Implementation
from sugar_network.node.commands import NodeCommands
-from sugar_network.node import stats, obs, auth
+from sugar_network.node import stats, obs, auth, sneakernet, slave
from sugar_network.resources.volume import Volume
@@ -82,8 +81,8 @@ class Test(unittest.TestCase):
db.index_flush_threshold.value = 1
node.find_limit.value = 1024
node.data_root.value = tmpdir
- node.sync_dirs.value = []
node.static_url.value = None
+ slave.sync_dirs.value = []
db.index_write_queue.value = 10
client.local_root.value = tmpdir
client.activity_dirs.value = [tmpdir + '/Activities']
@@ -107,7 +106,7 @@ class Test(unittest.TestCase):
injector.invalidate_solutions(None)
injector._pms_path = None
journal._ds_root = tmpdir + '/datastore'
- zeroinstall.nodeps = False
+ solver.nodeps = False
Volume.RESOURCES = [
'sugar_network.resources.user',
@@ -287,7 +286,7 @@ class Test(unittest.TestCase):
db.index_write_queue.value = 10
volume = Volume('remote', classes or [User, Context, Implementation])
- cp = NodeCommands(volume)
+ cp = NodeCommands(True, 'guid', volume)
httpd = coroutine.WSGIServer(('localhost', 8888), Router(cp))
try:
coroutine.joinall([
@@ -302,7 +301,7 @@ class Test(unittest.TestCase):
classes = [User, Context, Implementation]
self.touch('master/master')
self.volume = Volume('master', classes)
- cp = NodeCommands(self.volume)
+ cp = NodeCommands(True, 'guid', self.volume)
self.server = coroutine.WSGIServer(('localhost', 8888), Router(cp))
coroutine.spawn(self.server.serve_forever)
coroutine.dispatch()
diff --git a/tests/integration/cli.py b/tests/integration/cli.py
index 91a811f..46a6f13 100755
--- a/tests/integration/cli.py
+++ b/tests/integration/cli.py
@@ -9,8 +9,6 @@ import zipfile
import cPickle as pickle
from os.path import exists
-import requests
-
from __init__ import tests, src_root
from sugar_network.client import IPCClient
diff --git a/tests/units/client/mountset.py b/tests/units/client/mountset.py
index 1ac1ae8..0fe34f4 100755
--- a/tests/units/client/mountset.py
+++ b/tests/units/client/mountset.py
@@ -8,8 +8,6 @@ import shutil
import zipfile
from os.path import exists
-import requests
-
from __init__ import tests
from sugar_network.client.mountset import Mountset
@@ -25,6 +23,8 @@ from sugar_network.toolkit.router import IPCRouter
from sugar_network.zerosugar import injector, clones
from sugar_network.client import IPCClient, Client, journal
+import requests
+
class MountsetTest(tests.Test):
@@ -158,7 +158,7 @@ class MountsetTest(tests.Test):
]),
sorted(client.get(cmd='mounts')))
- def test_MountNode(self):
+ def __test_MountNode(self):
local.server_mode.value = True
mounts = self.mountset()
diff --git a/tests/units/client/remote_mount.py b/tests/units/client/remote_mount.py
index 1e99940..bd754e1 100755
--- a/tests/units/client/remote_mount.py
+++ b/tests/units/client/remote_mount.py
@@ -9,8 +9,6 @@ import urllib2
from cStringIO import StringIO
from os.path import exists, abspath
-import requests
-
from __init__ import tests
from sugar_network import db, client as local
@@ -26,6 +24,8 @@ from sugar_network.resources.volume import Volume, Resource
from sugar_network.zerosugar import injector
from sugar_network.client import IPCClient
+import requests
+
class RemoteMountTest(tests.Test):
diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py
index cb7e793..ceced37 100644
--- a/tests/units/node/__main__.py
+++ b/tests/units/node/__main__.py
@@ -3,11 +3,13 @@
from __init__ import tests
from auth import *
+#from files_sync import *
from node import *
from obs import *
+from sneakernet import *
from stats import *
-#from sync_master import *
-#from sync_node import *
+from sync import *
+from sync_online import *
if __name__ == '__main__':
tests.main()
diff --git a/tests/units/node/node.py b/tests/units/node/node.py
index 61bcac7..0e3b9bd 100755
--- a/tests/units/node/node.py
+++ b/tests/units/node/node.py
@@ -35,7 +35,7 @@ class NodeTest(tests.Test):
def test_UserStats(self):
volume = Volume('db')
- cp = NodeCommands(volume)
+ cp = NodeCommands(False, 'guid', volume)
call(cp, method='POST', document='user', principal=tests.UID, content={
'name': 'user',
@@ -108,7 +108,7 @@ class NodeTest(tests.Test):
volume = Volume('db', [User, Context, Review, Feedback, Solution, Artifact])
stats = NodeStats(volume)
- cp = NodeCommands(volume, stats)
+ cp = NodeCommands(False, 'guid', volume, stats)
self.assertEqual({
'user': [
@@ -132,7 +132,7 @@ class NodeTest(tests.Test):
def test_HandleDeletes(self):
volume = Volume('db')
- cp = NodeCommands(volume)
+ cp = NodeCommands(False, 'guid', volume)
guid = call(cp, method='POST', document='context', principal='principal', content={
'type': 'activity',
@@ -158,7 +158,7 @@ class NodeTest(tests.Test):
self.assertEqual(['deleted'], volume['context'].get(guid)['layer'])
def test_RegisterUser(self):
- cp = NodeCommands(Volume('db', [User]))
+ cp = NodeCommands(False, 'guid', Volume('db', [User]))
guid = call(cp, method='POST', document='user', principal='fake', content={
'name': 'user',
@@ -183,7 +183,7 @@ class NodeTest(tests.Test):
def probe2(self, directory):
pass
- cp = NodeCommands(Volume('db', [User, Document]))
+ cp = NodeCommands(False, 'guid', Volume('db', [User, Document]))
guid = call(cp, method='POST', document='document', principal='user', content={})
self.assertRaises(Unauthorized, call, cp, method='GET', cmd='probe1', document='document', guid=guid)
call(cp, method='GET', cmd='probe1', document='document', guid=guid, principal='user')
@@ -205,7 +205,7 @@ class NodeTest(tests.Test):
class User(db.Document):
pass
- cp = NodeCommands(Volume('db', [User, Document]))
+ cp = NodeCommands(False, 'guid', Volume('db', [User, Document]))
guid = call(cp, method='POST', document='document', principal='principal', content={})
self.assertRaises(db.Forbidden, call, cp, method='GET', cmd='probe1', document='document', guid=guid)
@@ -214,7 +214,7 @@ class NodeTest(tests.Test):
call(cp, method='GET', cmd='probe2', document='document', guid=guid)
def test_ForbiddenCommandsForUserResource(self):
- cp = NodeCommands(Volume('db', [User]))
+ cp = NodeCommands(False, 'guid', Volume('db', [User]))
call(cp, method='POST', document='user', principal='fake', content={
'name': 'user1',
@@ -231,7 +231,7 @@ class NodeTest(tests.Test):
self.assertEqual('user2', call(cp, method='GET', document='user', guid=tests.UID, prop='name'))
def test_SetUser(self):
- cp = NodeCommands(Volume('db'))
+ cp = NodeCommands(False, 'guid', Volume('db'))
guid = call(cp, method='POST', document='context', principal='principal', content={
'type': 'activity',
@@ -244,7 +244,7 @@ class NodeTest(tests.Test):
call(cp, method='GET', document='context', guid=guid, prop='author'))
def test_find_MaxLimit(self):
- cp = NodeCommands(Volume('db'))
+ cp = NodeCommands(False, 'guid', Volume('db'))
call(cp, method='POST', document='context', principal='principal', content={
'type': 'activity',
@@ -274,7 +274,7 @@ class NodeTest(tests.Test):
def test_DeletedDocuments(self):
volume = Volume('db')
- cp = NodeCommands(volume)
+ cp = NodeCommands(False, 'guid', volume)
guid = call(cp, method='POST', document='context', principal='principal', content={
'type': 'activity',
@@ -293,7 +293,7 @@ class NodeTest(tests.Test):
def test_SetGuidOnMaster(self):
volume1 = Volume('db1')
- cp1 = NodeCommands(volume1)
+ cp1 = NodeCommands(False, 'guid', volume1)
call(cp1, method='POST', document='context', principal='principal', content={
'type': 'activity',
'title': 'title',
@@ -304,8 +304,7 @@ class NodeTest(tests.Test):
self.assertRaises(db.NotFound, call, cp1, method='GET', document='context', guid='foo')
volume2 = Volume('db2')
- self.touch('db2/master')
- cp2 = NodeCommands(volume2)
+ cp2 = NodeCommands(True, 'guid', volume2)
call(cp2, method='POST', document='context', principal='principal', content={
'type': 'activity',
'title': 'title',
diff --git a/tests/units/toolkit/sneakernet.py b/tests/units/node/sneakernet.py
index abab510..7712ec0 100755
--- a/tests/units/toolkit/sneakernet.py
+++ b/tests/units/node/sneakernet.py
@@ -9,8 +9,8 @@ from os.path import exists
from __init__ import tests
-from sugar_network.toolkit import sneakernet
-from sugar_network.toolkit.sneakernet import InPacket, OutPacket, DiskFull, OutBufferPacket, OutFilePacket
+from sugar_network.node import sneakernet
+from sugar_network.node.sneakernet import InPacket, OutPacket, DiskFull, OutBufferPacket, OutFilePacket
class SneakernetTest(tests.Test):
diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py
new file mode 100755
index 0000000..abb5e18
--- /dev/null
+++ b/tests/units/node/sync.py
@@ -0,0 +1,379 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+from os.path import exists
+from cStringIO import StringIO
+import cPickle as pickle
+
+from __init__ import tests
+
+from sugar_network import db
+from sugar_network.node import sync
+from sugar_network.resources.volume import Volume
+from sugar_network.resources.user import User
+from sugar_network.toolkit.router import Request
+from sugar_network.toolkit import util
+
+
+class SyncTest(tests.Test):
+
+ def test_diff(self):
+
+ class Document(db.Document):
+
+ @db.indexed_property(slot=1)
+ def prop(self, value):
+ return value
+
+ volume = Volume('db', [Document])
+ volume['document'].create(guid='1', seqno=1, prop='a')
+ for i in os.listdir('db/document/1/1'):
+ os.utime('db/document/1/1/%s' % i, (1, 1))
+ volume['document'].create(guid='2', seqno=2, prop='b')
+ for i in os.listdir('db/document/2/2'):
+ os.utime('db/document/2/2/%s' % i, (2, 2))
+
+ in_seq = util.Sequence([[1, None]])
+ self.assertEqual([
+ {'document': 'document'},
+ {'guid': '1',
+ 'diff': {
+ 'guid': {'value': '1', 'mtime': 1},
+ 'mtime': {'value': 0, 'mtime': 1},
+ 'ctime': {'value': 0, 'mtime': 1},
+ 'prop': {'value': 'a', 'mtime': 1},
+ },
+ },
+ {'guid': '2',
+ 'diff': {
+ 'guid': {'value': '2', 'mtime': 2},
+ 'mtime': {'value': 0, 'mtime': 2},
+ 'ctime': {'value': 0, 'mtime': 2},
+ 'prop': {'value': 'b', 'mtime': 2},
+ },
+ },
+ {'commit': [[1, 2]]},
+ ],
+ [i for i in sync.diff(volume, in_seq)])
+ self.assertEqual([[1, None]], in_seq)
+
+ def test_diff_Partial(self):
+
+ class Document(db.Document):
+
+ @db.indexed_property(slot=1)
+ def prop(self, value):
+ return value
+
+ volume = Volume('db', [Document])
+ volume['document'].create(guid='1', seqno=1, prop='a')
+ for i in os.listdir('db/document/1/1'):
+ os.utime('db/document/1/1/%s' % i, (1, 1))
+ volume['document'].create(guid='2', seqno=2, prop='b')
+ for i in os.listdir('db/document/2/2'):
+ os.utime('db/document/2/2/%s' % i, (2, 2))
+
+ in_seq = util.Sequence([[1, None]])
+ diff = sync.diff(volume, in_seq)
+ self.assertEqual({'document': 'document'}, diff.send(None))
+ self.assertEqual('1', diff.send(None)['guid'])
+ self.assertEqual({'commit': []}, diff.send(sync._EOF))
+ try:
+ diff.send(None)
+ assert False
+ except StopIteration:
+ pass
+
+ diff = sync.diff(volume, in_seq)
+ self.assertEqual({'document': 'document'}, diff.send(None))
+ self.assertEqual('1', diff.send(None)['guid'])
+ self.assertEqual('2', diff.send(None)['guid'])
+ self.assertEqual({'commit': [[1, 1]]}, diff.send(sync._EOF))
+ try:
+ diff.send(None)
+ assert False
+ except StopIteration:
+ pass
+
+ def test_diff_Collapsed(self):
+
+ class Document(db.Document):
+
+ @db.indexed_property(slot=1)
+ def prop(self, value):
+ return value
+
+ volume = Volume('db', [Document])
+ volume['document'].create(guid='1', seqno=1, prop='a')
+ for i in os.listdir('db/document/1/1'):
+ os.utime('db/document/1/1/%s' % i, (1, 1))
+ volume['document'].create(guid='3', seqno=3, prop='c')
+ for i in os.listdir('db/document/3/3'):
+ os.utime('db/document/3/3/%s' % i, (3, 3))
+ volume['document'].create(guid='5', seqno=5, prop='f')
+ for i in os.listdir('db/document/5/5'):
+ os.utime('db/document/5/5/%s' % i, (5, 5))
+
+ in_seq = util.Sequence([[1, None]])
+ diff = sync.diff(volume, in_seq)
+ self.assertEqual({'document': 'document'}, diff.send(None))
+ self.assertEqual('1', diff.send(None)['guid'])
+ self.assertEqual('3', diff.send(None)['guid'])
+ self.assertEqual('5', diff.send(None)['guid'])
+ self.assertEqual({'commit': [[1, 1], [3, 3]]}, diff.send(sync._EOF))
+ try:
+ diff.send(None)
+ assert False
+ except StopIteration:
+ pass
+
+ diff = sync.diff(volume, in_seq)
+ self.assertEqual({'document': 'document'}, diff.send(None))
+ self.assertEqual('1', diff.send(None)['guid'])
+ self.assertEqual('3', diff.send(None)['guid'])
+ self.assertEqual('5', diff.send(None)['guid'])
+ self.assertEqual({'commit': [[1, 5]]}, diff.send(None))
+ try:
+ diff.send(None)
+ assert False
+ except StopIteration:
+ pass
+
+ def test_diff_TheSameInSeqForAllDocuments(self):
+
+ class Document1(db.Document):
+ pass
+
+ class Document2(db.Document):
+ pass
+
+ class Document3(db.Document):
+ pass
+
+ volume = Volume('db', [Document1, Document2, Document3])
+ volume['document1'].create(guid='3', seqno=3)
+ for i in os.listdir('db/document1/3/3'):
+ os.utime('db/document1/3/3/%s' % i, (3, 3))
+ volume['document2'].create(guid='2', seqno=2)
+ for i in os.listdir('db/document2/2/2'):
+ os.utime('db/document2/2/2/%s' % i, (2, 2))
+ volume['document3'].create(guid='1', seqno=1)
+ for i in os.listdir('db/document3/1/1'):
+ os.utime('db/document3/1/1/%s' % i, (1, 1))
+
+ in_seq = util.Sequence([[1, None]])
+ diff = sync.diff(volume, in_seq)
+ self.assertEqual({'document': 'document1'}, diff.send(None))
+ self.assertEqual('3', diff.send(None)['guid'])
+ self.assertEqual({'document': 'document2'}, diff.send(None))
+ self.assertEqual('2', diff.send(None)['guid'])
+ self.assertEqual({'document': 'document3'}, diff.send(None))
+ self.assertEqual('1', diff.send(None)['guid'])
+ self.assertEqual({'commit': [[1, 3]]}, diff.send(None))
+ try:
+ diff.send(None)
+ assert False
+ except StopIteration:
+ pass
+
+ def test_merge_Create(self):
+
+ class Document1(db.Document):
+
+ @db.indexed_property(slot=1)
+ def prop(self, value):
+ return value
+
+ class Document2(db.Document):
+ pass
+
+ self.touch(('db/seqno', '100'))
+ volume = Volume('db', [Document1, Document2])
+
+ records = [
+ {'document': 'document1'},
+ {'guid': '1', 'diff': {
+ 'guid': {'value': '1', 'mtime': 1.0},
+ 'ctime': {'value': 2, 'mtime': 2.0},
+ 'mtime': {'value': 3, 'mtime': 3.0},
+ 'prop': {'value': '4', 'mtime': 4.0},
+ }},
+ {'document': 'document2'},
+ {'guid': '5', 'diff': {
+ 'guid': {'value': '5', 'mtime': 5.0},
+ 'ctime': {'value': 6, 'mtime': 6.0},
+ 'mtime': {'value': 7, 'mtime': 7.0},
+ }},
+ {'commit': [[1, 2]]},
+ ]
+ self.assertEqual(([[1, 2]], [[101, 102]]), sync.merge(volume, records))
+
+ self.assertEqual(
+ {'guid': '1', 'prop': '4', 'ctime': 2, 'mtime': 3},
+ volume['document1'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
+ self.assertEqual(1, os.stat('db/document1/1/1/guid').st_mtime)
+ self.assertEqual(2, os.stat('db/document1/1/1/ctime').st_mtime)
+ self.assertEqual(3, os.stat('db/document1/1/1/mtime').st_mtime)
+ self.assertEqual(4, os.stat('db/document1/1/1/prop').st_mtime)
+
+ self.assertEqual(
+ {'guid': '5', 'ctime': 6, 'mtime': 7},
+ volume['document2'].get('5').properties(['guid', 'ctime', 'mtime']))
+ self.assertEqual(5, os.stat('db/document2/5/5/guid').st_mtime)
+ self.assertEqual(6, os.stat('db/document2/5/5/ctime').st_mtime)
+ self.assertEqual(7, os.stat('db/document2/5/5/mtime').st_mtime)
+
+ def test_merge_Update(self):
+
+ class Document(db.Document):
+
+ @db.indexed_property(slot=1)
+ def prop(self, value):
+ return value
+
+ self.touch(('db/seqno', '100'))
+ volume = Volume('db', [Document])
+ volume['document'].create(guid='1', prop='1', ctime=1, mtime=1)
+ for i in os.listdir('db/document/1/1'):
+ os.utime('db/document/1/1/%s' % i, (2, 2))
+
+ records = [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {'prop': {'value': '2', 'mtime': 1.0}}},
+ {'commit': 1},
+ ]
+ self.assertEqual((1, []), sync.merge(volume, records))
+ self.assertEqual(
+ {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1},
+ volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
+ self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime)
+
+ records = [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {'prop': {'value': '3', 'mtime': 2.0}}},
+ {'commit': 2},
+ ]
+ self.assertEqual((2, []), sync.merge(volume, records))
+ self.assertEqual(
+ {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1},
+ volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
+ self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime)
+
+ records = [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {'prop': {'value': '4', 'mtime': 3.0}}},
+ {'commit': 3},
+ ]
+ self.assertEqual((3, [[102, 102]]), sync.merge(volume, records))
+ self.assertEqual(
+ {'guid': '1', 'prop': '4', 'ctime': 1, 'mtime': 1},
+ volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
+ self.assertEqual(3, os.stat('db/document/1/1/prop').st_mtime)
+
+ def test_merge_StopOnCommit(self):
+
+ class Document(db.Document):
+ pass
+
+ self.touch(('db/seqno', '100'))
+ volume = Volume('db', [Document])
+
+ def generator():
+ for i in [
+ {'document': 'document'},
+ {'guid': '1', 'diff': {
+ 'guid': {'value': '1', 'mtime': 1.0},
+ 'ctime': {'value': 2, 'mtime': 2.0},
+ 'mtime': {'value': 3, 'mtime': 3.0},
+ 'prop': {'value': '4', 'mtime': 4.0},
+ }},
+ {'commit': [[1, 1]]},
+ {'tail': True},
+ ]:
+ yield i
+
+ records = generator()
+ self.assertEqual(([[1, 1]], [[101, 101]]), sync.merge(volume, records))
+ assert volume['document'].exists('1')
+ self.assertEqual({'tail': True}, next(records))
+
+ def test_decode(self):
+ stream = StringIO()
+ self.assertRaises(EOFError, sync.decode(stream).next)
+
+ stream = StringIO()
+ pickle.dump({'foo': 'bar'}, stream)
+ stream.seek(0)
+ sinput = sync.decode(stream)
+ self.assertEqual({'foo': 'bar'}, sinput.next())
+ self.assertRaises(EOFError, sinput.next)
+
+ stream = StringIO()
+ pickle.dump({'commit': None}, stream)
+ stream.seek(0)
+ self.assertEqual([
+ {'commit': None},
+ ], [i for i in sync.decode(stream)])
+
+ stream = StringIO()
+ pickle.dump({}, stream)
+ pickle.dump({'commit': None}, stream)
+ stream.seek(0)
+ self.assertEqual([
+ {},
+ {'commit': None},
+ ], [i for i in sync.decode(stream)])
+
+ def test_chunked_encode(self):
+ output = sync.chunked_encode(iter([]))
+ self.assertEqual('0\r\n\r\n', output.read(10))
+
+ data = [{'foo': 1}, {'bar': 2}, 3]
+ data_stream = ''
+ for record in data:
+ data_stream += pickle.dumps(record)
+
+ output = sync.chunked_encode(iter(data))
+ dump = StringIO()
+ while True:
+ chunk = output.read(1)
+ if not chunk:
+ break
+ dump.write(chunk)
+ self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+
+ output = sync.chunked_encode(iter(data))
+ dump = StringIO()
+ while True:
+ chunk = output.read(2)
+ if not chunk:
+ break
+ dump.write(chunk)
+ self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+
+ output = sync.chunked_encode(iter(data))
+ dump = StringIO()
+ while True:
+ chunk = output.read(1000)
+ if not chunk:
+ break
+ dump.write(chunk)
+ self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+
+
+def decode_chunked(encdata):
+ offset = 0
+ newdata = ''
+ while (encdata != ''):
+ off = int(encdata[:encdata.index("\r\n")],16)
+ if off == 0:
+ break
+ encdata = encdata[encdata.index("\r\n") + 2:]
+ newdata = "%s%s" % (newdata, encdata[:off])
+ encdata = encdata[off+2:]
+ return newdata
+
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/node/sync_node.py b/tests/units/node/sync_node.py
index 90e5fa5..6d53100 100755
--- a/tests/units/node/sync_node.py
+++ b/tests/units/node/sync_node.py
@@ -401,7 +401,7 @@ class SyncCommands(sync_node.SyncCommands):
self.node_mount = self
self.events = []
- def publish(self, event):
+ def broadcast(self, event):
self.events.append(event)
diff --git a/tests/units/node/sync_online.py b/tests/units/node/sync_online.py
new file mode 100755
index 0000000..8b8f2e5
--- /dev/null
+++ b/tests/units/node/sync_online.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+from __init__ import tests
+
+from sugar_network import db
+from sugar_network.client import Client, api_url
+from sugar_network.node import sync
+from sugar_network.node.master import MasterCommands
+from sugar_network.node.slave import SlaveCommands
+from sugar_network.resources.volume import Volume
+from sugar_network.resources.user import User
+from sugar_network.resources.feedback import Feedback
+from sugar_network.toolkit.router import Request, Router
+from sugar_network.toolkit import util, coroutine
+
+
+class SyncOnlineTest(tests.Test):
+
+ def setUp(self):
+ tests.Test.setUp(self)
+
+ class Document(Feedback):
+ pass
+
+ self.master_volume = Volume('master', [User, Document])
+ self.master_server = coroutine.WSGIServer(('localhost', 9000), Router(MasterCommands(self.master_volume)))
+ coroutine.spawn(self.master_server.serve_forever)
+ coroutine.dispatch()
+ client = Client('http://localhost:9000')
+ client.get(cmd='whoami')
+
+ api_url.value = 'http://localhost:9000'
+ self.slave_volume = Volume('slave', [User, Document])
+ self.slave_server = coroutine.WSGIServer(('localhost', 9001), Router(SlaveCommands('slave', self.slave_volume)))
+ coroutine.spawn(self.slave_server.serve_forever)
+ coroutine.dispatch()
+
+ def tearDown(self):
+ self.master_server.stop()
+ self.slave_server.stop()
+ tests.Test.tearDown(self)
+
+ def test_sync_Create(self):
+ client = Client('http://localhost:9001')
+
+ guid1 = client.post(['document'], {'context': '', 'content': '1', 'title': '', 'type': 'idea'})
+ guid2 = client.post(['document'], {'context': '', 'content': '2', 'title': '', 'type': 'idea'})
+
+ client.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'content': {'en-us': '1'}},
+ {'guid': guid2, 'content': {'en-us': '2'}},
+ ],
+ [i.properties(['guid', 'content']) for i in self.master_volume['document'].find()[0]])
+
+ guid3 = client.post(['document'], {'context': '', 'content': '3', 'title': '', 'type': 'idea'})
+ client.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'content': {'en-us': '1'}},
+ {'guid': guid2, 'content': {'en-us': '2'}},
+ {'guid': guid3, 'content': {'en-us': '3'}},
+ ],
+ [i.properties(['guid', 'content']) for i in self.master_volume['document'].find()[0]])
+
+ def test_sync_Update(self):
+ client = Client('http://localhost:9001')
+
+ guid = client.post(['document'], {'context': '', 'content': '1', 'title': '', 'type': 'idea'})
+ client.post(cmd='online_sync')
+ coroutine.sleep(1)
+
+ client.put(['document', guid], {'content': '2'})
+ client.post(cmd='online_sync')
+ self.assertEqual(
+ {'guid': guid, 'content': {'en-us': '2'}},
+ self.master_volume['document'].get(guid).properties(['guid', 'content']))
+
+ def test_sync_Delete(self):
+ client = Client('http://localhost:9001')
+
+ guid1 = client.post(['document'], {'context': '', 'content': '1', 'title': '', 'type': 'idea'})
+ guid2 = client.post(['document'], {'context': '', 'content': '2', 'title': '', 'type': 'idea'})
+ guid3 = client.post(['document'], {'context': '', 'content': '3', 'title': '', 'type': 'idea'})
+ client.post(cmd='online_sync')
+ coroutine.sleep(1)
+
+ client.delete(['document', guid2])
+ client.post(cmd='online_sync')
+ self.assertEqual([
+ {'guid': guid1, 'content': {'en-us': '1'}, 'layer': ['public']},
+ {'guid': guid2, 'content': {'en-us': '2'}, 'layer': ['deleted']},
+ {'guid': guid3, 'content': {'en-us': '3'}, 'layer': ['public']},
+ ],
+ [i.properties(['guid', 'content', 'layer']) for i in self.master_volume['document'].find()[0]])
+
+
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/resources/volume.py b/tests/units/resources/volume.py
index 35722a6..e3d599c 100755
--- a/tests/units/resources/volume.py
+++ b/tests/units/resources/volume.py
@@ -10,7 +10,6 @@ from os.path import exists
from __init__ import tests
from sugar_network import db, node
-from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, DiskFull
from sugar_network.resources.volume import Volume, Resource, Commands, VolumeCommands
from sugar_network.resources.user import User
from sugar_network.toolkit.router import Request
@@ -19,405 +18,6 @@ from sugar_network.toolkit import coroutine, sugar, util
class VolumeTest(tests.Test):
- def test_diff(self):
-
- class Document(db.Document):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = Volume('db', [Document])
- volume['document'].create(guid='1', seqno=1, prop='a')
- for i in os.listdir('db/document/1/1'):
- os.utime('db/document/1/1/%s' % i, (1, 1))
- volume['document'].create(guid='2', seqno=2, prop='b')
- for i in os.listdir('db/document/2/2'):
- os.utime('db/document/2/2/%s' % i, (2, 2))
-
- class Packet(list):
-
- def push(self, **kwargs):
- self.append(kwargs)
- return True
-
- packet = Packet()
- in_seq = util.Sequence([[1, None]])
- volume.diff(in_seq, packet)
- self.assertEqual([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'mtime': {'value': 0, 'mtime': 1.0},
- 'ctime': {'value': 0, 'mtime': 1.0},
- 'prop': {'value': 'a', 'mtime': 1.0},
- },
- },
- {'guid': '2',
- 'diff': {
- 'guid': {'value': '2', 'mtime': 2.0},
- 'mtime': {'value': 0, 'mtime': 2.0},
- 'ctime': {'value': 0, 'mtime': 2.0},
- 'prop': {'value': 'b', 'mtime': 2.0},
- },
- },
- {'commit': [[1, 2]]},
- ],
- packet)
- self.assertEqual([[3, None]], in_seq)
-
- def test_diff_Partial(self):
-
- class Document(db.Document):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = Volume('db', [Document])
- volume['document'].create(guid='1', seqno=1, prop='a')
- for i in os.listdir('db/document/1/1'):
- os.utime('db/document/1/1/%s' % i, (1, 1))
- volume['document'].create(guid='2', seqno=2, prop='b')
- for i in os.listdir('db/document/2/2'):
- os.utime('db/document/2/2/%s' % i, (2, 2))
-
- class Packet(list):
-
- def push(self, **kwargs):
- if kwargs.get('guid') == '1':
- return False
- self.append(kwargs)
- return True
-
- packet = Packet()
- in_seq = util.Sequence([[1, None]])
- volume.diff(in_seq, packet)
- self.assertEqual([
- {'document': 'document'},
- {'commit': []},
- ],
- packet)
- self.assertEqual([[1, None]], in_seq)
-
- class Packet(list):
-
- def push(self, **kwargs):
- if kwargs.get('guid') == '2':
- return False
- self.append(kwargs)
- return True
-
- packet = Packet()
- in_seq = util.Sequence([[1, None]])
- volume.diff(in_seq, packet)
- self.assertEqual([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'mtime': {'value': 0, 'mtime': 1.0},
- 'ctime': {'value': 0, 'mtime': 1.0},
- 'prop': {'value': 'a', 'mtime': 1.0},
- },
- },
- {'commit': [[1, 1]]},
- ],
- packet)
- self.assertEqual([[2, None]], in_seq)
-
- def test_diff_Collapsed(self):
-
- class Document(db.Document):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = Volume('db', [Document])
- volume['document'].create(guid='1', seqno=1, prop='a')
- for i in os.listdir('db/document/1/1'):
- os.utime('db/document/1/1/%s' % i, (1, 1))
- volume['document'].create(guid='3', seqno=3, prop='c')
- for i in os.listdir('db/document/3/3'):
- os.utime('db/document/3/3/%s' % i, (3, 3))
- volume['document'].create(guid='5', seqno=5, prop='f')
- for i in os.listdir('db/document/5/5'):
- os.utime('db/document/5/5/%s' % i, (5, 5))
-
- class Packet(list):
-
- def push(self, **kwargs):
- if kwargs.get('guid') == '5':
- return False
- self.append(kwargs)
- return True
-
- packet = Packet()
- in_seq = util.Sequence([[1, None]])
- volume.diff(in_seq, packet)
- self.assertEqual([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'mtime': {'value': 0, 'mtime': 1.0},
- 'ctime': {'value': 0, 'mtime': 1.0},
- 'prop': {'value': 'a', 'mtime': 1.0},
- },
- },
- {'guid': '3',
- 'diff': {
- 'guid': {'value': '3', 'mtime': 3.0},
- 'mtime': {'value': 0, 'mtime': 3.0},
- 'ctime': {'value': 0, 'mtime': 3.0},
- 'prop': {'value': 'c', 'mtime': 3.0},
- },
- },
- {'commit': [[1, 1], [3, 3]]},
- ],
- packet)
- self.assertEqual([[2, 2], [4, None]], in_seq)
-
- class Packet(list):
-
- def push(self, **kwargs):
- self.append(kwargs)
- return True
-
- packet = Packet()
- in_seq = util.Sequence([[1, None]])
- volume.diff(in_seq, packet)
- self.assertEqual([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'mtime': {'value': 0, 'mtime': 1.0},
- 'ctime': {'value': 0, 'mtime': 1.0},
- 'prop': {'value': 'a', 'mtime': 1.0},
- },
- },
- {'guid': '3',
- 'diff': {
- 'guid': {'value': '3', 'mtime': 3.0},
- 'mtime': {'value': 0, 'mtime': 3.0},
- 'ctime': {'value': 0, 'mtime': 3.0},
- 'prop': {'value': 'c', 'mtime': 3.0},
- },
- },
- {'guid': '5',
- 'diff': {
- 'guid': {'value': '5', 'mtime': 5.0},
- 'mtime': {'value': 0, 'mtime': 5.0},
- 'ctime': {'value': 0, 'mtime': 5.0},
- 'prop': {'value': 'f', 'mtime': 5.0},
- },
- },
- {'commit': [[1, 5]]},
- ],
- packet)
- self.assertEqual([[6, None]], in_seq)
-
- def test_diff_TheSameInSeqForAllDocuments(self):
-
- class Document1(db.Document):
- pass
-
- class Document2(db.Document):
- pass
-
- class Document3(db.Document):
- pass
-
- volume = Volume('db', [Document1, Document2, Document3])
- volume['document1'].create(guid='3', seqno=3)
- for i in os.listdir('db/document1/3/3'):
- os.utime('db/document1/3/3/%s' % i, (3, 3))
- volume['document2'].create(guid='2', seqno=2)
- for i in os.listdir('db/document2/2/2'):
- os.utime('db/document2/2/2/%s' % i, (2, 2))
- volume['document3'].create(guid='1', seqno=1)
- for i in os.listdir('db/document3/1/1'):
- os.utime('db/document3/1/1/%s' % i, (1, 1))
-
- class Packet(list):
-
- def push(self, **kwargs):
- self.append(kwargs)
- return True
-
- packet = Packet()
- in_seq = util.Sequence([[1, None]])
- volume.diff(in_seq, packet)
- self.assertEqual([
- {'document': 'document1'},
- {'guid': '3',
- 'diff': {
- 'guid': {'value': '3', 'mtime': 3.0},
- 'mtime': {'value': 0, 'mtime': 3.0},
- 'ctime': {'value': 0, 'mtime': 3.0},
- },
- },
- {'document': 'document2'},
- {'guid': '2',
- 'diff': {
- 'guid': {'value': '2', 'mtime': 2.0},
- 'mtime': {'value': 0, 'mtime': 2.0},
- 'ctime': {'value': 0, 'mtime': 2.0},
- },
- },
- {'document': 'document3'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'mtime': {'value': 0, 'mtime': 1.0},
- 'ctime': {'value': 0, 'mtime': 1.0},
- },
- },
- {'commit': [[1, 3]]},
- ],
- packet)
- self.assertEqual([[4, None]], in_seq)
-
- def test_merge_Create(self):
-
- class Document1(db.Document):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- class Document2(db.Document):
- pass
-
- volume = Volume('db', [Document1, Document2])
-
- self.assertEqual(
- [[1, 2]],
- volume.merge([
- {'document': 'document1'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'ctime': {'value': 2, 'mtime': 2.0},
- 'mtime': {'value': 3, 'mtime': 3.0},
- 'prop': {'value': '4', 'mtime': 4.0},
- },
- },
- {'document': 'document2'},
- {'guid': '5',
- 'diff': {
- 'guid': {'value': '5', 'mtime': 5.0},
- 'ctime': {'value': 6, 'mtime': 6.0},
- 'mtime': {'value': 7, 'mtime': 7.0},
- },
- },
- {'commit': [[1, 2]]},
- ]))
-
- self.assertEqual(
- {'guid': '1', 'prop': '4', 'ctime': 2, 'mtime': 3},
- volume['document1'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(1, os.stat('db/document1/1/1/guid').st_mtime)
- self.assertEqual(2, os.stat('db/document1/1/1/ctime').st_mtime)
- self.assertEqual(3, os.stat('db/document1/1/1/mtime').st_mtime)
- self.assertEqual(4, os.stat('db/document1/1/1/prop').st_mtime)
-
- self.assertEqual(
- {'guid': '5', 'ctime': 6, 'mtime': 7},
- volume['document2'].get('5').properties(['guid', 'ctime', 'mtime']))
- self.assertEqual(5, os.stat('db/document2/5/5/guid').st_mtime)
- self.assertEqual(6, os.stat('db/document2/5/5/ctime').st_mtime)
- self.assertEqual(7, os.stat('db/document2/5/5/mtime').st_mtime)
-
- def test_merge_Update(self):
-
- class Document(db.Document):
-
- @db.indexed_property(slot=1)
- def prop(self, value):
- return value
-
- volume = Volume('db', [Document])
- volume['document'].create(guid='1', prop='1', ctime=1, mtime=1)
- for i in os.listdir('db/document/1/1'):
- os.utime('db/document/1/1/%s' % i, (2, 2))
-
- self.assertEqual(
- [],
- volume.merge([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'prop': {'value': '2', 'mtime': 1.0},
- },
- },
- {'commit': []},
- ]))
- self.assertEqual(
- {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1},
- volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime)
-
- self.assertEqual(
- [],
- volume.merge([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'prop': {'value': '3', 'mtime': 2.0},
- },
- },
- {'commit': []},
- ]))
- self.assertEqual(
- {'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1},
- volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(2, os.stat('db/document/1/1/prop').st_mtime)
-
- self.assertEqual(
- [],
- volume.merge([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'prop': {'value': '4', 'mtime': 3.0},
- },
- },
- {'commit': []},
- ]))
- self.assertEqual(
- {'guid': '1', 'prop': '4', 'ctime': 1, 'mtime': 1},
- volume['document'].get('1').properties(['guid', 'ctime', 'mtime', 'prop']))
- self.assertEqual(3, os.stat('db/document/1/1/prop').st_mtime)
-
- def test_merge_StopOnCommit(self):
-
- class Document(db.Document):
- pass
-
- volume = Volume('db', [Document])
-
- diff = iter([
- {'document': 'document'},
- {'guid': '1',
- 'diff': {
- 'guid': {'value': '1', 'mtime': 1.0},
- 'ctime': {'value': 2, 'mtime': 2.0},
- 'mtime': {'value': 3, 'mtime': 3.0},
- 'prop': {'value': '4', 'mtime': 4.0},
- },
- },
- {'commit': [[1, 1]]},
- {'tail': True},
- ])
-
- self.assertEqual([[1, 1]], volume.merge(diff))
- assert volume['document'].exists('1')
- self.assertEqual([{'tail': True}], [i for i in diff])
-
def test_SimulateDeleteEvents(self):
class Document(Resource):
@@ -904,15 +504,6 @@ class TestCommands(VolumeCommands, Commands):
self.volume.connect(callback, condition)
-def read_packet(packet):
- result = []
- for i in InPacket(stream=packet.pop()):
- if 'diff' in i:
- i.pop('diff')
- result.append(i)
- return result
-
-
def call(cp, principal=None, content=None, **kwargs):
request = Request(**kwargs)
request.principal = principal
diff --git a/tests/units/toolkit/__main__.py b/tests/units/toolkit/__main__.py
index 30a7b47..7a9587a 100644
--- a/tests/units/toolkit/__main__.py
+++ b/tests/units/toolkit/__main__.py
@@ -2,11 +2,9 @@
from __init__ import tests
-#from files_sync import *
from http import *
from mountpoints import *
from router import *
-from sneakernet import *
from util import *
if __name__ == '__main__':
diff --git a/tests/units/toolkit/util.py b/tests/units/toolkit/util.py
index c5b1311..a25eea5 100755
--- a/tests/units/toolkit/util.py
+++ b/tests/units/toolkit/util.py
@@ -3,9 +3,11 @@
import copy
from os.path import exists
+from cStringIO import StringIO
from __init__ import tests
+from sugar_network.toolkit import util
from sugar_network.toolkit.util import Seqno, Sequence
@@ -337,6 +339,25 @@ class UtilTest(tests.Test):
seq.include(10, 11)
self.assertEqual(11, seq.last)
+ def test_readline(self):
+
+ def readlines(string):
+ result = []
+ stream = StringIO(string)
+ while True:
+ line = util.readline(stream)
+ if not line:
+ break
+ result.append(line)
+ return result
+
+ self.assertEqual([], readlines(''))
+ self.assertEqual([' '], readlines(' '))
+ self.assertEqual([' a '], readlines(' a '))
+ self.assertEqual(['\n'], readlines('\n'))
+ self.assertEqual(['\n', 'b'], readlines('\nb'))
+ self.assertEqual([' \n', ' b \n'], readlines(' \n b \n'))
+
if __name__ == '__main__':
tests.main()
diff --git a/tests/units/zerosugar/injector.py b/tests/units/zerosugar/injector.py
index f5299b6..0b61531 100755
--- a/tests/units/zerosugar/injector.py
+++ b/tests/units/zerosugar/injector.py
@@ -12,13 +12,12 @@ from os.path import exists, dirname
from __init__ import tests
-from sugar_network import zeroinstall
from sugar_network.client import journal
from sugar_network.toolkit import coroutine, enforce, pipe as pipe_, lsb_release
from sugar_network.resources.user import User
from sugar_network.resources.context import Context
from sugar_network.resources.implementation import Implementation
-from sugar_network.zerosugar import packagekit, injector, clones
+from sugar_network.zerosugar import packagekit, injector, clones, solver
from sugar_network.client import IPCClient
from sugar_network import client as local
@@ -395,7 +394,7 @@ class InjectorTest(tests.Test):
def test_SolutionsCache_Set(self):
solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}]
- self.override(zeroinstall, 'solve', lambda *args: solution)
+ self.override(solver, 'solve', lambda *args: solution)
self.assertEqual(solution, injector._solve('~', 'context'))
self.assertEqual([local.api_url.value, solution], pickle.load(file('cache/solutions/~/co/context')))
@@ -408,7 +407,7 @@ class InjectorTest(tests.Test):
def test_SolutionsCache_InvalidateByAPIUrl(self):
solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}]
- self.override(zeroinstall, 'solve', lambda *args: solution)
+ self.override(solver, 'solve', lambda *args: solution)
cached_path = 'cache/solutions/~/co/context'
solution2 = [{'name': 'name2', 'context': 'context2', 'id': 'id2', 'version': 'version2'}]
@@ -422,7 +421,7 @@ class InjectorTest(tests.Test):
def test_SolutionsCache_InvalidateByMtime(self):
solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}]
- self.override(zeroinstall, 'solve', lambda *args: solution)
+ self.override(solver, 'solve', lambda *args: solution)
cached_path = 'cache/solutions/~/co/context'
solution2 = [{'name': 'name2', 'context': 'context2', 'id': 'id2', 'version': 'version2'}]
@@ -442,7 +441,7 @@ class InjectorTest(tests.Test):
def test_SolutionsCache_InvalidateByPMSMtime(self):
solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}]
- self.override(zeroinstall, 'solve', lambda *args: solution)
+ self.override(solver, 'solve', lambda *args: solution)
cached_path = 'cache/solutions/~/co/context'
injector._pms_path = 'pms'
@@ -464,7 +463,7 @@ class InjectorTest(tests.Test):
def test_SolutionsCache_InvalidateBySpecMtime(self):
solution = [{'name': 'name', 'context': 'context', 'id': 'id', 'version': 'version'}]
- self.override(zeroinstall, 'solve', lambda *args: solution)
+ self.override(solver, 'solve', lambda *args: solution)
cached_path = 'cache/solutions/~/co/context'
solution2 = [{'spec': 'spec', 'name': 'name2', 'context': 'context2', 'id': 'id2', 'version': 'version2'}]
@@ -630,7 +629,7 @@ class InjectorTest(tests.Test):
{'version': '1', 'id': 'dep3', 'context': 'dep3', 'name': 'title3'},
{'name': 'title', 'version': '1', 'command': ['echo'], 'context': context, 'mountpoint': '/', 'id': impl},
]),
- sorted(zeroinstall.solve('/', context)))
+ sorted(solver.solve('/', context)))
def test_NoDepsClonning(self):
self.touch('remote/master')
@@ -663,12 +662,12 @@ class InjectorTest(tests.Test):
},
}})
- self.assertRaises(RuntimeError, zeroinstall.solve, '/', context)
+ self.assertRaises(RuntimeError, solver.solve, '/', context)
- zeroinstall.nodeps = True
+ solver.nodeps = True
self.assertEqual(
[{'name': 'title', 'version': '1', 'command': ['echo'], 'context': context, 'mountpoint': '/', 'id': impl}],
- zeroinstall.solve('/', context))
+ solver.solve('/', context))
def test_LoadFeed_SetPackages(self):
self.touch('remote/master')
@@ -711,7 +710,7 @@ class InjectorTest(tests.Test):
return dict([(i, {'name': i, 'pk_id': i, 'version': '1', 'arch': '*', 'installed': True}) for i in names])
self.override(packagekit, 'resolve', resolve)
- self.assertRaises(RuntimeError, zeroinstall.solve, '/', context)
+ self.assertRaises(RuntimeError, solver.solve, '/', context)
remote.put(['context', 'dep', 'packages'], {
lsb_release.distributor_id() + '-' + lsb_release.release(): {
@@ -719,7 +718,7 @@ class InjectorTest(tests.Test):
'binary': ['bin'],
},
})
- self.assertEqual('dep', zeroinstall.solve('/', context)[-1]['context'])
+ self.assertEqual('dep', solver.solve('/', context)[-1]['context'])
remote.put(['context', 'dep', 'packages'], {
'foo': {
@@ -727,14 +726,14 @@ class InjectorTest(tests.Test):
'binary': ['bin'],
},
})
- self.assertRaises(RuntimeError, zeroinstall.solve, '/', context)
+ self.assertRaises(RuntimeError, solver.solve, '/', context)
remote.put(['context', 'dep', 'packages'], {
lsb_release.distributor_id(): {
'binary': ['bin'],
},
})
- self.assertEqual('dep', zeroinstall.solve('/', context)[-1]['context'])
+ self.assertEqual('dep', solver.solve('/', context)[-1]['context'])
def test_SolveSugar(self):
self.touch(('__init__.py', ''))
@@ -782,7 +781,7 @@ class InjectorTest(tests.Test):
{'name': 'title', 'version': '1', 'command': ['echo'], 'context': context, 'mountpoint': '/', 'id': impl},
{'name': 'sugar', 'version': '777', 'context': 'sugar', 'path': '/', 'mountpoint': None, 'id': 'sugar'},
],
- zeroinstall.solve('/', context))
+ solver.solve('/', context))
if __name__ == '__main__':