diff options
field | value | date |
---|---|---|
author | Aleksey Lim <alsroot@sugarlabs.org> | 2012-07-25 22:34:42 (GMT) |
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2012-07-25 22:34:42 (GMT) |
commit | c74c05c58cda48a08dedeccacd644e70d935a27e (patch) | |
tree | 03d6b5e7b4ecb2f3f8c63de5fb8ff438b5a806b0 | |
parent | a9df7e0ef8c95ab7a345b6fa3b721f34fb456331 (diff) | |
Start implementing files offline sync

mode | file | lines changed |
---|---|---|
-rw-r--r-- | TODO | 1 |
-rw-r--r-- | sugar_network/toolkit/files_sync.py | 126 |
-rw-r--r-- | sugar_network/toolkit/sneakernet.py | 7 |
-rw-r--r-- | tests/units/__main__.py | 1 |
-rwxr-xr-x | tests/units/files_sync.py | 293 |

5 files changed, 428 insertions, 0 deletions
@@ -11,6 +11,7 @@ - restrict sync content, especially for actiivity versions - after switching to global seqno, tweek blobs chaching model - activities migth need MIME registering while checking-in +- changed pulls should take into account accept_length 1.0 === diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py new file mode 100644 index 0000000..9cde8fd --- /dev/null +++ b/sugar_network/toolkit/files_sync.py @@ -0,0 +1,126 @@ +# Copyright (C) 2012 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ +import os +import json +import logging +from bisect import bisect_left +from os.path import join, exists, relpath, lexists + +from sugar_network.toolkit.collection import Sequence +from active_toolkit import util, coroutine + + +_logger = logging.getLogger('files_sync') + + +class Seeder(object): + + def __init__(self, files_path, index_path, seqno): + self._files_path = files_path.rstrip(os.sep) + self._index_path = index_path + self._seqno = seqno + self._index = [] + self._stamp = 0 + self._mutex = coroutine.Lock() + + if exists(self._index_path): + with file(self._index_path) as f: + self._index, self._stamp = json.load(f) + + if not exists(self._files_path): + os.makedirs(self._files_path) + + def pull(self, sequence, packet): + with self._mutex: + self._sync() + packet.header['sequence'] = out_seq = Sequence() + packet.header['deleted'] = deleted = [] + self._pull(sequence, packet, out_seq, deleted) + + def _pull(self, in_seq, packet, out_seq, deleted): + pos = 0 + for start, end in in_seq: + pos = bisect_left(self._index, [start, None, None], pos) + for pos, (seqno, path, mtime) in enumerate(self._index[pos:]): + if end is not None and seqno > end: + break + if mtime < 0: + deleted.append(path) + else: + packet.push_file(join(self._files_path, path), + arcname=join('files', path)) + out_seq.include(start, seqno) + start = seqno + + def _sync(self): + if os.stat(self._files_path).st_mtime <= self._stamp: + return + + _logger.debug('Sync index with %r directory', self._files_path) + new_files = set() + + # Populate list of new files at first + for root, __, files in os.walk(self._files_path): + rel_root = relpath(root, self._files_path) + if rel_root == '.': + rel_root = '' + else: + rel_root += os.sep + for filename in files: + path = join(root, filename) + if os.lstat(path).st_mtime > self._stamp: + new_files.add(rel_root + filename) + + # Check for updates for already tracked files + tail = [] + for pos, (__, rel_path, mtime) in 
enumerate(self._index[:]): + path = join(self._files_path, rel_path) + existing = lexists(path) + if existing == (mtime >= 0) and \ + (not existing or os.lstat(path).st_mtime == mtime): + continue + if existing: + new_files.discard(rel_path) + pos -= len(tail) + self._index = self._index[:pos] + self._index[pos + 1:] + tail.append([ + self._seqno.next(), + rel_path, + int(os.lstat(path).st_mtime) if existing else -1, + ]) + self._index.extend(tail) + + # Finally, add new files + for rel_path in sorted(new_files): + mtime = os.lstat(join(self._files_path, rel_path)).st_mtime + self._index.append([self._seqno.next(), rel_path, mtime]) + + self._stamp = os.stat(self._files_path).st_mtime + if self._seqno.commit(): + with util.new_file(self._index_path) as f: + json.dump((self._index, self._stamp), f) + + +class Leecher(object): + + def __init__(self, files_path, sequence_path): + pass + + def push(self, packet): + pass + + def pull(self): + pass diff --git a/sugar_network/toolkit/sneakernet.py b/sugar_network/toolkit/sneakernet.py index 9b4d221..bfcfebf 100644 --- a/sugar_network/toolkit/sneakernet.py +++ b/sugar_network/toolkit/sneakernet.py @@ -348,6 +348,13 @@ class OutPacket(object): arcfile.seek(0) self._add(arcname, arcfile, meta) + def push_file(self, path, arcname=None): + size = os.lstat(path).st_size + self._flush(size, False) + self._enforce_limit(size) + self._tarball.add(path, arcname=arcname) + self._empty = False + def _add(self, arcname, data, meta): if not arcname: self._file_num += 1 diff --git a/tests/units/__main__.py b/tests/units/__main__.py index 38afe5b..d7ad186 100644 --- a/tests/units/__main__.py +++ b/tests/units/__main__.py @@ -22,5 +22,6 @@ from sync_master import * from sync_node import * from datastore import * from dbus_datastore import * +from files_sync import * tests.main() diff --git a/tests/units/files_sync.py b/tests/units/files_sync.py new file mode 100755 index 0000000..56bd7e7 --- /dev/null +++ b/tests/units/files_sync.py @@ 
-0,0 +1,293 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import os +import time +import json +from os.path import exists + +from __init__ import tests + +import active_document as ad +from sugar_network.toolkit.files_sync import Seeder +from sugar_network.toolkit.sneakernet import OutBufferPacket, InPacket, DiskFull + + +class FilesSyncTest(tests.Test): + + def test_Seeder_pull_Populate(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('files', 'index', seqno) + + packet = OutBufferPacket() + seeder.pull([[1, None]], packet) + self.assertEqual(0, seqno.value) + self.assertEqual(True, packet.empty) + assert not exists('index') + + self.touch('files/1') + self.touch('files/2/3') + self.touch('files/4/5/6') + + seeder.pull([[1, None]], packet) + self.assertEqual(0, seqno.value) + self.assertEqual(True, packet.empty) + assert not exists('index') + + os.utime('files/1', (time.time() + 3, time.time() + 3)) + os.utime('files/2/3', (time.time() + 3, time.time() + 3)) + os.utime('files/4/5/6', (time.time() + 3, time.time() + 3)) + os.utime('files', (time.time() + 3, time.time() + 3)) + + seeder.pull([[1, None]], packet) + self.assertEqual(3, seqno.value) + self.assertEqual(False, packet.empty) + assert exists('index') + self.assertEqual( + [[ + [1, '1', os.stat('files/1').st_mtime], + [2, '2/3', os.stat('files/2/3').st_mtime], + [3, '4/5/6', os.stat('files/4/5/6').st_mtime], + ], + os.stat('files').st_mtime], + json.load(file('index'))) + in_packet = InPacket(stream=packet.pop()) + self.assertEqual([[1, 3]], in_packet.header['sequence']) + self.assertEqual([], in_packet.header['deleted']) + self.assertEqual( + sorted([ + 'header', + 'files/1', + 'files/2/3', + 'files/4/5/6', + ]), + sorted(in_packet._tarball.getnames())) + + packet = OutBufferPacket() + seeder.pull([[4, None]], packet) + self.assertEqual(3, seqno.value) + self.assertEqual(True, packet.empty) + + def test_Seeder_pull_NotFull(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('files', 'index', 
seqno) + + self.touch('files/1') + self.touch('files/2') + self.touch('files/3') + self.touch('files/4') + self.touch('files/5') + + out_packet = OutBufferPacket() + seeder.pull([[2, 2], [4, 10], [20, None]], out_packet) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[2, 2], [4, 5]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/2', + 'files/4', + 'files/5', + ]), + sorted(in_packet._tarball.getnames())) + + def test_Seeder_pull_DiskFull(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('files', 'index', seqno) + + self.touch(('files/1', '*' * 1000)) + self.touch(('files/2', '*' * 1000)) + self.touch(('files/3', '*' * 1000)) + + out_packet = OutBufferPacket(limit=2750) + try: + seeder.pull([[1, None]], out_packet) + assert False + except DiskFull: + pass + + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[1, 2]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/1', + 'files/2', + ]), + sorted(in_packet._tarball.getnames())) + + def test_Seeder_pull_UpdateFiles(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('files', 'index', seqno) + + self.touch('files/1') + self.touch('files/2') + self.touch('files/3') + + out_packet = OutBufferPacket() + seeder.pull([[1, None]], out_packet) + self.assertEqual(3, seqno.value) + + os.utime('files/2', (time.time() + 3, time.time() + 3)) + + out_packet = OutBufferPacket() + seeder.pull([[4, None]], out_packet) + self.assertEqual(3, seqno.value) + + os.utime('files', (time.time() + 3, time.time() + 3)) + + out_packet = OutBufferPacket() + seeder.pull([[4, None]], out_packet) + self.assertEqual(4, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[4, 4]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/2', + ]), + sorted(in_packet._tarball.getnames())) + + os.utime('files/1', (time.time() + 6, time.time() + 6)) + os.utime('files/3', (time.time() + 6, 
time.time() + 6)) + os.utime('files', (time.time() + 6, time.time() + 6)) + + out_packet = OutBufferPacket() + seeder.pull([[5, None]], out_packet) + self.assertEqual(6, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[5, 6]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/1', + 'files/3', + ]), + sorted(in_packet._tarball.getnames())) + + out_packet = OutBufferPacket() + seeder.pull([[1, None]], out_packet) + self.assertEqual(6, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[1, 6]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/1', + 'files/2', + 'files/3', + ]), + sorted(in_packet._tarball.getnames())) + + def test_Seeder_pull_CreateFiles(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('files', 'index', seqno) + + self.touch('files/1') + self.touch('files/2') + self.touch('files/3') + + out_packet = OutBufferPacket() + seeder.pull([[1, None]], out_packet) + self.assertEqual(3, seqno.value) + + self.touch('files/4') + os.utime('files/4', (time.time() + 3, time.time() + 3)) + + out_packet = OutBufferPacket() + seeder.pull([[4, None]], out_packet) + self.assertEqual(3, seqno.value) + + os.utime('files', (time.time() + 3, time.time() + 3)) + + out_packet = OutBufferPacket() + seeder.pull([[4, None]], out_packet) + self.assertEqual(4, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[4, 4]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/4', + ]), + sorted(in_packet._tarball.getnames())) + + self.touch('files/5') + os.utime('files/5', (time.time() + 6, time.time() + 6)) + self.touch('files/6') + os.utime('files/6', (time.time() + 6, time.time() + 6)) + os.utime('files', (time.time() + 6, time.time() + 6)) + + out_packet = OutBufferPacket() + seeder.pull([[5, None]], out_packet) + self.assertEqual(6, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + 
self.assertEqual([[5, 6]], in_packet.header['sequence']) + self.assertEqual( + sorted([ + 'header', + 'files/5', + 'files/6', + ]), + sorted(in_packet._tarball.getnames())) + + def test_Seeder_pull_DeleteFiles(self): + seqno = ad.Seqno('seqno') + seeder = Seeder('files', 'index', seqno) + + self.touch('files/1') + self.touch('files/2') + self.touch('files/3') + + out_packet = OutBufferPacket() + seeder.pull([[1, None]], out_packet) + self.assertEqual(3, seqno.value) + + os.unlink('files/2') + os.utime('files', (time.time() + 3, time.time() + 3)) + + out_packet = OutBufferPacket() + seeder.pull([[1, None]], out_packet) + self.assertEqual(4, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[1, 4]], in_packet.header['sequence']) + self.assertEqual(['2'], in_packet.header['deleted']) + self.assertEqual( + sorted([ + 'header', + 'files/1', + 'files/3', + ]), + sorted(in_packet._tarball.getnames())) + + os.unlink('files/1') + os.unlink('files/3') + os.utime('files', (time.time() + 6, time.time() + 6)) + + out_packet = OutBufferPacket() + seeder.pull([[1, None]], out_packet) + self.assertEqual(6, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[1, 6]], in_packet.header['sequence']) + self.assertEqual(['2', '1', '3'], in_packet.header['deleted']) + self.assertEqual( + sorted([ + 'header', + ]), + sorted(in_packet._tarball.getnames())) + + out_packet = OutBufferPacket() + seeder.pull([[4, None]], out_packet) + self.assertEqual(6, seqno.value) + in_packet = InPacket(stream=out_packet.pop()) + self.assertEqual([[4, 6]], in_packet.header['sequence']) + self.assertEqual(['2', '1', '3'], in_packet.header['deleted']) + self.assertEqual( + sorted([ + 'header', + ]), + sorted(in_packet._tarball.getnames())) + + +if __name__ == '__main__': + tests.main() |