# Copyright (C) 2012-2013 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import gzip
import zlib
import json
import logging
from cStringIO import StringIO
from types import GeneratorType
from os.path import exists, join, dirname, basename, splitext

from sugar_network import toolkit
from sugar_network.toolkit import coroutine, enforce


# Filename suffix to use for sneakernet synchronization files
_SNEAKERNET_SUFFIX = '.sneakernet'

# Leave at least n bytes free in the filesystem while creating
# sneakernet packages
_SNEAKERNET_RESERVED_SIZE = 1024 * 1024

_logger = logging.getLogger('node.sync')


def decode(stream):
    packet = _PacketsIterator(stream)
    while True:
        packet.next()
        if packet.name == 'last':
            break
        yield packet


def encode(packets, **header):
    return _encode(None, packets, False, header, _EncodingStatus())


def limited_encode(limit, packets, **header):
    return _encode(limit, packets, False, header, _EncodingStatus())


def package_decode(stream):
    stream = _GzipStream(stream)
    # Package-level props come first; the rest is a regular packet stream
    package_props = json.loads(stream.readline())

    for packet in decode(stream):
        packet.props.update(package_props)
        yield packet


def package_encode(packets, **header):
    # XXX Only for small amounts of data
    # TODO Support real streaming
    buf = StringIO()
    zipfile = gzip.GzipFile(mode='wb', fileobj=buf)

    header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX
    json.dump(header, zipfile)
    zipfile.write('\n')

    for chunk in _encode(None, packets, False, None, _EncodingStatus()):
        zipfile.write(chunk)
    zipfile.close()

    yield buf.getvalue()


def sneakernet_decode(root, node=None, session=None):
    for root, __, files in os.walk(root):
        for filename in files:
            if not filename.endswith(_SNEAKERNET_SUFFIX):
                continue
            zipfile = gzip.open(join(root, filename), 'rb')
            try:
                package_props = json.loads(zipfile.readline())

                if node is not None and package_props.get('src') == node:
                    if package_props.get('session') == session:
                        _logger.debug('Skip session %r sneakernet package',
                                zipfile.name)
                    else:
                        _logger.debug('Remove outdated %r sneakernet package',
                                zipfile.name)
                        os.unlink(zipfile.name)
                    continue

                for packet in decode(zipfile):
                    packet.props.update(package_props)
                    yield packet
            finally:
                zipfile.close()


def sneakernet_encode(packets, root=None, limit=None, path=None, **header):
    if path is None:
        if not exists(root):
            os.makedirs(root)
        filename = toolkit.uuid() + _SNEAKERNET_SUFFIX
        path = toolkit.unique_filename(root, filename)
    else:
        filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX
    if 'filename' not in header:
        header['filename'] = filename

    if limit <= 0:
        # Fit the package into the available free space, keeping a reserve
        stat = os.statvfs(dirname(path))
        limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE
    _logger.debug('Creating %r sneakernet package, limit=%s header=%r',
            path, limit, header)

    status = _EncodingStatus()
    with file(path, 'wb') as package:
        zipfile = gzip.GzipFile(fileobj=package)
        try:
            json.dump(header, zipfile)
            zipfile.write('\n')

            pos = None
            encoder = _encode(limit, packets, True, None, status)
            while True:
                try:
                    chunk = encoder.send(pos)
                    zipfile.write(chunk)
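                    # Feed the compressed-file position back into the
                    # encoder: `_encode()` compares it against `limit`
                    # to decide when to abort the stream early.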
                    pos = zipfile.fileobj.tell()
                    coroutine.dispatch()
                except StopIteration:
                    break

        except Exception:
            _logger.debug('Emergency removing %r package', path)
            package.close()
            os.unlink(path)
            raise
        else:
            zipfile.close()
            package.flush()
            os.fsync(package.fileno())

    return not status.aborted


class _EncodingStatus(object):

    aborted = False


def _encode(limit, packets, download_blobs, header, status):
    for packet, props, content in packets:
        if status.aborted:
            break

        if props is None:
            props = {}
        if header:
            props.update(header)
        props['packet'] = packet
        pos = (yield json.dumps(props) + '\n') or 0

        if content is None:
            continue

        content = iter(content)
        try:
            record = next(content)

            while True:
                blob = None
                blob_size = 0
                if 'blob' in record:
                    blob = record.pop('blob')
                    blob_size = record['blob_size']

                dump = json.dumps(record) + '\n'
                if not status.aborted and limit is not None and \
                        pos + len(dump) + blob_size > limit:
                    status.aborted = True
                    if not isinstance(content, GeneratorType):
                        raise StopIteration()
                    # Give the content generator a chance to yield one
                    # finalizing record before the packet is truncated
                    record = content.throw(StopIteration())
                    continue

                pos = (yield dump) or 0

                if blob is not None:
                    for chunk in blob:
                        pos = (yield chunk) or 0
                        blob_size -= len(chunk)
                    enforce(blob_size == 0, EOFError,
                            'Blob size is not the same as declared')

                record = next(content)
        except StopIteration:
            pass

    yield json.dumps({'packet': 'last'}) + '\n'


class _PacketsIterator(object):

    def __init__(self, stream):
        if not hasattr(stream, 'readline'):
            stream.readline = lambda: toolkit.readline(stream)
        if hasattr(stream, 'seek'):
            self._seek = stream.seek
        self._stream = stream
        self.props = {}
        self._name = None
        self._shift = True

    @property
    def name(self):
        return self._name

    def next(self):
        if self._shift:
            # Drain the current packet to position at the next packet header
            for __ in self:
                pass
        if self._name is None:
            raise EOFError()
        self._shift = True

    def __repr__(self):
        return '<Packet %r>' % self.props

    def __getitem__(self, key):
        return self.props.get(key)

    def __iter__(self):
        blob = None
        while True:
            if blob is not None and blob.size_to_read:
                # Skip the unread tail of the previous record's blob
                self._seek(blob.size_to_read, 1)
                blob = None
            record = self._stream.readline()
            if not record:
                self._name = None
                raise EOFError()
            record = json.loads(record)
            if 'packet' in record:
                self._name = record['packet'] or ''
                self.props = record
                self._shift = False
                break
            blob_size = record.get('blob_size')
            if blob_size:
                blob = record['blob'] = _Blob(self._stream, blob_size)
            yield record

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    # pylint: disable-msg=E0202
    def _seek(self, distance, where):
        # Fallback for streams without `seek()`: read and discard
        while distance:
            chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE))
            distance -= len(chunk)


class _Blob(object):

    def __init__(self, stream, size):
        self._stream = stream
        self.size_to_read = size

    def read(self, size=toolkit.BUFFER_SIZE):
        chunk = self._stream.read(min(size, self.size_to_read))
        self.size_to_read -= len(chunk)
        return chunk


class _GzipStream(object):

    def __init__(self, stream):
        self._stream = stream
        # 16 + MAX_WBITS makes zlib expect a gzip header
        self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS)
        self._buffer = bytearray()

    def read(self, size):
        while True:
            if size <= len(self._buffer):
                result = self._buffer[:size]
                self._buffer = self._buffer[size:]
                return bytes(result)
            chunk = self._stream.read(size)
            if not chunk:
                result, self._buffer = self._buffer, bytearray()
                return bytes(result)
            self._buffer += self._zip.decompress(chunk)

    def readline(self):
        return toolkit.readline(self)
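

# A minimal round-trip sketch, not part of the original module: encode one
# packet with a single record in memory, then decode it back. The packet
# name, header, and record payload below are illustrative assumptions, not
# values the sync protocol requires.
if __name__ == '__main__':

    def _demo_packets():
        # `_encode()` consumes (packet_name, props, content) triples
        yield 'diff', {'src': 'demo-node'}, [{'guid': '1', 'payload': 'probe'}]

    _stream = StringIO(''.join(encode(_demo_packets(), session='demo')))
    for _packet in decode(_stream):
        print _packet.name, _packet.props
        for _record in _packet:
            print _record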