Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-07-31 06:13:11 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-07-31 06:19:50 (GMT)
commit384bf05497e295c155903018b347ff17668ad20d (patch)
treeb8c0be4767ec37c45d109938e465561842e991cd
parent6fad82b1eee71687604a465584e862b9c840d019 (diff)
Full support for files sync
-rwxr-xr-xsugar-network-server2
-rwxr-xr-xsugar-network-service4
-rwxr-xr-xsugar-network-sync2
-rw-r--r--sugar_network/local/__init__.py4
-rw-r--r--sugar_network/local/mounts.py34
-rw-r--r--sugar_network/local/mountset.py11
-rw-r--r--sugar_network/node/__init__.py11
-rw-r--r--sugar_network/node/commands.py18
-rw-r--r--sugar_network/toolkit/files_sync.py60
-rw-r--r--tests/__init__.py1
-rwxr-xr-xtests/integration/sync.py15
-rwxr-xr-xtests/units/files_sync.py72
-rwxr-xr-xtests/units/sync_master.py2
13 files changed, 190 insertions, 46 deletions
diff --git a/sugar-network-server b/sugar-network-server
index 1878faf..928fe92 100755
--- a/sugar-network-server
+++ b/sugar-network-server
@@ -55,7 +55,7 @@ class Application(application.Daemon):
self.jobs.spawn(volume.populate)
subscriber = SubscribeSocket(volume,
node.host.value, node.subscribe_port.value)
- cp = MasterCommands(volume, subscriber)
+ cp = MasterCommands(volume, subscriber, node.sync_dirs.value)
logging.info('Listening for requests on %s:%s',
node.host.value, node.port.value)
diff --git a/sugar-network-service b/sugar-network-service
index 48672f5..4e2b18c 100755
--- a/sugar-network-service
+++ b/sugar-network-service
@@ -151,7 +151,7 @@ class Application(application.Application):
jobs = coroutine.Pool()
volume = Volume(self._db_path, lazy_open=local.lazy_open.value)
- mountset = Mountset(volume)
+ mountset = Mountset(volume, node.sync_dirs.value)
mountset['~'] = HomeMount(volume)
mountset['/'] = RemoteMount(volume)
@@ -255,7 +255,7 @@ local.tmpdir.value = sugar.profile_path('tmp')
Option.seek('main', [application.debug])
Option.seek('webui', webui)
Option.seek('local', local)
-Option.seek('node', [node.port, node.subscribe_port])
+Option.seek('node', [node.port, node.subscribe_port, node.sync_dirs])
Option.seek('active-document', ad)
application = Application(
diff --git a/sugar-network-sync b/sugar-network-sync
index 2a2f162..b743788 100755
--- a/sugar-network-sync
+++ b/sugar-network-sync
@@ -153,7 +153,7 @@ disk_limit=$(expr 1024 \* 1024 \* 10)
if [ $# -eq 0 ]; then
if [ -e "$(dirname $0)/.sugar-network-sync" ]; then
# Script was launched from sync directory, so, process sync
- sync_path="$PWD"
+ sync_path="$(dirname $0)"
else
help
exit 0
diff --git a/sugar_network/local/__init__.py b/sugar_network/local/__init__.py
index 9d2eebb..148b7c5 100644
--- a/sugar_network/local/__init__.py
+++ b/sugar_network/local/__init__.py
@@ -39,10 +39,10 @@ local_root = Option(
default=sugar.profile_path('network'))
activity_dirs = Option(
- 'colon separated list of paths to the directories with Sugar ' \
+ 'colon separated list of paths to directories with Sugar ' \
'activities; first path will be used to keep check-in ' \
'activities',
- type_cast=Option.list_cast, type_repr=Option.list_repr, default=[
+ type_cast=Option.paths_cast, type_repr=Option.paths_repr, default=[
expanduser('~/Activities'),
'/usr/share/sugar/activities',
'/opt/sweets',
diff --git a/sugar_network/local/mounts.py b/sugar_network/local/mounts.py
index 7f7b216..361934e 100644
--- a/sugar_network/local/mounts.py
+++ b/sugar_network/local/mounts.py
@@ -416,10 +416,11 @@ class RemoteMount(ad.CommandsProcessor, _Mount, _ProxyCommands):
class NodeMount(LocalMount, _ProxyCommands):
- def __init__(self, volume, home_volume):
+ def __init__(self, volume, home_volume, file_syncs=None):
LocalMount.__init__(self, volume)
_ProxyCommands.__init__(self, home_volume)
+ self._file_syncs = file_syncs or {}
self._push_seq = PersistentSequence(
join(volume.root, 'push.sequence'), [1, None])
self._pull_seq = PersistentSequence(
@@ -487,6 +488,9 @@ class NodeMount(LocalMount, _ProxyCommands):
seqno=self.volume.seqno.value,
api_url=local.api_url.value) as packet:
if session_is_new:
+ for directory, sync in self._file_syncs.items():
+ packet.push(cmd='files_pull', directory=directory,
+ sequence=sync.sequence)
packet.push(cmd='sn_pull', sequence=self._pull_seq)
_logger.debug('Generating %r PUSH packet to %r',
@@ -544,19 +548,19 @@ class NodeMount(LocalMount, _ProxyCommands):
for record in packet.records():
cmd = record.get('cmd')
-
if cmd == 'sn_push':
self.volume.merge(record, increment_seqno=False)
-
- elif cmd == 'sn_commit' and from_master:
- _logger.debug('Processing %r COMMIT from %r', record, packet)
- self._pull_seq.exclude(record['sequence'])
-
- elif cmd == 'sn_ack' and from_master and \
- record['dst'] == self._node_guid:
- _logger.debug('Processing %r ACK from %r', record, packet)
- self._push_seq.exclude(record['sequence'])
- self._pull_seq.exclude(record['merged'])
- to_push_seq.exclude(record['sequence'])
- self.volume.seqno.next()
- self.volume.seqno.commit()
+ elif from_master:
+ if cmd == 'sn_commit':
+ _logger.debug('Processing %r COMMIT from %r',
+ record, packet)
+ self._pull_seq.exclude(record['sequence'])
+ elif cmd == 'sn_ack' and record['dst'] == self._node_guid:
+ _logger.debug('Processing %r ACK from %r', record, packet)
+ self._push_seq.exclude(record['sequence'])
+ self._pull_seq.exclude(record['merged'])
+ to_push_seq.exclude(record['sequence'])
+ self.volume.seqno.next()
+ self.volume.seqno.commit()
+ elif record.get('directory') in self._file_syncs:
+ self._file_syncs[record['directory']].push(record)
diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py
index 1ef4f57..819f678 100644
--- a/sugar_network/local/mountset.py
+++ b/sugar_network/local/mountset.py
@@ -26,6 +26,7 @@ from sugar_network.toolkit.inotify import Inotify, \
from sugar_network import local, node
from sugar_network.toolkit import zeroconf, netlink, network
from sugar_network.toolkit.collection import MutableStack
+from sugar_network.toolkit.files_sync import Leechers
from sugar_network.local.mounts import LocalMount, NodeMount
from sugar_network.node.subscribe_socket import SubscribeSocket
from sugar_network.node.commands import NodeCommands
@@ -44,13 +45,18 @@ _logger = logging.getLogger('local.mountset')
class Mountset(dict, ad.CommandsProcessor):
- def __init__(self, home_volume):
+ def __init__(self, home_volume, sync_dirs=None):
dict.__init__(self)
ad.CommandsProcessor.__init__(self)
self.opened = coroutine.Event()
self.home_volume = home_volume
+ if sync_dirs is None:
+ self._file_syncs = {}
+ else:
+ self._file_syncs = Leechers(sync_dirs,
+ join(home_volume.root, 'files'))
self._subscriptions = {}
self._locale = locale.getdefaultlocale()[0].replace('_', '-')
self._jobs = coroutine.Pool()
@@ -244,7 +250,8 @@ class Mountset(dict, ad.CommandsProcessor):
_logger.debug('Found %r server mount', path)
volume, server_mode = self._mount_volume(path)
if server_mode:
- self[path] = NodeMount(volume, self.home_volume)
+ self[path] = NodeMount(volume, self.home_volume,
+ self._file_syncs)
else:
self[path] = LocalMount(volume)
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index c31399a..7774cc6 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -43,9 +43,8 @@ trust_users = Option(
action='store_true', name='trust_users')
data_root = Option(
- 'path to the root directory for placing documents\' ' \
- 'data and indexes',
- default='/var/lib/sugar-network/db', name='data_root')
+ 'path to a directory to place server data',
+ default='/var/lib/sugar-network', name='data_root')
only_commit_events = Option(
'subscribers can be notified only with "commit" events; ' \
@@ -61,6 +60,12 @@ tmpdir = Option(
'if specified, use this directory for temporary files, such files ' \
'might take hunder of megabytes while node synchronizing')
+sync_dirs = Option(
+ 'colon separated list of paths to directories to synchronize with ' \
+ 'master server',
+ type_cast=Option.paths_cast, type_repr=Option.paths_repr,
+ name='sync-dirs')
+
class HTTPStatus(Exception):
diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py
index dd0b343..c9de988 100644
--- a/sugar_network/node/commands.py
+++ b/sugar_network/node/commands.py
@@ -28,6 +28,7 @@ import active_document as ad
from sugar_network import node
from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, \
OutPacket, DiskFull
+from sugar_network.toolkit.files_sync import Seeders
from sugar_network.toolkit.collection import Sequence
from active_toolkit import coroutine, util, enforce
@@ -148,9 +149,10 @@ class NodeCommands(ad.VolumeCommands):
class MasterCommands(NodeCommands):
- def __init__(self, volume, subscriber=None, file_syncs=None):
+ def __init__(self, volume, subscriber=None, sync_dirs=None):
NodeCommands.__init__(self, volume, subscriber)
- self._file_syncs = file_syncs or {}
+ self._file_syncs = Seeders(sync_dirs,
+ join(node.data_root.value, 'files'), volume.seqno)
self._pull_queue = lrucache(_PULL_QUEUE_SIZE,
lambda key, pull: pull.unlink())
@@ -260,11 +262,13 @@ class MasterCommands(NodeCommands):
class _Cookie(dict):
def __init__(self, request=None):
- if request is None:
- dict.__init__(self)
- else:
+ dict.__init__(self)
+
+ if request is not None:
value = self._get_cookie(request, 'sugar_network_sync')
- dict.__init__(self, value or {})
+ for key, seq in (value or {}).items():
+ self[key] = Sequence(seq)
+
self.delay = 0
def include(self, cookie):
@@ -290,8 +294,6 @@ class _Cookie(dict):
seq = self.get(key)
if seq is None:
seq = self[key] = Sequence()
- elif type(seq) is list:
- seq = self[key] = Sequence(seq)
return seq
def _get_cookie(self, request, name):
diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py
index 5448711..271a6ff 100644
--- a/sugar_network/toolkit/files_sync.py
+++ b/sugar_network/toolkit/files_sync.py
@@ -17,10 +17,11 @@ import os
import json
import logging
from bisect import bisect_left
-from os.path import join, exists, relpath, lexists, basename
+from os.path import join, exists, relpath, lexists, basename, dirname
from sugar_network.toolkit.sneakernet import DiskFull
-from sugar_network.toolkit.collection import Sequence
+from sugar_network.toolkit.collection import Sequence, PersistentSequence
+from active_toolkit.sockets import BUFFER_SIZE
from active_toolkit import util, coroutine
@@ -168,13 +169,58 @@ class Seeder(object):
json.dump((self._index, self._stamp), f)
+class Seeders(dict):
+
+ def __init__(self, sync_dirs, index_root, seqno):
+ dict.__init__(self)
+
+ if not exists(index_root):
+ os.makedirs(index_root)
+
+ for path in sync_dirs or []:
+ name = basename(path)
+ self[name] = Seeder(path, join(index_root, name), seqno)
+
+
class Leecher(object):
def __init__(self, files_path, sequence_path):
- pass
+ self._files_path = files_path.rstrip(os.sep)
+ self.sequence = PersistentSequence(sequence_path, [1, None])
- def push(self, packet):
- pass
+ if not exists(self._files_path):
+ os.makedirs(self._files_path)
- def pull(self):
- pass
+ def push(self, record):
+ cmd = record.get('cmd')
+ if cmd == 'files_push':
+ blob = record['blob']
+ path = join(self._files_path, record['path'])
+ if not exists(dirname(path)):
+ os.makedirs(dirname(path))
+ with util.new_file(path) as f:
+ while True:
+ chunk = blob.read(BUFFER_SIZE)
+ if not chunk:
+ break
+ f.write(chunk)
+ elif cmd == 'files_delete':
+ path = join(self._files_path, record['path'])
+ if exists(path):
+ os.unlink(path)
+ elif cmd == 'files_commit':
+ self.sequence.exclude(record['sequence'])
+ self.sequence.commit()
+
+
+class Leechers(dict):
+
+ def __init__(self, sync_dirs, sequences_root):
+ dict.__init__(self)
+
+ if not exists(sequences_root):
+ os.makedirs(sequences_root)
+
+ for path in sync_dirs or []:
+ name = basename(path)
+ self[name] = Leecher(path, join(sequences_root, name))
diff --git a/tests/__init__.py b/tests/__init__.py
index c8b08d9..c3f0b40 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -73,6 +73,7 @@ class Test(unittest.TestCase):
node.find_limit.value = 1024
node.tmpdir.value = tmpdir + '/tmp'
node.only_commit_events.value = False
+ node.data_root.value = tmpdir
ad.index_write_queue.value = 10
local.local_root.value = tmpdir
local.activity_dirs.value = [tmpdir + '/Activities']
diff --git a/tests/integration/sync.py b/tests/integration/sync.py
index 7905911..a8dfafd 100755
--- a/tests/integration/sync.py
+++ b/tests/integration/sync.py
@@ -6,6 +6,7 @@ import shutil
import signal
from cStringIO import StringIO
from contextlib import contextmanager
+from os.path import exists
from __init__ import tests
@@ -30,13 +31,16 @@ class SyncTest(tests.Test):
'sugar-network-server', '--port=8100', '--subscribe-port=8101',
'--data-root=master/db', '--index-flush-threshold=1024',
'--index-flush-timeout=3', '--only-commit-events',
- '--tmpdir=tmp', '-DDDF', 'start',
+ '--tmpdir=tmp', '--sync-dirs=master/files/1:master/files/2',
+ '-DDDF', 'start',
])
self.node_pid = self.popen([
'sugar-network-service', '--port=8200', '--subscribe-port=8201',
'--activity-dirs=node/Activities', '--local-root=node',
'--mounts-root=mnt', '--server-mode', '--tmpdir=tmp',
- '--api-url=http://localhost:8100', '-DDDF', 'start',
+ '--api-url=http://localhost:8100',
+ '--sync-dirs=node/files/1:node/files/2',
+ '-DDD', 'debug',
])
coroutine.sleep(1)
@@ -50,6 +54,10 @@ class SyncTest(tests.Test):
tests.Test.tearDown(self)
def test_Sneakernet(self):
+ # Create shared files on master
+ self.touch(('master/files/1/1', '1'))
+ self.touch(('master/files/2/2', '2'))
+
# Create initial data on master
with Client('/') as client:
context = client.Context(type='activity', title='title_1', summary='summary', description='description')
@@ -123,6 +131,9 @@ class SyncTest(tests.Test):
]),
sorted([(i['guid'], i['title'], i.get_blob('preview').read()) for i in client.Context.cursor(reply=['guid', 'title'])]))
+ self.assertEqual('1', file('node/files/1/1').read())
+ self.assertEqual('2', file('node/files/2/2').read())
+
def wait_for_events(self, *events):
events = list(events)
connected = coroutine.Event()
diff --git a/tests/units/files_sync.py b/tests/units/files_sync.py
index 14560f2..95cc33f 100755
--- a/tests/units/files_sync.py
+++ b/tests/units/files_sync.py
@@ -11,8 +11,8 @@ from __init__ import tests
import active_document as ad
from sugar_network.toolkit.collection import Sequence
-from sugar_network.toolkit.files_sync import Seeder
-from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull
+from sugar_network.toolkit.files_sync import Seeder, Leecher
+from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull, OutFilePacket
CHUNK = 100000
@@ -320,6 +320,74 @@ class FilesSyncTest(tests.Test):
]),
read_records(out_packet))
+ def test_Leecher_push(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('src/files', 'src/index', seqno)
+ leecher = Leecher('dst/files', 'dst/sequence')
+
+ self.touch(('src/files/1', '1'))
+ self.touch(('src/files/2/3', '3'))
+ self.touch(('src/files/4/5/6', '6'))
+ self.utime('src/files', 1)
+ os.utime('src/files', (1, 1))
+
+ with OutFilePacket('.') as packet:
+ seeder.pull(Sequence([[1, None]]), packet)
+ self.assertEqual(3, seqno.value)
+ for i in InPacket(packet.path):
+ leecher.push(i)
+
+ self.assertEqual(
+ '[[4, null]]',
+ file('dst/sequence').read())
+ self.assertEqual(
+ '1',
+ file('dst/files/1').read())
+ self.assertEqual(
+ '3',
+ file('dst/files/2/3').read())
+ self.assertEqual(
+ '6',
+ file('dst/files/4/5/6').read())
+
+ os.unlink('src/files/2/3')
+ os.utime('src/files', (2, 2))
+
+ with OutFilePacket('.') as packet:
+ seeder.pull(Sequence([[4, None]]), packet)
+ self.assertEqual(4, seqno.value)
+ for i in InPacket(packet.path):
+ leecher.push(i)
+
+ self.assertEqual(
+ '[[5, null]]',
+ file('dst/sequence').read())
+ assert exists('dst/files/1')
+ assert not exists('dst/files/2/3')
+ assert exists('dst/files/4/5/6')
+
+ os.unlink('src/files/1')
+ self.touch(('src/files/2/3', 'new_3'))
+ os.unlink('src/files/4/5/6')
+ self.utime('src/files', 3)
+ os.utime('src/files', (3, 3))
+
+ with OutFilePacket('.') as packet:
+ seeder.pull(Sequence([[5, None]]), packet)
+ self.assertEqual(7, seqno.value)
+ for i in InPacket(packet.path):
+ leecher.push(i)
+
+ self.assertEqual(
+ '[[8, null]]',
+ file('dst/sequence').read())
+ assert not exists('dst/files/1')
+ assert exists('dst/files/2/3')
+ assert not exists('dst/files/4/5/6')
+ self.assertEqual(
+ 'new_3',
+ file('dst/files/2/3').read())
+
def read_records(in_packet):
records = []
diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py
index 611e495..4abd3a4 100755
--- a/tests/units/sync_master.py
+++ b/tests/units/sync_master.py
@@ -573,7 +573,7 @@ class SyncMasterTest(tests.Test):
def test_pull_ProcessFilePulls(self):
seqno = ad.Seqno('seqno')
- master = MasterCommands('master', file_syncs={'files': Seeder('files', 'index', seqno)})
+ master = MasterCommands('master', sync_dirs=['files'])
request = Request()
response = ad.Response()