From 1028755053ef3d8c538138b37e61ece13b9c1a23 Mon Sep 17 00:00:00 2001
From: Aleksey Lim
Date: Thu, 27 Feb 2014 12:21:23 +0000
Subject: Zip sync stream all time

---
diff --git a/TODO b/TODO
index 8213191..904aacf 100644
--- a/TODO
+++ b/TODO
@@ -2,6 +2,7 @@
 - diff/merge while checking in node context
 - deliver spawn events only to local subscribers
 - test/run presolve
+- if node relocates api calls, do it only once in toolkit.http
 
 0.10
 
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index da06483..a9d66e0 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -37,8 +37,14 @@ def init(path):
         os.makedirs(_root)
 
 
-def post(content, mime_type=None, digest_to_assert=None):
-    meta = []
+def post(content, mime_type=None, digest_to_assert=None, meta=None):
+    if meta is None:
+        meta = []
+        meta.append(('content-type', mime_type or 'application/octet-stream'))
+    else:
+        meta = meta.items()
+        if mime_type:
+            meta.append(('content-type', mime_type))
 
     @contextmanager
     def write_blob():
@@ -70,7 +76,7 @@ def post(content, mime_type=None, digest_to_assert=None):
             blob.unlink()
             raise http.BadRequest('Digest mismatch')
         path = _path(digest)
-        meta.append(('content-type', mime_type or 'application/octet-stream'))
+        meta.append(('content-length', str(blob.tell())))
         with toolkit.new_file(path + _META_SUFFIX) as f:
             for key, value in meta:
                 f.write('%s: %s\n' % (key, value))
diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py
index 88d644b..9ba5998 100644
--- a/sugar_network/db/metadata.py
+++ b/sugar_network/db/metadata.py
@@ -311,7 +311,11 @@ class Blob(Property):
             return ''
 
         if not isinstance(value, dict):
-            mime_type = this.request.content_type or self.mime_type
+            mime_type = None
+            if this.request.prop == self.name:
+                mime_type = this.request.content_type
+            if not mime_type:
+                mime_type = self.mime_type
             return blobs.post(value, mime_type).digest
 
         digest = this.resource[self.name] if self.name else None
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
deleted file mode 100644
index f5b946c..0000000
--- a/sugar_network/node/sync.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
- -import os -import gzip -import zlib -import json -import logging -from cStringIO import StringIO -from types import GeneratorType -from os.path import exists, join, dirname, basename, splitext - -from sugar_network import toolkit -from sugar_network.toolkit import coroutine, enforce - - -# Filename suffix to use for sneakernet synchronization files -_SNEAKERNET_SUFFIX = '.sneakernet' - -# Leave at leat n bytes in fs whle calling `encode_to_file()` -_SNEAKERNET_RESERVED_SIZE = 1024 * 1024 - -_logger = logging.getLogger('node.sync') - - -def decode(stream): - packet = _PacketsIterator(stream) - while True: - packet.next() - if packet.name == 'last': - break - yield packet - - -def encode(packets, **header): - return _encode(None, packets, False, header, _EncodingStatus()) - - -def limited_encode(limit, packets, **header): - return _encode(limit, packets, False, header, _EncodingStatus()) - - -def package_decode(stream): - stream = _GzipStream(stream) - package_props = json.loads(stream.readline()) - - for packet in decode(stream): - packet.props.update(package_props) - yield packet - - -def package_encode(packets, **header): - # XXX Only for small amount of data - # TODO Support real streaming - buf = StringIO() - zipfile = gzip.GzipFile(mode='wb', fileobj=buf) - - header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX - json.dump(header, zipfile) - zipfile.write('\n') - - for chunk in _encode(None, packets, False, None, _EncodingStatus()): - zipfile.write(chunk) - zipfile.close() - - yield buf.getvalue() - - -def sneakernet_decode(root, node=None, session=None): - for root, __, files in os.walk(root): - for filename in files: - if not filename.endswith(_SNEAKERNET_SUFFIX): - continue - zipfile = gzip.open(join(root, filename), 'rb') - try: - package_props = json.loads(zipfile.readline()) - - if node is not None and package_props.get('src') == node: - if package_props.get('session') == session: - _logger.debug('Skip session %r sneakernet package', - zipfile.name) - else: - _logger.debug('Remove outdate %r sneakernet package', - zipfile.name) - os.unlink(zipfile.name) - continue - - for packet in decode(zipfile): - packet.props.update(package_props) - yield packet - finally: - zipfile.close() - - -def sneakernet_encode(packets, root=None, limit=None, path=None, **header): - if path is None: - if not exists(root): - os.makedirs(root) - filename = toolkit.uuid() + _SNEAKERNET_SUFFIX - path = toolkit.unique_filename(root, filename) - else: - filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX - if 'filename' not in header: - header['filename'] = filename - - if limit <= 0: - stat = os.statvfs(dirname(path)) - limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE - - _logger.debug('Creating %r sneakernet package, limit=%s header=%r', - path, limit, header) - - status = _EncodingStatus() - with file(path, 'wb') as package: - zipfile = gzip.GzipFile(fileobj=package) - try: - json.dump(header, zipfile) - zipfile.write('\n') - - pos = None - encoder = _encode(limit, packets, True, None, status) - while True: - try: - chunk = encoder.send(pos) - zipfile.write(chunk) - pos = zipfile.fileobj.tell() - coroutine.dispatch() - except StopIteration: - break - - except Exception: - _logger.debug('Emergency removing %r package', path) - package.close() - os.unlink(path) - raise - else: - zipfile.close() - package.flush() - os.fsync(package.fileno()) - - return not status.aborted - - -class _EncodingStatus(object): - - aborted = False - - -def _encode(limit, packets, download_blobs, 
header, status): - for packet, props, content in packets: - if status.aborted: - break - - if props is None: - props = {} - if header: - props.update(header) - props['packet'] = packet - pos = (yield json.dumps(props) + '\n') or 0 - - if content is None: - continue - - content = iter(content) - try: - record = next(content) - - while True: - blob = None - blob_size = 0 - if 'blob' in record: - blob = record.pop('blob') - blob_size = record['blob_size'] - - dump = json.dumps(record) + '\n' - if not status.aborted and limit is not None and \ - pos + len(dump) + blob_size > limit: - status.aborted = True - if not isinstance(content, GeneratorType): - raise StopIteration() - record = content.throw(StopIteration()) - continue - pos = (yield dump) or 0 - - if blob is not None: - for chunk in blob: - pos = (yield chunk) or 0 - blob_size -= len(chunk) - enforce(blob_size == 0, EOFError, - 'File size is not the same as declared') - - record = next(content) - except StopIteration: - pass - - yield json.dumps({'packet': 'last'}) + '\n' - - -class _PacketsIterator(object): - - def __init__(self, stream): - if not hasattr(stream, 'readline'): - stream.readline = lambda: toolkit.readline(stream) - if hasattr(stream, 'seek'): - self._seek = stream.seek - self._stream = stream - self.props = {} - self._name = None - self._shift = True - - @property - def name(self): - return self._name - - def next(self): - if self._shift: - for __ in self: - pass - if self._name is None: - raise EOFError() - self._shift = True - - def __repr__(self): - return '' % self.props - - def __getitem__(self, key): - return self.props.get(key) - - def __iter__(self): - blob = None - while True: - if blob is not None and blob.size_to_read: - self._seek(blob.size_to_read, 1) - blob = None - record = self._stream.readline() - if not record: - self._name = None - raise EOFError() - record = json.loads(record) - if 'packet' in record: - self._name = record['packet'] or '' - self.props = record - self._shift = False - break - blob_size = record.get('blob_size') - if blob_size: - blob = record['blob'] = _Blob(self._stream, blob_size) - yield record - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - # pylint: disable-msg=E0202 - def _seek(self, distance, where): - while distance: - chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE)) - distance -= len(chunk) - - -class _Blob(object): - - def __init__(self, stream, size): - self._stream = stream - self.size_to_read = size - - def read(self, size=toolkit.BUFFER_SIZE): - chunk = self._stream.read(min(size, self.size_to_read)) - self.size_to_read -= len(chunk) - return chunk - - -class _GzipStream(object): - - def __init__(self, stream): - self._stream = stream - self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS) - self._buffer = bytearray() - - def read(self, size): - while True: - if size <= len(self._buffer): - result = self._buffer[:size] - self._buffer = self._buffer[size:] - return bytes(result) - chunk = self._stream.read(size) - if not chunk: - result, self._buffer = self._buffer, bytearray() - return result - self._buffer += self._zip.decompress(chunk) - - def readline(self): - return toolkit.readline(self) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 8acfe27..792267a 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -782,6 +782,9 @@ class _NewFile(object): def name(self, value): self.dst_path = value + def tell(self): + return 
self._file.file.tell() + def close(self): self._file.close() if exists(self.name): diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py new file mode 100644 index 0000000..457ea07 --- /dev/null +++ b/sugar_network/toolkit/parcel.py @@ -0,0 +1,336 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +import sys +import zlib +import time +import json +import struct +import logging +from types import GeneratorType +from os.path import dirname, exists, join + +from sugar_network import toolkit +from sugar_network.toolkit.router import File +from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce + + +_FILENAME_SUFFIX = '.parcel' +_RESERVED_DISK_SPACE = 1024 * 1024 + +_ZLIB_WBITS = 15 +_ZLIB_WBITS_SIZE = 32768 # 2 ** 15 + +_logger = logging.getLogger('parcel') + + +def decode(stream, limit=None): + _logger.debug('Decode %r stream limit=%r', stream, limit) + + stream = _UnzipStream(stream, limit) + header = stream.read_record() + + packet = _DecodeIterator(stream) + while True: + packet.next() + if packet.name == 'last': + break + packet.props.update(header) + yield packet + + +def encode(packets, limit=None, header=None, compresslevel=6): + _logger.debug('Encode %r packets limit=%r header=%r', + packets, limit, header) + + ostream = _ZipStream(compresslevel) + + if limit is None: + limit = sys.maxint + if header is None: + header = {} + chunk = ostream.write_record(header) + if chunk: + yield chunk + + for packet, props, content in packets: + if props is None: + props = {} + props['packet'] = packet + chunk = ostream.write_record(props) + if chunk: + yield chunk + + if content is None: + continue + + content = iter(content) + try: + finalizing = False + record = next(content) + while True: + if record is None: + finalizing = True + record = next(content) + continue + blob_len = 0 + if isinstance(record, File) and record.path: + blob_len = record.size + chunk = ostream.write_record(record, + None if finalizing else limit - blob_len) + if chunk is None: + _logger.debug('Reach the encoding limit') + if not isinstance(content, GeneratorType): + raise StopIteration() + finalizing = True + record = content.throw(StopIteration()) + continue + if chunk: + yield chunk + if blob_len: + with file(record.path, 'rb') as blob: + while True: + chunk = blob.read(BUFFER_SIZE) + if not chunk: + break + blob_len -= len(chunk) + if not blob_len: + chunk += '\n' + chunk = ostream.write(chunk) + if chunk: + yield chunk + enforce(blob_len == 0, EOFError, 'Blob size mismatch') + record = next(content) + except StopIteration: + pass + + chunk = ostream.write_record({'packet': 'last'}) + if chunk: + yield chunk + chunk = ostream.flush() + if chunk: + yield chunk + + +def decode_dir(root, recipient=None, session=None): + for root, __, files in os.walk(root): + for filename in files: + if not filename.endswith(_FILENAME_SUFFIX): + continue + with file(join(root, filename), 
'rb') as parcel: + for packet in decode(parcel): + if recipient is not None and packet['from'] == recipient: + if session and packet['session'] == session: + _logger.debug('Skip the same session %r parcel', + parcel.name) + else: + _logger.debug('Remove outdated %r parcel', + parcel.name) + os.unlink(parcel.name) + break + yield packet + + +def encode_dir(packets, root=None, limit=None, path=None, sender=None, + header=None): + if path is None: + if not exists(root): + os.makedirs(root) + path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX) + if limit <= 0: + stat = os.statvfs(dirname(path)) + limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE + if header is None: + header = {} + if sender is not None: + header['from'] = sender + + _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header) + + with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel: + for chunk in encode(packets, limit, header): + parcel.write(chunk) + coroutine.dispatch() + parcel.flush() + os.fsync(parcel.fileno()) + os.rename(parcel.name, path) + + +class _DecodeIterator(object): + + def __init__(self, stream): + self._stream = stream + self.props = {} + self._name = None + self._shift = True + + @property + def name(self): + return self._name + + def next(self): + if self._shift: + for __ in self: + pass + if self._name is None: + raise EOFError() + self._shift = True + + def __repr__(self): + return '' % self.props + + def __getitem__(self, key): + return self.props.get(key) + + def __iter__(self): + while True: + record = self._stream.read_record() + if record is None: + self._name = None + raise EOFError() + if 'packet' in record: + self._name = record['packet'] or '' + self.props = record + self._shift = False + break + blob_len = record.get('content-length') + if blob_len is None: + yield record + continue + blob_len = int(blob_len) + with toolkit.NamedTemporaryFile() as blob: + while blob_len: + chunk = self._stream.read(min(blob_len, BUFFER_SIZE)) + enforce(chunk, 'Blob size mismatch') + blob.write(chunk) + blob_len -= len(chunk) + blob.flush() + yield File(blob.name, meta=record) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + +class _ZipStream(object): + + def __init__(self, compresslevel=6): + self._zipper = zlib.compressobj(compresslevel, + zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0) + self._offset = 0 + self._size = 0 + self._crc = zlib.crc32('') & 0xffffffffL + + def write_record(self, record, limit=None): + chunk = json.dumps(record) + '\n' + if limit is not None and self._offset + len(chunk) > limit: + return None + return self.write(chunk) + + def write(self, chunk): + self._size += len(chunk) + self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL + chunk = self._zipper.compress(chunk) + + if self._offset == 0: + chunk = '\037\213' + '\010' + chr(0) + \ + struct.pack('= 8, http.BadRequest, + 'Malformed gzipped file') + crc = struct.unpack(' RECORD + assert len(stream) < RECORD * 2 + self.assertEqual(5, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 2 + assert len(stream) < RECORD * 3 + self.assertEqual(6, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert 
len(stream) > RECORD * 3 + assert len(stream) < RECORD * 4 + self.assertEqual(7, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 4 + self.assertEqual(8, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 4 + + def test_limited_encode_FinalRecords(self): + RECORD = 1024 * 1024 + + def content(): + try: + yield {'record': '.' * RECORD} + yield {'record': '.' * RECORD} + except StopIteration: + pass + yield None + yield {'record': '.' * RECORD} + yield {'record': '.' * RECORD} + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) > RECORD * 4 + assert len(stream) < RECORD * 5 + self.assertEqual(8, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD * 5 + assert len(stream) < RECORD * 6 + self.assertEqual(9, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 5.5, compresslevel=0)])) + assert len(stream) > RECORD * 7 + assert len(stream) < RECORD * 8 + self.assertEqual(11, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 6.5, compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(12, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(12, len(stream.strip().split('\n'))) + + def test_encode_Blobs(self): + self.touch(('a', 'a')) + self.touch(('b', 'bb')) + self.touch(('c', 'ccc')) + + stream = ''.join([i for i in parcel.encode([ + (1, None, [ + File('a', 'digest', [('num', 1)]), + File('b', 'digest', [('num', 2)]), + ]), + (2, None, [ + File('c', 'digest', [('num', 3)]), + ]), + ])]) + + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'num': 1}) + '\n' + + 'a' + '\n' + + 
json.dumps({'num': 2}) + '\n' + + 'bb' + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 3}) + '\n' + + 'ccc' + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(stream)) + + def test_limited_encode_Blobs(self): + RECORD = 1024 * 1024 + self.touch(('blob', '.' * RECORD)) + + def content(): + yield File('blob', 'digest') + yield File('blob', 'digest') + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) < RECORD + self.assertEqual(4, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD + assert len(stream) < RECORD * 2 + self.assertEqual(6, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 2 + assert len(stream) < RECORD * 3 + self.assertEqual(8, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 3 + assert len(stream) < RECORD * 4 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 4 + self.assertEqual(12, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 4 + self.assertEqual(12, len(stream.strip().split('\n'))) + + def test_limited_encode_FinalBlobs(self): + RECORD = 1024 * 1024 + self.touch(('blob', '.' 
* RECORD)) + + def content(): + try: + yield File('blob', 'digest') + yield File('blob', 'digest') + except StopIteration: + pass + yield None + yield File('blob', 'digest') + yield File('blob', 'digest') + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) > RECORD * 4 + assert len(stream) < RECORD * 5 + self.assertEqual(12, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD * 5 + assert len(stream) < RECORD * 6 + self.assertEqual(14, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(16, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(16, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(16, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 5.5, compresslevel=0)])) + assert len(stream) > RECORD * 7 + assert len(stream) < RECORD * 8 + self.assertEqual(18, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 6.5, compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(20, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(20, len(stream.strip().split('\n'))) + + def test_decode_dir(self): + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'num': 1, 'content-length': '8'}) + '\n' + + 'content1' + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/1.parcel', stream.getvalue())) + + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 2, 'content-length': '8'}) + '\n' + + 'content2' + '\n' + + json.dumps({'payload': 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/2.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertEqual({'packet': 2}, packet.props) + items = iter(packet) + blob = next(items) + self.assertEqual({'num': 2, 'content-length': '8'}, blob) + self.assertEqual('content2', file(blob.path).read()) + self.assertEqual({'payload': 3}, next(items)) + 
self.assertRaises(StopIteration, items.next) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual({'foo': 'bar', 'packet': 1}, packet.props) + items = iter(packet) + self.assertEqual({'payload': 1}, next(items)) + blob = next(items) + self.assertEqual({'num': 1, 'content-length': '8'}, blob) + self.assertEqual('content1', file(blob.path).read()) + self.assertEqual({'payload': 2}, next(items)) + self.assertRaises(StopIteration, items.next) + self.assertRaises(StopIteration, packets_iter.next) + + def test_decode_dir_RemoveOutdatedParcels(self): + stream = zips( + json.dumps({'from': 'principal'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/1.parcel', stream.getvalue())) + + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/2.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels', recipient='principal') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertRaises(StopIteration, packets_iter.next) + assert not exists('parcels/1.parcel') + assert exists('parcels/2.parcel') + + stream = zips( + json.dumps({'from': 'principal', 'session': 'old'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/3.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels', recipient='principal', session='new') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertRaises(StopIteration, packets_iter.next) + assert not exists('parcels/1.parcel') + assert exists('parcels/2.parcel') + assert not exists('parcels/3.parcel') + + def test_decode_dir_SkipTheSameSessionParcels(self): + stream = zips( + json.dumps({'from': 'principal', 'session': 'new'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/1.parcel', stream.getvalue())) + + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/2.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels', recipient='principal', session='new') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertRaises(StopIteration, packets_iter.next) + assert exists('parcels/1.parcel') + assert exists('parcels/2.parcel') + + def test_encode_dir(self): + self.touch(('blob', 'content')) + parcel.encode_dir([ + (1, None, [ + {'payload': 1}, + File('blob', 'digest', [('num', 1)]), + {'payload': 2}, + ]), + (2, None, [ + File('blob', 'digest', [('num', 2)]), + {'payload': 3}, + ]), + ], path='./parcel') + + assert exists('parcel') + + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'num': 1}) + '\n' + + 'content' + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 2}) + '\n' + + 'content' + '\n' + + json.dumps({'payload': 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(file('parcel').read())) + + +def zips(data): + result = StringIO() + f = gzip.GzipFile(fileobj=result, mode='wb') + f.write(data) + 
f.close()
+    result.seek(0)
+    return result
+
+
+def unzips(data):
+    return gzip.GzipFile(fileobj=StringIO(data)).read()
+
+
+if __name__ == '__main__':
+    tests.main()
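
A minimal round-trip sketch of the new toolkit.parcel API, assembled from the
module and tests above. It is not part of the patch; the packet names, payload
values, and the 'from' header value are illustrative only:

    from cStringIO import StringIO

    from sugar_network.toolkit import parcel

    # encode() yields chunks of a gzip stream: one JSON line for the stream
    # header, one JSON line per packet header and per record, with raw blob
    # bytes written right after File records
    stream = ''.join(parcel.encode([
        ('push', None, [{'payload': 1}, {'payload': 2}]),
        ('pull', None, [{'payload': 3}]),
        ], header={'from': 'node-guid'}))

    # decode() walks the same stream back, one packet iterator at a time;
    # every packet inherits the stream header in its props
    for packet in parcel.decode(StringIO(stream)):
        print packet.name, packet['from']
        for record in packet:
            print record

encode_dir() and decode_dir() wrap the same stream format in *.parcel files on
disk, as test_encode_dir() and test_decode_dir() above exercise.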