Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary · refs · log · tree · commit · diff · stats
diff options
context:
space:
mode:
author Aleksey Lim <alsroot@sugarlabs.org>2013-02-22 17:26:45 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2013-02-22 17:26:45 (GMT)
commit cc10d44687899215c960cb4622a37765a89185c1 (patch)
tree 2938cf2b47b64e27770bb31984277a312b848cdf
parent 798dc4c9768cf9762cd6a5816ecdc929953a730e (diff)
Send files on online sync
-rw-r--r--AUTHORS2
-rw-r--r--README8
-rwxr-xr-xsugar-network-client2
-rw-r--r--sugar_network/client/journal.py9
-rw-r--r--sugar_network/node/__init__.py4
-rw-r--r--sugar_network/node/files.py (renamed from sugar_network/node/files_sync.py)152
-rw-r--r--sugar_network/node/master.py16
-rw-r--r--sugar_network/node/slave.py22
-rw-r--r--sugar_network/node/sneakernet.py9
-rw-r--r--sugar_network/node/sync.py35
-rw-r--r--sugar_network/toolkit/application.py4
-rw-r--r--tests/__init__.py11
-rw-r--r--tests/units/node/__main__.py2
-rwxr-xr-xtests/units/node/files.py304
-rwxr-xr-xtests/units/node/files_sync.py402
-rwxr-xr-xtests/units/node/stats_user.py123
-rwxr-xr-xtests/units/node/sync.py71
-rwxr-xr-xtests/units/node/sync_online.py40
18 files changed, 663 insertions, 553 deletions
diff --git a/AUTHORS b/AUTHORS
index d56f2b6..b41064a 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -4,7 +4,7 @@ Authors
**Original idea**
This work was initially started as an international research initiative
-in late 2012 within the `Sugar Labs Perú`_ in order to attend the needs
+in late 2011 within the `Sugar Labs Perú`_ in order to attend the needs
identified during field work on Perú from 2007 to date.
**Contributors**
diff --git a/README b/README
index 9c88929..b0c7a5d 100644
--- a/README
+++ b/README
@@ -3,10 +3,10 @@
#################
Sugar Network is a `Sugar Labs`_ project. It is a content sharing and
-social activity system for `Sugar Learning Environment`_. The system
-is intended to be a share point for various content (like Sugar applications,
-content created by such applications, books and articles, etc.) created
-and/or supported within the Sugar community. The Sugar Network is being
+social activity, around this content, system for `Sugar Learning Environment`_.
+The system is intended to be a share point for various content (like Sugar
+applications, content created by such applications, books and articles, etc.)
+created and/or supported within the Sugar community. The Sugar Network is being
targeted not only to the Internet auditory but also to primary and secondary
educational institutions including ones that have lack of Internet connectivity
and/or skilled administrative personnel, e.g., rural schools.
diff --git a/sugar-network-client b/sugar-network-client
index ca7e307..5d7c652 100755
--- a/sugar-network-client
+++ b/sugar-network-client
@@ -202,7 +202,7 @@ Option.seek('main', application)
Option.seek('webui', webui)
Option.seek('client', client)
Option.seek('client', [sugar.keyfile, toolkit.tmpdir])
-Option.seek('node', [node.port, slave.sync_dirs])
+Option.seek('node', [node.port, node.files_root])
Option.seek('stats', stats_user)
Option.seek('db', db)
diff --git a/sugar_network/client/journal.py b/sugar_network/client/journal.py
index 783b9ca..9fdfc5e 100644
--- a/sugar_network/client/journal.py
+++ b/sugar_network/client/journal.py
@@ -20,10 +20,11 @@ import uuid
import random
import hashlib
import logging
+from shutil import copyfileobj
from tempfile import NamedTemporaryFile
from sugar_network import db, client
-from sugar_network.toolkit import BUFFER_SIZE, sugar, router, enforce
+from sugar_network.toolkit import sugar, router, enforce
from sugar_network.toolkit.router import Request
@@ -127,11 +128,7 @@ class Commands(object):
if hasattr(data, 'read'):
with NamedTemporaryFile(delete=False) as f:
- while True:
- chunk = data.read(BUFFER_SIZE)
- if not chunk:
- break
- f.write(chunk)
+ copyfileobj(data, f)
data = f.name
transfer_ownership = True
elif isinstance(data, dict):
diff --git a/sugar_network/node/__init__.py b/sugar_network/node/__init__.py
index 486db93..08c8eea 100644
--- a/sugar_network/node/__init__.py
+++ b/sugar_network/node/__init__.py
@@ -54,3 +54,7 @@ static_url = Option(
stats_root = Option(
'path to the root directory for placing stats',
default='/var/lib/sugar-network/stats')
+
+files_root = Option(
+ 'path to a directory to keep files synchronized between nodes',
+ default='/var/lib/sugar-network/files', name='files_root')
diff --git a/sugar_network/node/files_sync.py b/sugar_network/node/files.py
index 5ce9258..55650ff 100644
--- a/sugar_network/node/files_sync.py
+++ b/sugar_network/node/files.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2012 Aleksey Lim
+# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -17,20 +17,44 @@ import os
import json
import logging
from bisect import bisect_left
-from os.path import join, exists, relpath, lexists, basename, dirname
+from shutil import copyfileobj
+from os.path import join, exists, relpath, lexists, dirname
-from sugar_network.node.sneakernet import DiskFull
-from sugar_network.toolkit import BUFFER_SIZE, util, coroutine
+from sugar_network.node.sync import EOF
+from sugar_network.toolkit import util, coroutine
-_logger = logging.getLogger('files_sync')
+_logger = logging.getLogger('node.sync_files')
-class Seeder(object):
+def merge(files_path, packet):
+ files_path = files_path.rstrip(os.sep)
+ if not exists(files_path):
+ os.makedirs(files_path)
+ commit_seq = None
+
+ for record in packet:
+ op = record.get('op')
+ if op == 'update':
+ path = join(files_path, record['path'])
+ if not exists(dirname(path)):
+ os.makedirs(dirname(path))
+ with util.new_file(path) as f:
+ copyfileobj(record['blob'], f)
+ elif op == 'delete':
+ path = join(files_path, record['path'])
+ if lexists(path):
+ os.unlink(path)
+ elif op == 'commit':
+ commit_seq = record['sequence']
+
+ return commit_seq
+
+
+class Index(object):
def __init__(self, files_path, index_path, seqno):
self._files_path = files_path.rstrip(os.sep)
- self._directory = basename(self._files_path)
self._index_path = index_path
self._seqno = seqno
self._index = []
@@ -44,71 +68,56 @@ class Seeder(object):
if not exists(self._files_path):
os.makedirs(self._files_path)
- def pull(self, in_seq, packet):
- # Below calls will mutate `self._index` and trigger coroutine switches.
- # Thus, avoid changing `self._index` by different coroutines.
+ def sync(self):
+ with self._mutex:
+ return self._sync()
+
+ def diff(self, in_seq):
+ # Below calls will trigger coroutine switches, thus,
+ # avoid changing `self._index` by different coroutines.
with self._mutex:
self._sync()
- orig_seq = util.Sequence(in_seq)
out_seq = util.Sequence()
-
try:
- self._pull(in_seq, packet, out_seq, False)
- except DiskFull:
+ for record in self._diff(in_seq, out_seq):
+ if (yield record) is EOF:
+ raise StopIteration()
+ finally:
if out_seq:
- packet.push(force=True, cmd='files_commit',
- directory=self._directory, sequence=out_seq)
- raise
-
- if out_seq:
- orig_seq.floor(out_seq.last)
- packet.push(force=True, cmd='files_commit',
- directory=self._directory, sequence=orig_seq)
-
- def pending(self, in_seq):
- with self._mutex:
- self._sync()
- return self._pull(in_seq, None, None, True)
+ # We processed all files till `out_seq.last`, thus,
+ # collapse the sequence to avoid possible holes
+ out_seq = [[out_seq.first, out_seq.last]]
+ yield {'op': 'commit', 'sequence': out_seq}
- def _pull(self, in_seq, packet, out_seq, dry_run):
+ def _diff(self, in_seq, out_seq):
_logger.debug('Start sync: in_seq=%r', in_seq)
files = 0
deleted = 0
pos = 0
- for start, end in in_seq[:]:
+ for start, end in in_seq:
pos = bisect_left(self._index, [start, None, None], pos)
for pos, (seqno, path, mtime) in enumerate(self._index[pos:]):
if end is not None and seqno > end:
break
- if dry_run:
- return True
-
coroutine.dispatch()
if mtime < 0:
- packet.push(arcname=join('files', path),
- cmd='files_delete', directory=self._directory,
- path=path)
+ yield {'op': 'delete', 'path': path}
deleted += 1
else:
- packet.push_file(join(self._files_path, path),
- arcname=join('files', path), cmd='files_push',
- directory=self._directory, path=path)
- in_seq.exclude(seqno, seqno)
+ yield {'op': 'update', 'path': path,
+ 'blob': join(self._files_path, path)}
out_seq.include(start, seqno)
start = seqno
files += 1
- if dry_run:
- return False
-
_logger.debug('Stop sync: in_seq=%r out_seq=%r updates=%r deletes=%r',
in_seq, out_seq, files, deleted)
def _sync(self):
if os.stat(self._files_path).st_mtime <= self._stamp:
- return
+ return False
new_files = set()
updates = 0
@@ -166,59 +175,4 @@ class Seeder(object):
with util.new_file(self._index_path) as f:
json.dump((self._index, self._stamp), f)
-
-class Seeders(dict):
-
- def __init__(self, sync_dirs, index_root, seqno):
- dict.__init__(self)
-
- if not exists(index_root):
- os.makedirs(index_root)
-
- for path in sync_dirs or []:
- name = basename(path)
- self[name] = Seeder(path, join(index_root, name + '.files'), seqno)
-
-
-class Leecher(object):
-
- def __init__(self, files_path, sequence_path):
- self._files_path = files_path.rstrip(os.sep)
- self.sequence = util.PersistentSequence(sequence_path, [1, None])
-
- if not exists(self._files_path):
- os.makedirs(self._files_path)
-
- def push(self, record):
- cmd = record.get('cmd')
- if cmd == 'files_push':
- blob = record['blob']
- path = join(self._files_path, record['path'])
- if not exists(dirname(path)):
- os.makedirs(dirname(path))
- with util.new_file(path) as f:
- while True:
- chunk = blob.read(BUFFER_SIZE)
- if not chunk:
- break
- f.write(chunk)
- elif cmd == 'files_delete':
- path = join(self._files_path, record['path'])
- if exists(path):
- os.unlink(path)
- elif cmd == 'files_commit':
- self.sequence.exclude(record['sequence'])
- self.sequence.commit()
-
-
-class Leechers(dict):
-
- def __init__(self, sync_dirs, sequences_root):
- dict.__init__(self)
-
- if not exists(sequences_root):
- os.makedirs(sequences_root)
-
- for path in sync_dirs or []:
- name = basename(path)
- self[name] = Leecher(path, join(sequences_root, name + '.files'))
+ return True
diff --git a/sugar_network/node/master.py b/sugar_network/node/master.py
index 902c2a5..881b5d4 100644
--- a/sugar_network/node/master.py
+++ b/sugar_network/node/master.py
@@ -15,9 +15,10 @@
import logging
from urlparse import urlsplit
+from os.path import join
from sugar_network import db, client
-from sugar_network.node import sync, stats_user
+from sugar_network.node import sync, stats_user, files_root, files
from sugar_network.node.commands import NodeCommands
from sugar_network.toolkit import util
@@ -30,6 +31,11 @@ class MasterCommands(NodeCommands):
def __init__(self, volume):
guid = urlsplit(client.api_url.value).netloc
NodeCommands.__init__(self, True, guid, volume)
+ self._files = None
+
+ if files_root.value:
+ self._files = files.Index(files_root.value,
+ join(volume.root, 'files.index'), volume.seqno)
@db.volume_command(method='POST', cmd='sync',
permissions=db.ACCESS_AUTH)
@@ -38,8 +44,12 @@ class MasterCommands(NodeCommands):
for packet in sync.decode(request.content_stream):
if packet.name == 'pull':
- pull_seq = util.Sequence(packet['sequence'])
- reply.append(('diff', None, sync.diff(self.volume, pull_seq)))
+ seq = util.Sequence(packet['sequence'])
+ reply.append(('diff', None, sync.diff(self.volume, seq)))
+ elif packet.name == 'files_pull':
+ if self._files is not None:
+ seq = util.Sequence(packet['sequence'])
+ reply.append(('files_diff', None, self._files.diff(seq)))
elif packet.name == 'diff':
seq, ack_seq = sync.merge(self.volume, packet)
reply.append(('ack', {'ack': ack_seq, 'sequence': seq}, None))
diff --git a/sugar_network/node/slave.py b/sugar_network/node/slave.py
index f5c3a2c..cbd8598 100644
--- a/sugar_network/node/slave.py
+++ b/sugar_network/node/slave.py
@@ -18,9 +18,9 @@ from os.path import join
from sugar_network import db
from sugar_network.client import Client
-from sugar_network.node import sync, stats_user
+from sugar_network.node import sync, stats_user, files, files_root
from sugar_network.node.commands import NodeCommands
-from sugar_network.toolkit import Option, util
+from sugar_network.toolkit import util
_SYNC_DIRNAME = '.sugar-network-sync'
@@ -28,28 +28,24 @@ _SYNC_DIRNAME = '.sugar-network-sync'
_logger = logging.getLogger('node.slave')
-sync_dirs = Option(
- 'colon separated list of paths to directories to synchronize with '
- 'master server',
- type_cast=Option.paths_cast, type_repr=Option.paths_repr,
- name='sync_dirs')
-
-
class SlaveCommands(NodeCommands):
def __init__(self, guid, volume):
NodeCommands.__init__(self, False, guid, volume)
self._push_seq = util.PersistentSequence(
- join(volume.root, 'push'), [1, None])
+ join(volume.root, 'push.sequence'), [1, None])
self._pull_seq = util.PersistentSequence(
- join(volume.root, 'pull'), [1, None])
+ join(volume.root, 'pull.sequence'), [1, None])
+ self._files_seq = util.PersistentSequence(
+ join(volume.root, 'files.sequence'), [1, None])
@db.volume_command(method='POST', cmd='online_sync',
permissions=db.ACCESS_LOCAL)
def online_sync(self):
push = [('diff', None, sync.diff(self.volume, self._push_seq)),
('pull', {'sequence': self._pull_seq}, None),
+ ('files_pull', {'sequence': self._files_seq}, None),
]
if stats_user.stats_user.value:
push.append(('stats_diff', None, stats_user.diff()))
@@ -69,6 +65,10 @@ class SlaveCommands(NodeCommands):
self._push_seq.commit()
elif packet.name == 'stats_ack':
stats_user.commit(packet['sequence'])
+ elif packet.name == 'files_diff':
+ seq = files.merge(files_root.value, packet)
+ self._files_seq.exclude(seq)
+ self._files_seq.commit()
"""
diff --git a/sugar_network/node/sneakernet.py b/sugar_network/node/sneakernet.py
index 3cd6601..349d2e5 100644
--- a/sugar_network/node/sneakernet.py
+++ b/sugar_network/node/sneakernet.py
@@ -19,12 +19,13 @@ import time
import gzip
import tarfile
import logging
+from shutil import copyfileobj
from cStringIO import StringIO
from contextlib import contextmanager
from os.path import join, exists
from sugar_network import db
-from sugar_network.toolkit import BUFFER_SIZE, util, exception, enforce
+from sugar_network.toolkit import util, exception, enforce
_RESERVED_SIZE = 1024 * 1024
@@ -65,11 +66,7 @@ class InPacket(object):
self._file = util.NamedTemporaryFile()
if hasattr(stream, 'read'):
- while True:
- chunk = stream.read(BUFFER_SIZE)
- if not chunk:
- break
- self._file.write(chunk)
+ copyfileobj(stream, self._file)
else:
for chunk in stream:
self._file.write(chunk)
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
index 8bdf764..b2d64c6 100644
--- a/sugar_network/node/sync.py
+++ b/sugar_network/node/sync.py
@@ -13,10 +13,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
import logging
import cPickle as pickle
-from sugar_network.toolkit import util, coroutine, enforce
+from sugar_network.toolkit import BUFFER_SIZE, util, coroutine, enforce
EOF = object()
@@ -39,8 +40,25 @@ def encode(*args):
props = {}
props['packet'] = packet
yield pickle.dumps(props)
+
for record in content or []:
+ blob = record.get('blob')
+ if blob:
+ del record['blob']
+ record['blob_size'] = os.stat(blob).st_size
yield pickle.dumps(record)
+
+ if blob:
+ sent_bytes = 0
+ with file(blob, 'rb') as f:
+ while True:
+ chunk = f.read(BUFFER_SIZE)
+ if not chunk:
+ break
+ yield chunk
+ sent_bytes += len(chunk)
+ enforce(sent_bytes == record['blob_size'])
+
yield pickle.dumps({'packet': 'last'})
@@ -165,6 +183,9 @@ class _PacketsIterator(object):
self._props = record
self._shift = False
break
+ blob_size = record.get('blob_size')
+ if blob_size:
+ record['blob'] = _Blob(self._stream, blob_size)
yield record
def __enter__(self):
@@ -172,3 +193,15 @@ class _PacketsIterator(object):
def __exit__(self, exc_type, exc_value, traceback):
pass
+
+
+class _Blob(object):
+
+ def __init__(self, stream, size):
+ self._stream = stream
+ self._size_to_read = size
+
+ def read(self, size=BUFFER_SIZE):
+ chunk = self._stream.read(min(size, self._size_to_read))
+ self._size_to_read -= len(chunk)
+ return chunk
diff --git a/sugar_network/toolkit/application.py b/sugar_network/toolkit/application.py
index 5187730..520583c 100644
--- a/sugar_network/toolkit/application.py
+++ b/sugar_network/toolkit/application.py
@@ -49,11 +49,11 @@ no_hints = Option(
name='no-hints')
logdir = Option(
- 'path to the directory to place log files',
+ 'path to a directory to place log files',
name='logdir', default='/var/log')
rundir = Option(
- 'path to the directory to place pid files',
+ 'path to a directory to place pid files',
name='rundir')
diff --git a/tests/__init__.py b/tests/__init__.py
index 6c21625..947a966 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -82,7 +82,7 @@ class Test(unittest.TestCase):
node.find_limit.value = 1024
node.data_root.value = tmpdir
node.static_url.value = None
- slave.sync_dirs.value = []
+ node.files_root.value = None
db.index_write_queue.value = 10
client.local_root.value = tmpdir
client.activity_dirs.value = [tmpdir + '/Activities']
@@ -181,6 +181,7 @@ class Test(unittest.TestCase):
setattr(mod, name, new_handler)
def touch(self, *files):
+ utime = None
for i in files:
if isinstance(i, basestring):
if i.endswith(os.sep):
@@ -191,7 +192,10 @@ class Test(unittest.TestCase):
else:
content = i
else:
- path, content = i
+ if len(i) == 2:
+ path, content = i
+ else:
+ path, content, utime = i
if isinstance(content, list):
content = '\n'.join(content)
path = join(tmpdir, path)
@@ -205,6 +209,9 @@ class Test(unittest.TestCase):
f.write(str(content))
f.close()
+ if utime:
+ os.utime(path, (utime, utime))
+
def utime(self, path, ts):
if isfile(path):
os.utime(path, (ts, ts))
diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py
index cd3e3d6..13b3f79 100644
--- a/tests/units/node/__main__.py
+++ b/tests/units/node/__main__.py
@@ -3,7 +3,7 @@
from __init__ import tests
from auth import *
-#from files_sync import *
+from files import *
from node import *
from obs import *
from sneakernet import *
diff --git a/tests/units/node/files.py b/tests/units/node/files.py
new file mode 100755
index 0000000..5698683
--- /dev/null
+++ b/tests/units/node/files.py
@@ -0,0 +1,304 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+import time
+import json
+from glob import glob
+from os.path import exists
+from cStringIO import StringIO
+
+from __init__ import tests
+
+from sugar_network import db
+from sugar_network.toolkit import util
+from sugar_network.node import files
+from sugar_network.node.sync import EOF
+from sugar_network.node.sneakernet import InPacket, DiskFull, OutFilePacket
+
+
+class FilesTest(tests.Test):
+
+ def setUp(self):
+ tests.Test.setUp(self)
+ self.uuid = 0
+ self.override(db, 'uuid', self.next_uuid)
+
+ def next_uuid(self):
+ self.uuid += 1
+ return str(self.uuid)
+
+ def test_Index_Populate(self):
+ seqno = util.Seqno('seqno')
+ seeder = files.Index('files', 'index', seqno)
+
+ os.utime('files', (1, 1))
+ assert seeder.sync()
+
+ assert not seeder.sync()
+ in_seq = util.Sequence([[1, None]])
+ self.assertEqual([], [i for i in seeder.diff(in_seq)])
+ self.assertEqual(0, seqno.value)
+ assert not exists('index')
+
+ self.touch(('files/1', '1'))
+ self.touch(('files/2/3', '3'))
+ self.touch(('files/4/5/6', '6'))
+ self.utime('files', 1)
+ os.utime('files', (1, 1))
+
+ assert not seeder.sync()
+ in_seq = util.Sequence([[1, None]])
+ self.assertEqual([], [i for i in seeder.diff(in_seq)])
+ self.assertEqual(0, seqno.value)
+ assert not exists('index')
+
+ self.utime('files', 2)
+ os.utime('files', (2, 2))
+
+ assert seeder.sync()
+ in_seq = util.Sequence([[1, None]])
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/1', 'path': '1'},
+ {'op': 'update', 'blob': 'files/2/3', 'path': '2/3'},
+ {'op': 'update', 'blob': 'files/4/5/6', 'path': '4/5/6'},
+ {'op': 'commit', 'sequence': [[1, 3]]},
+ ]),
+ sorted([i for i in seeder.diff(in_seq)]))
+ self.assertEqual(3, seqno.value)
+ assert exists('index')
+ self.assertEqual(
+ [[
+ [1, '1', os.stat('files/1').st_mtime],
+ [2, '2/3', os.stat('files/2/3').st_mtime],
+ [3, '4/5/6', os.stat('files/4/5/6').st_mtime],
+ ],
+ os.stat('files').st_mtime],
+ json.load(file('index')))
+
+ assert not seeder.sync()
+ in_seq = util.Sequence([[4, None]])
+ self.assertEqual([], [i for i in seeder.diff(in_seq)])
+ self.assertEqual(3, seqno.value)
+
+ def test_Index_SelectiveDiff(self):
+ seqno = util.Seqno('seqno')
+ seeder = files.Index('files', 'index', seqno)
+
+ self.touch(('files/1', '1'))
+ self.touch(('files/2', '2'))
+ self.touch(('files/3', '3'))
+ self.touch(('files/4', '4'))
+ self.touch(('files/5', '5'))
+ self.utime('files', 1)
+
+ in_seq = util.Sequence([[2, 2], [4, 10], [20, None]])
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/2', 'path': '2'},
+ {'op': 'update', 'blob': 'files/4', 'path': '4'},
+ {'op': 'update', 'blob': 'files/5', 'path': '5'},
+ {'op': 'commit', 'sequence': [[2, 5]]},
+ ]),
+ sorted([i for i in seeder.diff(in_seq)]))
+
+ def test_Index_PartialDiff(self):
+ seqno = util.Seqno('seqno')
+ seeder = files.Index('files', 'index', seqno)
+
+ self.touch('files/1')
+ self.touch('files/2')
+ self.touch('files/3')
+ self.utime('files', 1)
+
+ in_seq = util.Sequence([[1, None]])
+ diff = seeder.diff(in_seq)
+ self.assertEqual(
+ {'op': 'update', 'blob': 'files/1', 'path': '1'},
+ diff.send(None))
+ self.assertRaises(StopIteration, diff.send, EOF)
+
+ diff = seeder.diff(in_seq)
+ self.assertEqual(
+ {'op': 'update', 'blob': 'files/1', 'path': '1'},
+ diff.send(None))
+ self.assertEqual(
+ {'op': 'update', 'blob': 'files/2', 'path': '2'},
+ diff.send(None))
+ self.assertEqual(
+ {'op': 'commit', 'sequence': [[1, 1]]},
+ diff.send(EOF))
+ self.assertRaises(StopIteration, diff.next)
+
+ def test_Index_DiffUpdatedFiles(self):
+ seqno = util.Seqno('seqno')
+ seeder = files.Index('files', 'index', seqno)
+
+ self.touch(('files/1', '1'))
+ self.touch(('files/2', '2'))
+ self.touch(('files/3', '3'))
+ self.utime('files', 1)
+ os.utime('files', (1, 1))
+
+ for __ in seeder.diff(util.Sequence([[1, None]])):
+ pass
+ self.assertEqual(3, seqno.value)
+
+ os.utime('files/2', (2, 2))
+
+ self.assertEqual([], [i for i in seeder.diff(util.Sequence([[4, None]]))])
+ self.assertEqual(3, seqno.value)
+
+ os.utime('files', (3, 3))
+
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/2', 'path': '2'},
+ {'op': 'commit', 'sequence': [[4, 4]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[4, None]]))]))
+ self.assertEqual(4, seqno.value)
+
+ os.utime('files/1', (4, 4))
+ os.utime('files/3', (4, 4))
+ os.utime('files', (4, 4))
+
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/1', 'path': '1'},
+ {'op': 'update', 'blob': 'files/3', 'path': '3'},
+ {'op': 'commit', 'sequence': [[5, 6]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[5, None]]))]))
+ self.assertEqual(6, seqno.value)
+
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/1', 'path': '1'},
+ {'op': 'update', 'blob': 'files/2', 'path': '2'},
+ {'op': 'update', 'blob': 'files/3', 'path': '3'},
+ {'op': 'commit', 'sequence': [[1, 6]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[1, None]]))]))
+ self.assertEqual(6, seqno.value)
+
+ def test_Index_DiffCreatedFiles(self):
+ seqno = util.Seqno('seqno')
+ seeder = files.Index('files', 'index', seqno)
+
+ self.touch(('files/1', '1'))
+ self.touch(('files/2', '2'))
+ self.touch(('files/3', '3'))
+ self.utime('files', 1)
+ os.utime('files', (1, 1))
+
+ for __ in seeder.diff(util.Sequence([[1, None]])):
+ pass
+ self.assertEqual(3, seqno.value)
+
+ self.touch(('files/4', '4'))
+ os.utime('files/4', (2, 2))
+ os.utime('files', (1, 1))
+
+ self.assertEqual([], [i for i in seeder.diff(util.Sequence([[4, None]]))])
+ self.assertEqual(3, seqno.value)
+
+ os.utime('files/4', (2, 2))
+ os.utime('files', (2, 2))
+
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/4', 'path': '4'},
+ {'op': 'commit', 'sequence': [[4, 4]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[4, None]]))]))
+ self.assertEqual(4, seqno.value)
+
+ self.touch(('files/5', '5'))
+ os.utime('files/5', (3, 3))
+ self.touch(('files/6', '6'))
+ os.utime('files/6', (3, 3))
+ os.utime('files', (3, 3))
+
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/5', 'path': '5'},
+ {'op': 'update', 'blob': 'files/6', 'path': '6'},
+ {'op': 'commit', 'sequence': [[5, 6]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[5, None]]))]))
+ self.assertEqual(6, seqno.value)
+
+ def test_Index_DiffDeletedFiles(self):
+ seqno = util.Seqno('seqno')
+ seeder = files.Index('files', 'index', seqno)
+
+ self.touch(('files/1', '1'))
+ self.touch(('files/2', '2'))
+ self.touch(('files/3', '3'))
+ self.utime('files', 1)
+ os.utime('files', (1, 1))
+
+ for __ in seeder.diff(util.Sequence([[1, None]])):
+ pass
+ self.assertEqual(3, seqno.value)
+
+ os.unlink('files/2')
+ os.utime('files', (2, 2))
+
+ assert seeder.sync()
+ self.assertEqual(sorted([
+ {'op': 'update', 'blob': 'files/1', 'path': '1'},
+ {'op': 'update', 'blob': 'files/3', 'path': '3'},
+ {'op': 'delete', 'path': '2'},
+ {'op': 'commit', 'sequence': [[1, 4]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[1, None]]))]))
+ self.assertEqual(4, seqno.value)
+
+ os.unlink('files/1')
+ os.unlink('files/3')
+ os.utime('files', (3, 3))
+
+ assert seeder.sync()
+ self.assertEqual(sorted([
+ {'op': 'delete', 'path': '1'},
+ {'op': 'delete', 'path': '2'},
+ {'op': 'delete', 'path': '3'},
+ {'op': 'commit', 'sequence': [[1, 6]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[1, None]]))]))
+ self.assertEqual(6, seqno.value)
+
+ assert not seeder.sync()
+ self.assertEqual(sorted([
+ {'op': 'delete', 'path': '1'},
+ {'op': 'delete', 'path': '2'},
+ {'op': 'delete', 'path': '3'},
+ {'op': 'commit', 'sequence': [[1, 6]]},
+ ]),
+ sorted([i for i in seeder.diff(util.Sequence([[1, None]]))]))
+ self.assertEqual(6, seqno.value)
+
+ def test_merge_Updated(self):
+ self.assertEqual('commit-sequence', files.merge('dst', [
+ {'op': 'update', 'path': '1', 'blob': StringIO('1')},
+ {'op': 'update', 'path': '2/2', 'blob': StringIO('22')},
+ {'op': 'update', 'path': '3/3/3', 'blob': StringIO('333')},
+ {'op': 'commit', 'sequence': 'commit-sequence'},
+ ]))
+ self.assertEqual('1', file('dst/1').read())
+ self.assertEqual('22', file('dst/2/2').read())
+ self.assertEqual('333', file('dst/3/3/3').read())
+
+ def test_merge_Deleted(self):
+ self.touch('dst/1')
+ self.touch('dst/2/2')
+
+ self.assertEqual('commit-sequence', files.merge('dst', [
+ {'op': 'delete', 'path': '1'},
+ {'op': 'delete', 'path': '2/2'},
+ {'op': 'delete', 'path': '3/3/3'},
+ {'op': 'commit', 'sequence': 'commit-sequence'},
+ ]))
+ assert not exists('dst/1')
+ assert not exists('dst/2/2')
+ assert not exists('dst/3/3/3')
+
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/node/files_sync.py b/tests/units/node/files_sync.py
deleted file mode 100755
index eaeb0c0..0000000
--- a/tests/units/node/files_sync.py
+++ /dev/null
@@ -1,402 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import time
-import json
-from glob import glob
-from os.path import exists
-
-from __init__ import tests
-
-from sugar_network import db
-from sugar_network.toolkit import util
-from sugar_network.toolkit.files_sync import Seeder, Leecher
-from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull, OutFilePacket
-
-
-CHUNK = 100000
-
-
-class FilesSyncTest(tests.Test):
-
- def setUp(self):
- tests.Test.setUp(self)
- self.uuid = 0
- self.override(db, 'uuid', self.next_uuid)
-
- def next_uuid(self):
- self.uuid += 1
- return str(self.uuid)
-
- def test_Seeder_pull_Populate(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('files', 'index', seqno)
-
- os.utime('files', (1, 1))
-
- assert not seeder.pending(util.Sequence([[1, None]]))
- packet = OutBufferPacket()
- in_seq = util.Sequence([[1, None]])
- seeder.pull(in_seq, packet)
- self.assertEqual([[1, None]], in_seq)
- self.assertEqual(0, seqno.value)
- self.assertEqual(True, packet.empty)
- assert not exists('index')
-
- self.touch(('files/1', '1'))
- self.touch(('files/2/3', '3'))
- self.touch(('files/4/5/6', '6'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- assert not seeder.pending(util.Sequence([[1, None]]))
- in_seq = util.Sequence([[1, None]])
- seeder.pull(in_seq, packet)
- self.assertEqual([[1, None]], in_seq)
- self.assertEqual(0, seqno.value)
- self.assertEqual(True, packet.empty)
- assert not exists('index')
-
- self.utime('files', 2)
- os.utime('files', (2, 2))
-
- assert seeder.pending(util.Sequence([[1, None]]))
- in_seq = util.Sequence([[1, None]])
- seeder.pull(in_seq, packet)
- self.assertEqual([[4, None]], in_seq)
- self.assertEqual(3, seqno.value)
- self.assertEqual(False, packet.empty)
- assert exists('index')
- self.assertEqual(
- [[
- [1, '1', os.stat('files/1').st_mtime],
- [2, '2/3', os.stat('files/2/3').st_mtime],
- [3, '4/5/6', os.stat('files/4/5/6').st_mtime],
- ],
- os.stat('files').st_mtime],
- json.load(file('index')))
- self.assertEqual(
- sorted([
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '1', 'content_type': 'blob', 'path': '1'},
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '3', 'content_type': 'blob', 'path': '2/3'},
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '6', 'content_type': 'blob', 'path': '4/5/6'},
- {'filename': '1.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 3]]},
- ]),
- read_records(packet))
-
- assert not seeder.pending(util.Sequence([[4, None]]))
- packet = OutBufferPacket()
- in_seq = util.Sequence([[4, None]])
- seeder.pull(in_seq, packet)
- self.assertEqual([[4, None]], in_seq)
- self.assertEqual(3, seqno.value)
- self.assertEqual(True, packet.empty)
-
- def test_Seeder_pull_NotFull(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('files', 'index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.touch(('files/4', '4'))
- self.touch(('files/5', '5'))
- self.utime('files', 1)
-
- out_packet = OutBufferPacket()
- in_seq = util.Sequence([[2, 2], [4, 10], [20, None]])
- seeder.pull(in_seq, out_packet)
- self.assertEqual([[6, 10], [20,None]], in_seq)
- self.assertEqual(
- sorted([
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '2', 'content_type': 'blob', 'path': '2'},
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '4', 'content_type': 'blob', 'path': '4'},
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '5', 'content_type': 'blob', 'path': '5'},
- {'filename': '1.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[2, 2], [4, 5]]},
- ]),
- read_records(out_packet))
-
- def test_Seeder_pull_DiskFull(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('files', 'index', seqno)
-
- self.touch(('files/1', '*' * CHUNK))
- self.touch(('files/2', '*' * CHUNK))
- self.touch(('files/3', '*' * CHUNK))
- self.utime('files', 1)
-
- out_packet = OutBufferPacket(limit=CHUNK * 2.5)
- in_seq = util.Sequence([[1, None]])
- try:
- seeder.pull(in_seq, out_packet)
- assert False
- except DiskFull:
- pass
- self.assertEqual([[3, None]], in_seq)
- self.assertEqual(
- sorted([
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '*' * CHUNK, 'content_type': 'blob', 'path': '1'},
- {'filename': '1.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '*' * CHUNK, 'content_type': 'blob', 'path': '2'},
- {'filename': '1.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 2]]},
- ]),
- read_records(out_packet))
-
- def test_Seeder_pull_UpdateFiles(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('files', 'index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[1, None]]), out_packet)
- self.assertEqual(3, seqno.value)
-
- os.utime('files/2', (2, 2))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[4, None]]), out_packet)
- self.assertEqual(3, seqno.value)
-
- os.utime('files', (3, 3))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[4, None]]), out_packet)
- self.assertEqual(4, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '3.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '2', 'content_type': 'blob', 'path': '2'},
- {'filename': '3.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[4, 4]]},
- ]),
- read_records(out_packet))
-
- os.utime('files/1', (4, 4))
- os.utime('files/3', (4, 4))
- os.utime('files', (4, 4))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[5, None]]), out_packet)
- self.assertEqual(6, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '4.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '1', 'content_type': 'blob', 'path': '1'},
- {'filename': '4.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '3', 'content_type': 'blob', 'path': '3'},
- {'filename': '4.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[5, 6]]},
- ]),
- read_records(out_packet))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[1, None]]), out_packet)
- self.assertEqual(6, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '5.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '1', 'content_type': 'blob', 'path': '1'},
- {'filename': '5.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '2', 'content_type': 'blob', 'path': '2'},
- {'filename': '5.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '3', 'content_type': 'blob', 'path': '3'},
- {'filename': '5.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 6]]},
- ]),
- read_records(out_packet))
-
- def test_Seeder_pull_CreateFiles(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('files', 'index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[1, None]]), out_packet)
- self.assertEqual(3, seqno.value)
-
- self.touch(('files/4', '4'))
- os.utime('files/4', (2, 2))
- os.utime('files', (1, 1))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[4, None]]), out_packet)
- self.assertEqual(3, seqno.value)
-
- os.utime('files/4', (2, 2))
- os.utime('files', (2, 2))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[4, None]]), out_packet)
- self.assertEqual(4, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '3.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '4', 'content_type': 'blob', 'path': '4'},
- {'filename': '3.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[4, 4]]},
- ]),
- read_records(out_packet))
-
- self.touch(('files/5', '5'))
- os.utime('files/5', (3, 3))
- self.touch(('files/6', '6'))
- os.utime('files/6', (3, 3))
- os.utime('files', (3, 3))
-
- out_packet = OutBufferPacket()
- seeder.pull(util.Sequence([[5, None]]), out_packet)
- self.assertEqual(6, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '4.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '5', 'content_type': 'blob', 'path': '5'},
- {'filename': '4.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '6', 'content_type': 'blob', 'path': '6'},
- {'filename': '4.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[5, 6]]},
- ]),
- read_records(out_packet))
-
- def test_Seeder_pull_DeleteFiles(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('files', 'index', seqno)
-
- self.touch(('files/1', '1'))
- self.touch(('files/2', '2'))
- self.touch(('files/3', '3'))
- self.utime('files', 1)
- os.utime('files', (1, 1))
-
- out_packet = OutBufferPacket()
- in_seq = util.Sequence([[1, None]])
- seeder.pull(in_seq, out_packet)
- self.assertEqual([[4, None]], in_seq)
- self.assertEqual(3, seqno.value)
-
- os.unlink('files/2')
- os.utime('files', (2, 2))
-
- assert seeder.pending(util.Sequence([[4, None]]))
- out_packet = OutBufferPacket()
- in_seq = util.Sequence([[1, None]])
- seeder.pull(in_seq, out_packet)
- self.assertEqual([[2, 2], [5, None]], in_seq)
- self.assertEqual(4, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '2.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '1', 'content_type': 'blob', 'path': '1'},
- {'filename': '2.packet', 'cmd': 'files_push', 'directory': 'files', 'blob': '3', 'content_type': 'blob', 'path': '3'},
- {'filename': '2.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '2'},
- {'filename': '2.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 4]]},
- ]),
- read_records(out_packet))
-
- os.unlink('files/1')
- os.unlink('files/3')
- os.utime('files', (3, 3))
-
- assert seeder.pending(util.Sequence([[5, None]]))
- out_packet = OutBufferPacket()
- in_seq = util.Sequence([[1, None]])
- seeder.pull(in_seq, out_packet)
- self.assertEqual([[1, 3], [7, None]], in_seq)
- self.assertEqual(6, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '3.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '1'},
- {'filename': '3.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '2'},
- {'filename': '3.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '3'},
- {'filename': '3.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[1, 6]]},
- ]),
- read_records(out_packet))
-
- out_packet = OutBufferPacket()
- in_seq = util.Sequence([[4, None]])
- seeder.pull(in_seq, out_packet)
- self.assertEqual([[7, None]], in_seq)
- self.assertEqual(6, seqno.value)
- self.assertEqual(
- sorted([
- {'filename': '4.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '1'},
- {'filename': '4.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '2'},
- {'filename': '4.packet', 'cmd': 'files_delete', 'directory': 'files', 'path': '3'},
- {'filename': '4.packet', 'cmd': 'files_commit', 'directory': 'files', 'sequence': [[4, 6]]},
- ]),
- read_records(out_packet))
-
- def test_Leecher_push(self):
- seqno = util.Seqno('seqno')
- seeder = Seeder('src/files', 'src/index', seqno)
- leecher = Leecher('dst/files', 'dst/sequence')
-
- self.touch(('src/files/1', '1'))
- self.touch(('src/files/2/3', '3'))
- self.touch(('src/files/4/5/6', '6'))
- self.utime('src/files', 1)
- os.utime('src/files', (1, 1))
-
- with OutFilePacket('.') as packet:
- seeder.pull(util.Sequence([[1, None]]), packet)
- self.assertEqual(3, seqno.value)
- for i in InPacket(packet.path):
- leecher.push(i)
-
- self.assertEqual(
- '[[4, null]]',
- file('dst/sequence').read())
- self.assertEqual(
- '1',
- file('dst/files/1').read())
- self.assertEqual(
- '3',
- file('dst/files/2/3').read())
- self.assertEqual(
- '6',
- file('dst/files/4/5/6').read())
-
- os.unlink('src/files/2/3')
- os.utime('src/files', (2, 2))
-
- with OutFilePacket('.') as packet:
- seeder.pull(util.Sequence([[4, None]]), packet)
- self.assertEqual(4, seqno.value)
- for i in InPacket(packet.path):
- leecher.push(i)
-
- self.assertEqual(
- '[[5, null]]',
- file('dst/sequence').read())
- assert exists('dst/files/1')
- assert not exists('dst/files/2/3')
- assert exists('dst/files/4/5/6')
-
- os.unlink('src/files/1')
- self.touch(('src/files/2/3', 'new_3'))
- os.unlink('src/files/4/5/6')
- self.utime('src/files', 3)
- os.utime('src/files', (3, 3))
-
- with OutFilePacket('.') as packet:
- seeder.pull(util.Sequence([[5, None]]), packet)
- self.assertEqual(7, seqno.value)
- for i in InPacket(packet.path):
- leecher.push(i)
-
- self.assertEqual(
- '[[8, null]]',
- file('dst/sequence').read())
- assert not exists('dst/files/1')
- assert exists('dst/files/2/3')
- assert not exists('dst/files/4/5/6')
- self.assertEqual(
- 'new_3',
- file('dst/files/2/3').read())
-
-
-def read_records(in_packet):
- records = []
- for i in InPacket(stream=in_packet.pop()):
- if i.get('content_type') == 'blob':
- i['blob'] = i['blob'].read()
- records.append(i)
- return sorted(records)
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/node/stats_user.py b/tests/units/node/stats_user.py
new file mode 100755
index 0000000..49275b5
--- /dev/null
+++ b/tests/units/node/stats_user.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import json
+import time
+
+from __init__ import tests
+
+from sugar_network.toolkit.rrd import Rrd
+from sugar_network.node.stats_user import stats_user_step, stats_user_rras, diff, merge, commit
+
+
+class StatsTest(tests.Test):
+
+ def setUp(self):
+ tests.Test.setUp(self)
+ stats_user_step.value = 1
+ stats_user_rras.value = ['RRA:AVERAGE:0.5:1:100']
+
+ def test_diff(self):
+ ts = int(time.time())
+
+ rrd = Rrd('stats/user/dir1/user1', stats_user_step.value, stats_user_rras.value)
+ rrd['db1'].put({'a': 1}, ts)
+ rrd['db1'].put({'a': 2}, ts + 1)
+
+ rrd = Rrd('stats/user/dir1/user2', stats_user_step.value, stats_user_rras.value)
+ rrd['db2'].put({'b': 3}, ts)
+
+ rrd = Rrd('stats/user/dir2/user3', stats_user_step.value, stats_user_rras.value)
+ rrd['db3'].put({'c': 4}, ts)
+ rrd['db4'].put({'d': 5}, ts)
+
+ self.assertEqual([
+ {'db': 'db3', 'user': 'user3'},
+ {'timestamp': ts, 'values': {'c': 4.0}},
+ {'db': 'db4', 'user': 'user3'},
+ {'timestamp': ts, 'values': {'d': 5.0}},
+ {'db': 'db2', 'user': 'user2'},
+ {'timestamp': ts, 'values': {'b': 3.0}},
+ {'db': 'db1', 'user': 'user1'},
+ {'timestamp': ts, 'values': {'a': 1.0}},
+ {'timestamp': ts + 1, 'values': {'a': 2.0}},
+ {'commit': {
+ 'user1': {
+ 'db1': [[1, ts + 1]],
+ },
+ 'user2': {
+ 'db2': [[1, ts]],
+ },
+ 'user3': {
+ 'db3': [[1, ts]],
+ 'db4': [[1, ts]],
+ },
+ }},
+ ],
+ [i for i in diff()])
+
+ def test_merge(self):
+ ts = int(time.time())
+
+ self.assertEqual(
+ 'info',
+ merge([
+ {'db': 'db3', 'user': 'user3'},
+ {'timestamp': ts, 'values': {'c': 4.0}},
+ {'db': 'db4', 'user': 'user3'},
+ {'timestamp': ts, 'values': {'d': 5.0}},
+ {'db': 'db2', 'user': 'user2'},
+ {'timestamp': ts, 'values': {'b': 3.0}},
+ {'db': 'db1', 'user': 'user1'},
+ {'timestamp': ts, 'values': {'a': 1.0}},
+ {'timestamp': ts + 1, 'values': {'a': 2.0}},
+ {'commit': 'info'},
+ ]))
+
+ self.assertEqual([
+ [('db1', ts, {'a': 1.0}), ('db1', ts + 1, {'a': 2.0})],
+ ],
+ [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user1', 1)])
+
+ self.assertEqual([
+ [('db2', ts, {'b': 3.0})],
+ ],
+ [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user2', 1)])
+
+ self.assertEqual([
+ [('db3', ts, {'c': 4.0})],
+ [('db4', ts, {'d': 5.0})],
+ ],
+ [[(db.name,) + i for i in db.get(db.first, db.last)] for db in Rrd('stats/user/us/user3', 1)])
+
+ def test_commit(self):
+ ts = int(time.time())
+ commit({
+ 'user1': {
+ 'db1': [[1, ts + 1]],
+ },
+ 'user2': {
+ 'db2': [[1, ts]],
+ },
+ 'user3': {
+ 'db3': [[1, ts]],
+ 'db4': [[1, ts]],
+ },
+ })
+
+ self.assertEqual(
+ [[ts + 2, None]],
+ json.load(file('stats/user/us/user1/db1.push')))
+ self.assertEqual(
+ [[ts + 1, None]],
+ json.load(file('stats/user/us/user2/db2.push')))
+ self.assertEqual(
+ [[ts + 1, None]],
+ json.load(file('stats/user/us/user3/db3.push')))
+ self.assertEqual(
+ [[ts + 1, None]],
+ json.load(file('stats/user/us/user3/db4.push')))
+
+
+if __name__ == '__main__':
+ tests.main()
diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py
index 91c5f63..793b52b 100755
--- a/tests/units/node/sync.py
+++ b/tests/units/node/sync.py
@@ -441,27 +441,27 @@ class SyncTest(tests.Test):
self.assertEqual([
pickle.dumps({'packet': 1}),
- pickle.dumps(1),
+ pickle.dumps({1: 1}),
pickle.dumps({'packet': 2}),
- pickle.dumps(2),
- pickle.dumps(2),
+ pickle.dumps({2: 2}),
+ pickle.dumps({2: 2}),
pickle.dumps({'packet': 3}),
- pickle.dumps(3),
- pickle.dumps(3),
- pickle.dumps(3),
+ pickle.dumps({3: 3}),
+ pickle.dumps({3: 3}),
+ pickle.dumps({3: 3}),
pickle.dumps({'packet': 'last'}),
],
[i for i in sync.encode(
- (1, None, [1]),
- (2, None, [2, 2]),
- (3, None, [3, 3, 3]),
+ (1, None, [{1: 1}]),
+ (2, None, [{2: 2}, {2: 2}]),
+ (3, None, [{3: 3}, {3: 3}, {3: 3}]),
)])
def test_chunked_encode(self):
output = sync.chunked_encode()
self.assertEqual({'packet': 'last'}, pickle.loads(decode_chunked(output.read(100))))
- data = [{'foo': 1}, {'bar': 2}, 3]
+ data = [{'foo': 1}, {'bar': 2}]
data_stream = pickle.dumps({'packet': 'packet'})
for record in data:
data_stream += pickle.dumps(record)
@@ -494,6 +494,57 @@ class SyncTest(tests.Test):
dump.write(chunk)
self.assertEqual(data_stream, decode_chunked(dump.getvalue()))
+ def test_encode_Blobs(self):
+ self.touch(('1', 'a'))
+ self.touch(('2', 'bb'))
+ self.touch(('3', 'ccc'))
+
+ self.assertEqual([
+ pickle.dumps({'packet': 1}),
+ pickle.dumps({'num': 1, 'blob_size': 1}),
+ 'a',
+ pickle.dumps({'num': 2, 'blob_size': 2}),
+ 'bb',
+ pickle.dumps({'packet': 2}),
+ pickle.dumps({'num': 3, 'blob_size': 3}),
+ 'ccc',
+ pickle.dumps({'packet': 'last'}),
+ ],
+ [i for i in sync.encode(
+ (1, None, [{'num': 1, 'blob': '1'}, {'num': 2, 'blob': '2'}]),
+ (2, None, [{'num': 3, 'blob': '3'}]),
+ )])
+
+ def test_decode_Blobs(self):
+ stream = StringIO()
+ pickle.dump({'packet': 1}, stream)
+ pickle.dump({'num': 1, 'blob_size': 1}, stream)
+ stream.write('a')
+ pickle.dump({'num': 2, 'blob_size': 2}, stream)
+ stream.write('bb')
+ pickle.dump({'packet': 2}, stream)
+ pickle.dump({'num': 3, 'blob_size': 3}, stream)
+ stream.write('ccc')
+ pickle.dump({'packet': 'last'}, stream)
+ stream.seek(0)
+
+ packets_iter = sync.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual([
+ (1, 1, 'a'),
+ (2, 2, 'bb'),
+ ],
+ [(i['num'], i['blob_size'], i['blob'].read()) for i in packet])
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertEqual([
+ (3, 3, 'ccc'),
+ ],
+ [(i['num'], i['blob_size'], i['blob'].read()) for i in packet])
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
def decode_chunked(encdata):
offset = 0
diff --git a/tests/units/node/sync_online.py b/tests/units/node/sync_online.py
index 79d04b0..4dbd301 100755
--- a/tests/units/node/sync_online.py
+++ b/tests/units/node/sync_online.py
@@ -1,11 +1,14 @@
#!/usr/bin/env python
# sugar-lint: disable
+import os
+import json
+
from __init__ import tests
from sugar_network import db
from sugar_network.client import Client, api_url
-from sugar_network.node import sync, stats_user
+from sugar_network.node import sync, stats_user, files_root
from sugar_network.node.master import MasterCommands
from sugar_network.node.slave import SlaveCommands
from sugar_network.resources.volume import Volume
@@ -19,7 +22,6 @@ class SyncOnlineTest(tests.Test):
def setUp(self):
tests.Test.setUp(self)
- stats_user.stats_user.value = True
self.stats_commit = []
self.stats_merge = []
@@ -35,6 +37,7 @@ class SyncOnlineTest(tests.Test):
class Document(Feedback):
pass
+ files_root.value = 'master/files'
self.master_volume = Volume('master', [User, Document])
self.master_server = coroutine.WSGIServer(('localhost', 9000), Router(MasterCommands(self.master_volume)))
coroutine.spawn(self.master_server.serve_forever)
@@ -42,6 +45,7 @@ class SyncOnlineTest(tests.Test):
client = Client('http://localhost:9000')
client.get(cmd='whoami')
+ files_root.value = 'slave/files'
api_url.value = 'http://localhost:9000'
self.slave_volume = Volume('slave', [User, Document])
self.slave_server = coroutine.WSGIServer(('localhost', 9001), Router(SlaveCommands('slave', self.slave_volume)))
@@ -65,8 +69,6 @@ class SyncOnlineTest(tests.Test):
{'guid': guid2, 'content': {'en-us': '2'}},
],
[i.properties(['guid', 'content']) for i in self.master_volume['document'].find()[0]])
- self.assertEqual(['ok'], self.stats_commit)
- self.assertEqual([{'stats': 'probe'}], self.stats_merge)
guid3 = client.post(['document'], {'context': '', 'content': '3', 'title': '', 'type': 'idea'})
client.post(cmd='online_sync')
@@ -109,6 +111,13 @@ class SyncOnlineTest(tests.Test):
],
[i.properties(['guid', 'content', 'layer']) for i in self.master_volume['document'].find()[0]])
+ def test_PushStats(self):
+ stats_user.stats_user.value = True
+ client = Client('http://localhost:9001')
+ client.post(cmd='online_sync')
+ self.assertEqual(['ok'], self.stats_commit)
+ self.assertEqual([{'stats': 'probe'}], self.stats_merge)
+
def test_Pull(self):
client = Client('http://localhost:9000')
slave_client = Client('http://localhost:9001')
@@ -164,6 +173,29 @@ class SyncOnlineTest(tests.Test):
],
[i.properties(['guid', 'content', 'layer']) for i in self.slave_volume['document'].find()[0]])
+ def test_PullFiles(self):
+ self.touch(('master/files/1', 'a', 1))
+ self.touch(('master/files/2/2', 'bb', 2))
+ self.touch(('master/files/3/3/3', 'ccc', 3))
+ os.utime('master/files', (1, 1))
+
+ client = Client('http://localhost:9001')
+ client.post(cmd='online_sync')
+
+ files, stamp = json.load(file('master/files.index'))
+ self.assertEqual(1, stamp)
+ self.assertEqual(sorted([
+ [2, '1', 1],
+ [3, '2/2', 2],
+ [4, '3/3/3', 3],
+ ]),
+ sorted(files))
+
+ self.assertEqual([[5, None]], json.load(file('slave/files.sequence')))
+ self.assertEqual('a', file('slave/files/1').read())
+ self.assertEqual('bb', file('slave/files/2/2').read())
+ self.assertEqual('ccc', file('slave/files/3/3/3').read())
+
if __name__ == '__main__':
tests.main()