diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2012-07-31 06:13:11 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2012-07-31 06:19:50 (GMT) |
commit | 384bf05497e295c155903018b347ff17668ad20d (patch) | |
tree | b8c0be4767ec37c45d109938e465561842e991cd | |
parent | 6fad82b1eee71687604a465584e862b9c840d019 (diff) |
Full support for files sync
-rwxr-xr-x | sugar-network-server | 2 | ||||
-rwxr-xr-x | sugar-network-service | 4 | ||||
-rwxr-xr-x | sugar-network-sync | 2 | ||||
-rw-r--r-- | sugar_network/local/__init__.py | 4 | ||||
-rw-r--r-- | sugar_network/local/mounts.py | 34 | ||||
-rw-r--r-- | sugar_network/local/mountset.py | 11 | ||||
-rw-r--r-- | sugar_network/node/__init__.py | 11 | ||||
-rw-r--r-- | sugar_network/node/commands.py | 18 | ||||
-rw-r--r-- | sugar_network/toolkit/files_sync.py | 60 | ||||
-rw-r--r-- | tests/__init__.py | 1 | ||||
-rwxr-xr-x | tests/integration/sync.py | 15 | ||||
-rwxr-xr-x | tests/units/files_sync.py | 72 | ||||
-rwxr-xr-x | tests/units/sync_master.py | 2 |
13 files changed, 190 insertions, 46 deletions
diff --git a/sugar-network-server b/sugar-network-server index 1878faf..928fe92 100755 --- a/sugar-network-server +++ b/sugar-network-server @@ -55,7 +55,7 @@ class Application(application.Daemon): self.jobs.spawn(volume.populate) subscriber = SubscribeSocket(volume, node.host.value, node.subscribe_port.value) - cp = MasterCommands(volume, subscriber) + cp = MasterCommands(volume, subscriber, node.sync_dirs.value) logging.info('Listening for requests on %s:%s', node.host.value, node.port.value) diff --git a/sugar-network-service b/sugar-network-service index 48672f5..4e2b18c 100755 --- a/sugar-network-service +++ b/sugar-network-service @@ -151,7 +151,7 @@ class Application(application.Application): jobs = coroutine.Pool() volume = Volume(self._db_path, lazy_open=local.lazy_open.value) - mountset = Mountset(volume) + mountset = Mountset(volume, node.sync_dirs.value) mountset['~'] = HomeMount(volume) mountset['/'] = RemoteMount(volume) @@ -255,7 +255,7 @@ local.tmpdir.value = sugar.profile_path('tmp') Option.seek('main', [application.debug]) Option.seek('webui', webui) Option.seek('local', local) -Option.seek('node', [node.port, node.subscribe_port]) +Option.seek('node', [node.port, node.subscribe_port, node.sync_dirs]) Option.seek('active-document', ad) application = Application( diff --git a/sugar-network-sync b/sugar-network-sync index 2a2f162..b743788 100755 --- a/sugar-network-sync +++ b/sugar-network-sync @@ -153,7 +153,7 @@ disk_limit=$(expr 1024 \* 1024 \* 10) if [ $# -eq 0 ]; then if [ -e "$(dirname $0)/.sugar-network-sync" ]; then # Script was launched from sync directory, so, process sync - sync_path="$PWD" + sync_path="$(dirname $0)" else help exit 0 diff --git a/sugar_network/local/__init__.py b/sugar_network/local/__init__.py index 9d2eebb..148b7c5 100644 --- a/sugar_network/local/__init__.py +++ b/sugar_network/local/__init__.py @@ -39,10 +39,10 @@ local_root = Option( default=sugar.profile_path('network')) activity_dirs = Option( - 'colon separated list of paths to the directories with Sugar ' \ + 'colon separated list of paths to directories with Sugar ' \ 'activities; first path will be used to keep check-in ' \ 'activities', - type_cast=Option.list_cast, type_repr=Option.list_repr, default=[ + type_cast=Option.paths_cast, type_repr=Option.paths_repr, default=[ expanduser('~/Activities'), '/usr/share/sugar/activities', '/opt/sweets', diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py index 7f7b216..361934e 100644 --- a/sugar_network/local/mounts.py +++ b/sugar_network/local/mounts.py @@ -416,10 +416,11 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands): class NodeMount(LocalMount, _ProxyCommands): - def __init__(self, volume, home_volume): + def __init__(self, volume, home_volume, file_syncs=None): LocalMount.__init__(self, volume) _ProxyCommands.__init__(self, home_volume) + self._file_syncs = file_syncs or {} self._push_seq = PersistentSequence( join(volume.root, 'push.sequence'), [1, None]) self._pull_seq = PersistentSequence( @@ -487,6 +488,9 @@ class NodeMount(LocalMount, _ProxyCommands): seqno=self.volume.seqno.value, api_url=local.api_url.value) as packet: if session_is_new: + for directory, sync in self._file_syncs.items(): + packet.push(cmd='files_pull', directory=directory, + sequence=sync.sequence) packet.push(cmd='sn_pull', sequence=self._pull_seq) _logger.debug('Generating %r PUSH packet to %r', @@ -544,19 +548,19 @@ class NodeMount(LocalMount, _ProxyCommands): for record in packet.records(): cmd = record.get('cmd') - if cmd == 'sn_push': self.volume.merge(record, increment_seqno=False) - - elif cmd == 'sn_commit' and from_master: - _logger.debug('Processing %r COMMIT from %r', record, packet) - self._pull_seq.exclude(record['sequence']) - - elif cmd == 'sn_ack' and from_master and \ - record['dst'] == self._node_guid: - _logger.debug('Processing %r ACK from %r', record, packet) - self._push_seq.exclude(record['sequence']) - self._pull_seq.exclude(record['merged']) - to_push_seq.exclude(record['sequence']) - self.volume.seqno.next() - self.volume.seqno.commit() + elif from_master: + if cmd == 'sn_commit': + _logger.debug('Processing %r COMMIT from %r', + record, packet) + self._pull_seq.exclude(record['sequence']) + elif cmd == 'sn_ack' and record['dst'] == self._node_guid: + _logger.debug('Processing %r ACK from %r', record, packet) + self._push_seq.exclude(record['sequence']) + self._pull_seq.exclude(record['merged']) + to_push_seq.exclude(record['sequence']) + self.volume.seqno.next() + self.volume.seqno.commit() + elif record.get('directory') in self._file_syncs: + self._file_syncs[record['directory']].push(record) diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py index 1ef4f57..819f678 100644 --- a/sugar_network/local/mountset.py +++ b/sugar_network/local/mountset.py @@ -26,6 +26,7 @@ from sugar_network.toolkit.inotify import Inotify, \ from sugar_network import local, node from sugar_network.toolkit import zeroconf, netlink, network from sugar_network.toolkit.collection import MutableStack +from sugar_network.toolkit.files_sync import Leechers from sugar_network.local.mounts import LocalMount, NodeMount from sugar_network.node.subscribe_socket import SubscribeSocket from sugar_network.node.commands import NodeCommands @@ -44,13 +45,18 @@ _logger = logging.getLogger('local.mountset') class Mountset(dict, ad.CommandsProcessor): - def __init__(self, home_volume): + def __init__(self, home_volume, sync_dirs=None): dict.__init__(self) ad.CommandsProcessor.__init__(self) self.opened = coroutine.Event() self.home_volume = home_volume + if sync_dirs is None: + self._file_syncs = {} + else: + self._file_syncs = Leechers(sync_dirs, + join(home_volume.root, 'files')) self._subscriptions = {} self._locale = locale.getdefaultlocale()[0].replace('_', '-') self._jobs = coroutine.Pool() @@ -244,7 +250,8 @@ class Mountset(dict, ad.CommandsProcessor): _logger.debug('Found %r server mount', path) volume, server_mode = self._mount_volume(path) if server_mode: - self[path] = NodeMount(volume, self.home_volume) + self[path] = NodeMount(volume, self.home_volume, + self._file_syncs) else: self[path] = LocalMount(volume) diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py index c31399a..7774cc6 100644 --- a/sugar_network/node/__init__.py +++ b/sugar_network/node/__init__.py @@ -43,9 +43,8 @@ trust_users = Option( action='store_true', name='trust_users') data_root = Option( - 'path to the root directory for placing documents\' ' \ - 'data and indexes', - default='/var/lib/sugar-network/db', name='data_root') + 'path to a directory to place server data', + default='/var/lib/sugar-network', name='data_root') only_commit_events = Option( 'subscribers can be notified only with "commit" events; ' \ @@ -61,6 +60,12 @@ tmpdir = Option( 'if specified, use this directory for temporary files, such files ' \ 'might take hunder of megabytes while node synchronizing') +sync_dirs = Option( + 'colon separated list of paths to directories to synchronize with ' \ + 'master server', + type_cast=Option.paths_cast, type_repr=Option.paths_repr, + name='sync-dirs') + class HTTPStatus(Exception): diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py index dd0b343..c9de988 100644 --- a/sugar_network/node/commands.py +++ b/sugar_network/node/commands.py @@ -28,6 +28,7 @@ import active_document as ad from sugar_network import node from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, \ OutPacket, DiskFull +from sugar_network.toolkit.files_sync import Seeders from sugar_network.toolkit.collection import Sequence from active_toolkit import coroutine, util, enforce @@ -148,9 +149,10 @@ class NodeCommands(ad.VolumeCommands): class MasterCommands(NodeCommands): - def __init__(self, volume, subscriber=None, file_syncs=None): + def __init__(self, volume, subscriber=None, sync_dirs=None): NodeCommands.__init__(self, volume, subscriber) - self._file_syncs = file_syncs or {} + self._file_syncs = Seeders(sync_dirs, + join(node.data_root.value, 'files'), volume.seqno) self._pull_queue = lrucache(_PULL_QUEUE_SIZE, lambda key, pull: pull.unlink()) @@ -260,11 +262,13 @@ class MasterCommands(NodeCommands): class _Cookie(dict): def __init__(self, request=None): - if request is None: - dict.__init__(self) - else: + dict.__init__(self) + + if request is not None: value = self._get_cookie(request, 'sugar_network_sync') - dict.__init__(self, value or {}) + for key, seq in (value or {}).items(): + self[key] = Sequence(seq) + self.delay = 0 def include(self, cookie): @@ -290,8 +294,6 @@ class _Cookie(dict): seq = self.get(key) if seq is None: seq = self[key] = Sequence() - elif type(seq) is list: - seq = self[key] = Sequence(seq) return seq def _get_cookie(self, request, name): diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py index 5448711..271a6ff 100644 --- a/sugar_network/toolkit/files_sync.py +++ b/sugar_network/toolkit/files_sync.py @@ -17,10 +17,11 @@ import os import json import logging from bisect import bisect_left -from os.path import join, exists, relpath, lexists, basename +from os.path import join, exists, relpath, lexists, basename, dirname from sugar_network.toolkit.sneakernet import DiskFull -from sugar_network.toolkit.collection import Sequence +from sugar_network.toolkit.collection import Sequence, PersistentSequence +from active_toolkit.sockets import BUFFER_SIZE from active_toolkit import util, coroutine @@ -168,13 +169,58 @@ class Seeder(object): json.dump((self._index, self._stamp), f) +class Seeders(dict): + + def __init__(self, sync_dirs, index_root, seqno): + dict.__init__(self) + + if not exists(index_root): + os.makedirs(index_root) + + for path in sync_dirs or []: + name = basename(path) + self[name] = Seeder(path, join(index_root, name), seqno) + + class Leecher(object): def __init__(self, files_path, sequence_path): - pass + self._files_path = files_path.rstrip(os.sep) + self.sequence = PersistentSequence(sequence_path, [1, None]) - def push(self, packet): - pass + if not exists(self._files_path): + os.makedirs(self._files_path) - def pull(self): - pass + def push(self, record): + cmd = record.get('cmd') + if cmd == 'files_push': + blob = record['blob'] + path = join(self._files_path, record['path']) + if not exists(dirname(path)): + os.makedirs(dirname(path)) + with util.new_file(path) as f: + while True: + chunk = blob.read(BUFFER_SIZE) + if not chunk: + break + f.write(chunk) + elif cmd == 'files_delete': + path = join(self._files_path, record['path']) + if exists(path): + os.unlink(path) + elif cmd == 'files_commit': + self.sequence.exclude(record['sequence']) + self.sequence.commit() + + +class Leechers(dict): + + def __init__(self, sync_dirs, sequences_root): + dict.__init__(self) + + if not exists(sequences_root): + os.makedirs(sequences_root) + + for path in sync_dirs or []: + name = basename(path) + self[name] = Leecher(path, join(sequences_root, name)) diff --git a/tests/__init__.py b/tests/__init__.py index c8b08d9..c3f0b40 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -73,6 +73,7 @@ class Test(unittest.TestCase): node.find_limit.value = 1024 node.tmpdir.value = tmpdir + '/tmp' node.only_commit_events.value = False + node.data_root.value = tmpdir ad.index_write_queue.value = 10 local.local_root.value = tmpdir local.activity_dirs.value = [tmpdir + '/Activities'] diff --git a/tests/integration/sync.py b/tests/integration/sync.py index 7905911..a8dfafd 100755 --- a/tests/integration/sync.py +++ b/tests/integration/sync.py @@ -6,6 +6,7 @@ import shutil import signal from cStringIO import StringIO from contextlib import contextmanager +from os.path import exists from __init__ import tests @@ -30,13 +31,16 @@ class SyncTest(tests.Test): 'sugar-network-server', '--port=8100', '--subscribe-port=8101', '--data-root=master/db', '--index-flush-threshold=1024', '--index-flush-timeout=3', '--only-commit-events', - '--tmpdir=tmp', '-DDDF', 'start', + '--tmpdir=tmp', '--sync-dirs=master/files/1:master/files/2', + '-DDDF', 'start', ]) self.node_pid = self.popen([ 'sugar-network-service', '--port=8200', '--subscribe-port=8201', '--activity-dirs=node/Activities', '--local-root=node', '--mounts-root=mnt', '--server-mode', '--tmpdir=tmp', - '--api-url=http://localhost:8100', '-DDDF', 'start', + '--api-url=http://localhost:8100', + '--sync-dirs=node/files/1:node/files/2', + '-DDD', 'debug', ]) coroutine.sleep(1) @@ -50,6 +54,10 @@ class SyncTest(tests.Test): tests.Test.tearDown(self) def test_Sneakernet(self): + # Create shared files on master + self.touch(('master/files/1/1', '1')) + self.touch(('master/files/2/2', '2')) + # Create initial data on master with Client('/') as client: context = client.Context(type='activity', title='title_1', summary='summary', description='description') @@ -123,6 +131,9 @@ class SyncTest(tests.Test): ]), sorted([(i['guid'], i['title'], i.get_blob('preview').read()) for i in client.Context.cursor(reply=['guid', 'title'])])) + self.assertEqual('1', file('node/files/1/1').read()) + self.assertEqual('2', file('node/files/2/2').read()) + def wait_for_events(self, *events): events = list(events) connected = coroutine.Event() diff --git a/tests/units/files_sync.py b/tests/units/files_sync.py index 14560f2..95cc33f 100755 --- a/tests/units/files_sync.py +++ b/tests/units/files_sync.py @@ -11,8 +11,8 @@ from __init__ import tests import active_document as ad from sugar_network.toolkit.collection import Sequence -from sugar_network.toolkit.files_sync import Seeder -from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull +from sugar_network.toolkit.files_sync import Seeder, Leecher +from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull, OutFilePacket CHUNK = 100000 @@ -320,6 +320,74 @@ class FilesSyncTest(tests.Test): ]), read_records(out_packet)) + def test_Leecher_push(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('src/files', 'src/index', seqno) + leecher = Leecher('dst/files', 'dst/sequence') + + self.touch(('src/files/1', '1')) + self.touch(('src/files/2/3', '3')) + self.touch(('src/files/4/5/6', '6')) + self.utime('src/files', 1) + os.utime('src/files', (1, 1)) + + with OutFilePacket('.') as packet: + seeder.pull(Sequence([[1, None]]), packet) + self.assertEqual(3, seqno.value) + for i in InPacket(packet.path): + leecher.push(i) + + self.assertEqual( + '[[4, null]]', + file('dst/sequence').read()) + self.assertEqual( + '1', + file('dst/files/1').read()) + self.assertEqual( + '3', + file('dst/files/2/3').read()) + self.assertEqual( + '6', + file('dst/files/4/5/6').read()) + + os.unlink('src/files/2/3') + os.utime('src/files', (2, 2)) + + with OutFilePacket('.') as packet: + seeder.pull(Sequence([[4, None]]), packet) + self.assertEqual(4, seqno.value) + for i in InPacket(packet.path): + leecher.push(i) + + self.assertEqual( + '[[5, null]]', + file('dst/sequence').read()) + assert exists('dst/files/1') + assert not exists('dst/files/2/3') + assert exists('dst/files/4/5/6') + + os.unlink('src/files/1') + self.touch(('src/files/2/3', 'new_3')) + os.unlink('src/files/4/5/6') + self.utime('src/files', 3) + os.utime('src/files', (3, 3)) + + with OutFilePacket('.') as packet: + seeder.pull(Sequence([[5, None]]), packet) + self.assertEqual(7, seqno.value) + for i in InPacket(packet.path): + leecher.push(i) + + self.assertEqual( + '[[8, null]]', + file('dst/sequence').read()) + assert not exists('dst/files/1') + assert exists('dst/files/2/3') + assert not exists('dst/files/4/5/6') + self.assertEqual( + 'new_3', + file('dst/files/2/3').read()) + def read_records(in_packet): records = [] diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py index 611e495..4abd3a4 100755 --- a/tests/units/sync_master.py +++ b/tests/units/sync_master.py @@ -573,7 +573,7 @@ class SyncMasterTest(tests.Test): def test_pull_ProcessFilePulls(self): seqno = ad.Seqno('seqno') - master = MasterCommands('master', file_syncs={'files': Seeder('files', 'index', seqno)}) + master = MasterCommands('master', sync_dirs=['files']) request = Request() response = ad.Response() |