diff options
Diffstat (limited to 'sugar_network/node/sync.py')
-rw-r--r-- | sugar_network/node/sync.py | 307 |
1 files changed, 0 insertions, 307 deletions
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py deleted file mode 100644 index f5b946c..0000000 --- a/sugar_network/node/sync.py +++ /dev/null @@ -1,307 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import gzip -import zlib -import json -import logging -from cStringIO import StringIO -from types import GeneratorType -from os.path import exists, join, dirname, basename, splitext - -from sugar_network import toolkit -from sugar_network.toolkit import coroutine, enforce - - -# Filename suffix to use for sneakernet synchronization files -_SNEAKERNET_SUFFIX = '.sneakernet' - -# Leave at leat n bytes in fs whle calling `encode_to_file()` -_SNEAKERNET_RESERVED_SIZE = 1024 * 1024 - -_logger = logging.getLogger('node.sync') - - -def decode(stream): - packet = _PacketsIterator(stream) - while True: - packet.next() - if packet.name == 'last': - break - yield packet - - -def encode(packets, **header): - return _encode(None, packets, False, header, _EncodingStatus()) - - -def limited_encode(limit, packets, **header): - return _encode(limit, packets, False, header, _EncodingStatus()) - - -def package_decode(stream): - stream = _GzipStream(stream) - package_props = json.loads(stream.readline()) - - for packet in decode(stream): - packet.props.update(package_props) - yield packet - - -def package_encode(packets, **header): - # XXX Only for small amount of data - # TODO Support real streaming - buf = StringIO() - zipfile = gzip.GzipFile(mode='wb', fileobj=buf) - - header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX - json.dump(header, zipfile) - zipfile.write('\n') - - for chunk in _encode(None, packets, False, None, _EncodingStatus()): - zipfile.write(chunk) - zipfile.close() - - yield buf.getvalue() - - -def sneakernet_decode(root, node=None, session=None): - for root, __, files in os.walk(root): - for filename in files: - if not filename.endswith(_SNEAKERNET_SUFFIX): - continue - zipfile = gzip.open(join(root, filename), 'rb') - try: - package_props = json.loads(zipfile.readline()) - - if node is not None and package_props.get('src') == node: - if package_props.get('session') == session: - _logger.debug('Skip session %r sneakernet package', - zipfile.name) - else: - _logger.debug('Remove outdate %r sneakernet package', - zipfile.name) - os.unlink(zipfile.name) - continue - - for packet in decode(zipfile): - packet.props.update(package_props) - yield packet - finally: - zipfile.close() - - -def sneakernet_encode(packets, root=None, limit=None, path=None, **header): - if path is None: - if not exists(root): - os.makedirs(root) - filename = toolkit.uuid() + _SNEAKERNET_SUFFIX - path = toolkit.unique_filename(root, filename) - else: - filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX - if 'filename' not in header: - header['filename'] = filename - - if limit <= 0: - stat = os.statvfs(dirname(path)) - limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE - - _logger.debug('Creating %r sneakernet package, limit=%s header=%r', - path, limit, header) - - status = _EncodingStatus() - with file(path, 'wb') as package: - zipfile = gzip.GzipFile(fileobj=package) - try: - json.dump(header, zipfile) - zipfile.write('\n') - - pos = None - encoder = _encode(limit, packets, True, None, status) - while True: - try: - chunk = encoder.send(pos) - zipfile.write(chunk) - pos = zipfile.fileobj.tell() - coroutine.dispatch() - except StopIteration: - break - - except Exception: - _logger.debug('Emergency removing %r package', path) - package.close() - os.unlink(path) - raise - else: - zipfile.close() - package.flush() - os.fsync(package.fileno()) - - return not status.aborted - - -class _EncodingStatus(object): - - aborted = False - - -def _encode(limit, packets, download_blobs, header, status): - for packet, props, content in packets: - if status.aborted: - break - - if props is None: - props = {} - if header: - props.update(header) - props['packet'] = packet - pos = (yield json.dumps(props) + '\n') or 0 - - if content is None: - continue - - content = iter(content) - try: - record = next(content) - - while True: - blob = None - blob_size = 0 - if 'blob' in record: - blob = record.pop('blob') - blob_size = record['blob_size'] - - dump = json.dumps(record) + '\n' - if not status.aborted and limit is not None and \ - pos + len(dump) + blob_size > limit: - status.aborted = True - if not isinstance(content, GeneratorType): - raise StopIteration() - record = content.throw(StopIteration()) - continue - pos = (yield dump) or 0 - - if blob is not None: - for chunk in blob: - pos = (yield chunk) or 0 - blob_size -= len(chunk) - enforce(blob_size == 0, EOFError, - 'File size is not the same as declared') - - record = next(content) - except StopIteration: - pass - - yield json.dumps({'packet': 'last'}) + '\n' - - -class _PacketsIterator(object): - - def __init__(self, stream): - if not hasattr(stream, 'readline'): - stream.readline = lambda: toolkit.readline(stream) - if hasattr(stream, 'seek'): - self._seek = stream.seek - self._stream = stream - self.props = {} - self._name = None - self._shift = True - - @property - def name(self): - return self._name - - def next(self): - if self._shift: - for __ in self: - pass - if self._name is None: - raise EOFError() - self._shift = True - - def __repr__(self): - return '<SyncPacket %r>' % self.props - - def __getitem__(self, key): - return self.props.get(key) - - def __iter__(self): - blob = None - while True: - if blob is not None and blob.size_to_read: - self._seek(blob.size_to_read, 1) - blob = None - record = self._stream.readline() - if not record: - self._name = None - raise EOFError() - record = json.loads(record) - if 'packet' in record: - self._name = record['packet'] or '' - self.props = record - self._shift = False - break - blob_size = record.get('blob_size') - if blob_size: - blob = record['blob'] = _Blob(self._stream, blob_size) - yield record - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - # pylint: disable-msg=E0202 - def _seek(self, distance, where): - while distance: - chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE)) - distance -= len(chunk) - - -class _Blob(object): - - def __init__(self, stream, size): - self._stream = stream - self.size_to_read = size - - def read(self, size=toolkit.BUFFER_SIZE): - chunk = self._stream.read(min(size, self.size_to_read)) - self.size_to_read -= len(chunk) - return chunk - - -class _GzipStream(object): - - def __init__(self, stream): - self._stream = stream - self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS) - self._buffer = bytearray() - - def read(self, size): - while True: - if size <= len(self._buffer): - result = self._buffer[:size] - self._buffer = self._buffer[size:] - return bytes(result) - chunk = self._stream.read(size) - if not chunk: - result, self._buffer = self._buffer, bytearray() - return result - self._buffer += self._zip.decompress(chunk) - - def readline(self): - return toolkit.readline(self) |