Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-07-25 22:34:42 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-07-25 22:34:42 (GMT)
commitc74c05c58cda48a08dedeccacd644e70d935a27e (patch)
tree03d6b5e7b4ecb2f3f8c63de5fb8ff438b5a806b0
parenta9df7e0ef8c95ab7a345b6fa3b721f34fb456331 (diff)
Start implementing files offline sync
-rw-r--r--TODO1
-rw-r--r--sugar_network/toolkit/files_sync.py126
-rw-r--r--sugar_network/toolkit/sneakernet.py7
-rw-r--r--tests/units/__main__.py1
-rwxr-xr-xtests/units/files_sync.py293
5 files changed, 428 insertions, 0 deletions
diff --git a/TODO b/TODO
index 87b141b..22b334e 100644
--- a/TODO
+++ b/TODO
@@ -11,6 +11,7 @@
- restrict sync content, especially for actiivity versions
- after switching to global seqno, tweek blobs chaching model
- activities migth need MIME registering while checking-in
+- changed pulls should take into account accept_length
1.0
===
diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py
new file mode 100644
index 0000000..9cde8fd
--- /dev/null
+++ b/sugar_network/toolkit/files_sync.py
@@ -0,0 +1,126 @@
+# Copyright (C) 2012 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import json
+import logging
+from bisect import bisect_left
+from os.path import join, exists, relpath, lexists
+
+from sugar_network.toolkit.collection import Sequence
+from active_toolkit import util, coroutine
+
+
+_logger = logging.getLogger('files_sync')
+
+
+class Seeder(object):
+
+ def __init__(self, files_path, index_path, seqno):
+ self._files_path = files_path.rstrip(os.sep)
+ self._index_path = index_path
+ self._seqno = seqno
+ self._index = []
+ self._stamp = 0
+ self._mutex = coroutine.Lock()
+
+ if exists(self._index_path):
+ with file(self._index_path) as f:
+ self._index, self._stamp = json.load(f)
+
+ if not exists(self._files_path):
+ os.makedirs(self._files_path)
+
+ def pull(self, sequence, packet):
+ with self._mutex:
+ self._sync()
+ packet.header['sequence'] = out_seq = Sequence()
+ packet.header['deleted'] = deleted = []
+ self._pull(sequence, packet, out_seq, deleted)
+
+ def _pull(self, in_seq, packet, out_seq, deleted):
+ pos = 0
+ for start, end in in_seq:
+ pos = bisect_left(self._index, [start, None, None], pos)
+ for pos, (seqno, path, mtime) in enumerate(self._index[pos:]):
+ if end is not None and seqno > end:
+ break
+ if mtime < 0:
+ deleted.append(path)
+ else:
+ packet.push_file(join(self._files_path, path),
+ arcname=join('files', path))
+ out_seq.include(start, seqno)
+ start = seqno
+
+ def _sync(self):
+ if os.stat(self._files_path).st_mtime <= self._stamp:
+ return
+
+ _logger.debug('Sync index with %r directory', self._files_path)
+ new_files = set()
+
+ # Populate list of new files at first
+ for root, __, files in os.walk(self._files_path):
+ rel_root = relpath(root, self._files_path)
+ if rel_root == '.':
+ rel_root = ''
+ else:
+ rel_root += os.sep
+ for filename in files:
+ path = join(root, filename)
+ if os.lstat(path).st_mtime > self._stamp:
+ new_files.add(rel_root + filename)
+
+ # Check for updates for already tracked files
+ tail = []
+ for pos, (__, rel_path, mtime) in enumerate(self._index[:]):
+ path = join(self._files_path, rel_path)
+ existing = lexists(path)
+ if existing == (mtime >= 0) and \
+ (not existing or os.lstat(path).st_mtime == mtime):
+ continue
+ if existing:
+ new_files.discard(rel_path)
+ pos -= len(tail)
+ self._index = self._index[:pos] + self._index[pos + 1:]
+ tail.append([
+ self._seqno.next(),
+ rel_path,
+ int(os.lstat(path).st_mtime) if existing else -1,
+ ])
+ self._index.extend(tail)
+
+ # Finally, add new files
+ for rel_path in sorted(new_files):
+ mtime = os.lstat(join(self._files_path, rel_path)).st_mtime
+ self._index.append([self._seqno.next(), rel_path, mtime])
+
+ self._stamp = os.stat(self._files_path).st_mtime
+ if self._seqno.commit():
+ with util.new_file(self._index_path) as f:
+ json.dump((self._index, self._stamp), f)
+
+
+class Leecher(object):
+
+ def __init__(self, files_path, sequence_path):
+ pass
+
+ def push(self, packet):
+ pass
+
+ def pull(self):
+ pass
diff --git a/sugar_network/toolkit/sneakernet.py b/sugar_network/toolkit/sneakernet.py
index 9b4d221..bfcfebf 100644
--- a/sugar_network/toolkit/sneakernet.py
+++ b/sugar_network/toolkit/sneakernet.py
@@ -348,6 +348,13 @@ class OutPacket(object):
arcfile.seek(0)
self._add(arcname, arcfile, meta)
+ def push_file(self, path, arcname=None):
+ size = os.lstat(path).st_size
+ self._flush(size, False)
+ self._enforce_limit(size)
+ self._tarball.add(path, arcname=arcname)
+ self._empty = False
+
def _add(self, arcname, data, meta):
if not arcname:
self._file_num += 1
diff --git a/tests/units/__main__.py b/tests/units/__main__.py
index 38afe5b..d7ad186 100644
--- a/tests/units/__main__.py
+++ b/tests/units/__main__.py
@@ -22,5 +22,6 @@ from sync_master import *
from sync_node import *
from datastore import *
from dbus_datastore import *
+from files_sync import *
tests.main()
diff --git a/tests/units/files_sync.py b/tests/units/files_sync.py
new file mode 100755
index 0000000..56bd7e7
--- /dev/null
+++ b/tests/units/files_sync.py
@@ -0,0 +1,293 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+import time
+import json
+from os.path import exists
+
+from __init__ import tests
+
+import active_document as ad
+from sugar_network.toolkit.files_sync import Seeder
+from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull
+
+
+class FilesSyncTest(tests.Test):
+
+ def test_Seeder_pull_Populate(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('files', 'index', seqno)
+
+ packet = OutBufferPacket()
+ seeder.pull([[1, None]], packet)
+ self.assertEqual(0, seqno.value)
+ self.assertEqual(True, packet.empty)
+ assert not exists('index')
+
+ self.touch('files/1')
+ self.touch('files/2/3')
+ self.touch('files/4/5/6')
+
+ seeder.pull([[1, None]], packet)
+ self.assertEqual(0, seqno.value)
+ self.assertEqual(True, packet.empty)
+ assert not exists('index')
+
+ os.utime('files/1', (time.time() + 3, time.time() + 3))
+ os.utime('files/2/3', (time.time() + 3, time.time() + 3))
+ os.utime('files/4/5/6', (time.time() + 3, time.time() + 3))
+ os.utime('files', (time.time() + 3, time.time() + 3))
+
+ seeder.pull([[1, None]], packet)
+ self.assertEqual(3, seqno.value)
+ self.assertEqual(False, packet.empty)
+ assert exists('index')
+ self.assertEqual(
+ [[
+ [1, '1', os.stat('files/1').st_mtime],
+ [2, '2/3', os.stat('files/2/3').st_mtime],
+ [3, '4/5/6', os.stat('files/4/5/6').st_mtime],
+ ],
+ os.stat('files').st_mtime],
+ json.load(file('index')))
+ in_packet = InPacket(stream=packet.pop())
+ self.assertEqual([[1, 3]], in_packet.header['sequence'])
+ self.assertEqual([], in_packet.header['deleted'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/1',
+ 'files/2/3',
+ 'files/4/5/6',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ packet = OutBufferPacket()
+ seeder.pull([[4, None]], packet)
+ self.assertEqual(3, seqno.value)
+ self.assertEqual(True, packet.empty)
+
+ def test_Seeder_pull_NotFull(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('files', 'index', seqno)
+
+ self.touch('files/1')
+ self.touch('files/2')
+ self.touch('files/3')
+ self.touch('files/4')
+ self.touch('files/5')
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[2, 2], [4, 10], [20, None]], out_packet)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[2, 2], [4, 5]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/2',
+ 'files/4',
+ 'files/5',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ def test_Seeder_pull_DiskFull(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('files', 'index', seqno)
+
+ self.touch(('files/1', '*' * 1000))
+ self.touch(('files/2', '*' * 1000))
+ self.touch(('files/3', '*' * 1000))
+
+ out_packet = OutBufferPacket(limit=2750)
+ try:
+ seeder.pull([[1, None]], out_packet)
+ assert False
+ except DiskFull:
+ pass
+
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[1, 2]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/1',
+ 'files/2',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ def test_Seeder_pull_UpdateFiles(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('files', 'index', seqno)
+
+ self.touch('files/1')
+ self.touch('files/2')
+ self.touch('files/3')
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[1, None]], out_packet)
+ self.assertEqual(3, seqno.value)
+
+ os.utime('files/2', (time.time() + 3, time.time() + 3))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[4, None]], out_packet)
+ self.assertEqual(3, seqno.value)
+
+ os.utime('files', (time.time() + 3, time.time() + 3))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[4, None]], out_packet)
+ self.assertEqual(4, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[4, 4]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/2',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ os.utime('files/1', (time.time() + 6, time.time() + 6))
+ os.utime('files/3', (time.time() + 6, time.time() + 6))
+ os.utime('files', (time.time() + 6, time.time() + 6))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[5, None]], out_packet)
+ self.assertEqual(6, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[5, 6]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/1',
+ 'files/3',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[1, None]], out_packet)
+ self.assertEqual(6, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[1, 6]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/1',
+ 'files/2',
+ 'files/3',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ def test_Seeder_pull_CreateFiles(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('files', 'index', seqno)
+
+ self.touch('files/1')
+ self.touch('files/2')
+ self.touch('files/3')
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[1, None]], out_packet)
+ self.assertEqual(3, seqno.value)
+
+ self.touch('files/4')
+ os.utime('files/4', (time.time() + 3, time.time() + 3))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[4, None]], out_packet)
+ self.assertEqual(3, seqno.value)
+
+ os.utime('files', (time.time() + 3, time.time() + 3))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[4, None]], out_packet)
+ self.assertEqual(4, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[4, 4]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/4',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ self.touch('files/5')
+ os.utime('files/5', (time.time() + 6, time.time() + 6))
+ self.touch('files/6')
+ os.utime('files/6', (time.time() + 6, time.time() + 6))
+ os.utime('files', (time.time() + 6, time.time() + 6))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[5, None]], out_packet)
+ self.assertEqual(6, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[5, 6]], in_packet.header['sequence'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/5',
+ 'files/6',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ def test_Seeder_pull_DeleteFiles(self):
+ seqno = ad.Seqno('seqno')
+ seeder = Seeder('files', 'index', seqno)
+
+ self.touch('files/1')
+ self.touch('files/2')
+ self.touch('files/3')
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[1, None]], out_packet)
+ self.assertEqual(3, seqno.value)
+
+ os.unlink('files/2')
+ os.utime('files', (time.time() + 3, time.time() + 3))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[1, None]], out_packet)
+ self.assertEqual(4, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[1, 4]], in_packet.header['sequence'])
+ self.assertEqual(['2'], in_packet.header['deleted'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ 'files/1',
+ 'files/3',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ os.unlink('files/1')
+ os.unlink('files/3')
+ os.utime('files', (time.time() + 6, time.time() + 6))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[1, None]], out_packet)
+ self.assertEqual(6, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[1, 6]], in_packet.header['sequence'])
+ self.assertEqual(['2', '1', '3'], in_packet.header['deleted'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+ out_packet = OutBufferPacket()
+ seeder.pull([[4, None]], out_packet)
+ self.assertEqual(6, seqno.value)
+ in_packet = InPacket(stream=out_packet.pop())
+ self.assertEqual([[4, 6]], in_packet.header['sequence'])
+ self.assertEqual(['2', '1', '3'], in_packet.header['deleted'])
+ self.assertEqual(
+ sorted([
+ 'header',
+ ]),
+ sorted(in_packet._tarball.getnames()))
+
+
+if __name__ == '__main__':
+ tests.main()