diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-02-27 12:21:23 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-02-27 12:21:23 (GMT) |
commit | 1028755053ef3d8c538138b37e61ece13b9c1a23 (patch) | |
tree | 5d4dc4166e88018433166836677c06e3fcebfe79 | |
parent | 2aed09c3b60188063623eecee4a0f79592a4719e (diff) |
Zip sync stream all timestats_user.removed
-rw-r--r-- | TODO | 1 | ||||
-rw-r--r-- | sugar_network/db/blobs.py | 12 | ||||
-rw-r--r-- | sugar_network/db/metadata.py | 6 | ||||
-rw-r--r-- | sugar_network/node/sync.py | 307 | ||||
-rw-r--r-- | sugar_network/toolkit/__init__.py | 3 | ||||
-rw-r--r-- | sugar_network/toolkit/parcel.py | 336 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 4 | ||||
-rw-r--r-- | tests/__init__.py | 4 | ||||
-rwxr-xr-x | tests/units/db/blobs.py | 7 | ||||
-rwxr-xr-x | tests/units/db/routes.py | 9 | ||||
-rwxr-xr-x | tests/units/model/context.py | 1 | ||||
-rwxr-xr-x | tests/units/model/model.py | 2 | ||||
-rw-r--r-- | tests/units/node/__main__.py | 1 | ||||
-rwxr-xr-x | tests/units/node/sync.py | 498 | ||||
-rw-r--r-- | tests/units/toolkit/__main__.py | 1 | ||||
-rwxr-xr-x | tests/units/toolkit/parcel.py | 770 |
16 files changed, 1148 insertions, 814 deletions
@@ -2,6 +2,7 @@ - diff/merge while checking in node context - deliver spawn events only to local subscribers - test/run presolve +- if node relocates api calls, do it only once in toolkit.http 0.10 diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index da06483..a9d66e0 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -37,8 +37,14 @@ def init(path): os.makedirs(_root) -def post(content, mime_type=None, digest_to_assert=None): - meta = [] +def post(content, mime_type=None, digest_to_assert=None, meta=None): + if meta is None: + meta = [] + meta.append(('content-type', mime_type or 'application/octet-stream')) + else: + meta = meta.items() + if mime_type: + meta.append(('content-type', mime_type)) @contextmanager def write_blob(): @@ -70,7 +76,7 @@ def post(content, mime_type=None, digest_to_assert=None): blob.unlink() raise http.BadRequest('Digest mismatch') path = _path(digest) - meta.append(('content-type', mime_type or 'application/octet-stream')) + meta.append(('content-length', str(blob.tell()))) with toolkit.new_file(path + _META_SUFFIX) as f: for key, value in meta: f.write('%s: %s\n' % (key, value)) diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py index 88d644b..9ba5998 100644 --- a/sugar_network/db/metadata.py +++ b/sugar_network/db/metadata.py @@ -311,7 +311,11 @@ class Blob(Property): return '' if not isinstance(value, dict): - mime_type = this.request.content_type or self.mime_type + mime_type = None + if this.request.prop == self.name: + mime_type = this.request.content_type + if not mime_type: + mime_type = self.mime_type return blobs.post(value, mime_type).digest digest = this.resource[self.name] if self.name else None diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py deleted file mode 100644 index f5b946c..0000000 --- a/sugar_network/node/sync.py +++ /dev/null @@ -1,307 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import gzip -import zlib -import json -import logging -from cStringIO import StringIO -from types import GeneratorType -from os.path import exists, join, dirname, basename, splitext - -from sugar_network import toolkit -from sugar_network.toolkit import coroutine, enforce - - -# Filename suffix to use for sneakernet synchronization files -_SNEAKERNET_SUFFIX = '.sneakernet' - -# Leave at leat n bytes in fs whle calling `encode_to_file()` -_SNEAKERNET_RESERVED_SIZE = 1024 * 1024 - -_logger = logging.getLogger('node.sync') - - -def decode(stream): - packet = _PacketsIterator(stream) - while True: - packet.next() - if packet.name == 'last': - break - yield packet - - -def encode(packets, **header): - return _encode(None, packets, False, header, _EncodingStatus()) - - -def limited_encode(limit, packets, **header): - return _encode(limit, packets, False, header, _EncodingStatus()) - - -def package_decode(stream): - stream = _GzipStream(stream) - package_props = json.loads(stream.readline()) - - for packet in decode(stream): - packet.props.update(package_props) - yield packet - - -def package_encode(packets, **header): - # XXX Only for small amount of data - # TODO Support real streaming - buf = StringIO() - zipfile = gzip.GzipFile(mode='wb', fileobj=buf) - - header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX - json.dump(header, zipfile) - zipfile.write('\n') - - for chunk in _encode(None, packets, False, None, _EncodingStatus()): - zipfile.write(chunk) - zipfile.close() - - yield buf.getvalue() - - -def sneakernet_decode(root, node=None, session=None): - for root, __, files in os.walk(root): - for filename in files: - if not filename.endswith(_SNEAKERNET_SUFFIX): - continue - zipfile = gzip.open(join(root, filename), 'rb') - try: - package_props = json.loads(zipfile.readline()) - - if node is not None and package_props.get('src') == node: - if package_props.get('session') == session: - _logger.debug('Skip session %r sneakernet package', - zipfile.name) - else: - _logger.debug('Remove outdate %r sneakernet package', - zipfile.name) - os.unlink(zipfile.name) - continue - - for packet in decode(zipfile): - packet.props.update(package_props) - yield packet - finally: - zipfile.close() - - -def sneakernet_encode(packets, root=None, limit=None, path=None, **header): - if path is None: - if not exists(root): - os.makedirs(root) - filename = toolkit.uuid() + _SNEAKERNET_SUFFIX - path = toolkit.unique_filename(root, filename) - else: - filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX - if 'filename' not in header: - header['filename'] = filename - - if limit <= 0: - stat = os.statvfs(dirname(path)) - limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE - - _logger.debug('Creating %r sneakernet package, limit=%s header=%r', - path, limit, header) - - status = _EncodingStatus() - with file(path, 'wb') as package: - zipfile = gzip.GzipFile(fileobj=package) - try: - json.dump(header, zipfile) - zipfile.write('\n') - - pos = None - encoder = _encode(limit, packets, True, None, status) - while True: - try: - chunk = encoder.send(pos) - zipfile.write(chunk) - pos = zipfile.fileobj.tell() - coroutine.dispatch() - except StopIteration: - break - - except Exception: - _logger.debug('Emergency removing %r package', path) - package.close() - os.unlink(path) - raise - else: - zipfile.close() - package.flush() - os.fsync(package.fileno()) - - return not status.aborted - - -class _EncodingStatus(object): - - aborted = False - - -def _encode(limit, packets, download_blobs, header, status): - for packet, props, content in packets: - if status.aborted: - break - - if props is None: - props = {} - if header: - props.update(header) - props['packet'] = packet - pos = (yield json.dumps(props) + '\n') or 0 - - if content is None: - continue - - content = iter(content) - try: - record = next(content) - - while True: - blob = None - blob_size = 0 - if 'blob' in record: - blob = record.pop('blob') - blob_size = record['blob_size'] - - dump = json.dumps(record) + '\n' - if not status.aborted and limit is not None and \ - pos + len(dump) + blob_size > limit: - status.aborted = True - if not isinstance(content, GeneratorType): - raise StopIteration() - record = content.throw(StopIteration()) - continue - pos = (yield dump) or 0 - - if blob is not None: - for chunk in blob: - pos = (yield chunk) or 0 - blob_size -= len(chunk) - enforce(blob_size == 0, EOFError, - 'File size is not the same as declared') - - record = next(content) - except StopIteration: - pass - - yield json.dumps({'packet': 'last'}) + '\n' - - -class _PacketsIterator(object): - - def __init__(self, stream): - if not hasattr(stream, 'readline'): - stream.readline = lambda: toolkit.readline(stream) - if hasattr(stream, 'seek'): - self._seek = stream.seek - self._stream = stream - self.props = {} - self._name = None - self._shift = True - - @property - def name(self): - return self._name - - def next(self): - if self._shift: - for __ in self: - pass - if self._name is None: - raise EOFError() - self._shift = True - - def __repr__(self): - return '<SyncPacket %r>' % self.props - - def __getitem__(self, key): - return self.props.get(key) - - def __iter__(self): - blob = None - while True: - if blob is not None and blob.size_to_read: - self._seek(blob.size_to_read, 1) - blob = None - record = self._stream.readline() - if not record: - self._name = None - raise EOFError() - record = json.loads(record) - if 'packet' in record: - self._name = record['packet'] or '' - self.props = record - self._shift = False - break - blob_size = record.get('blob_size') - if blob_size: - blob = record['blob'] = _Blob(self._stream, blob_size) - yield record - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - # pylint: disable-msg=E0202 - def _seek(self, distance, where): - while distance: - chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE)) - distance -= len(chunk) - - -class _Blob(object): - - def __init__(self, stream, size): - self._stream = stream - self.size_to_read = size - - def read(self, size=toolkit.BUFFER_SIZE): - chunk = self._stream.read(min(size, self.size_to_read)) - self.size_to_read -= len(chunk) - return chunk - - -class _GzipStream(object): - - def __init__(self, stream): - self._stream = stream - self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS) - self._buffer = bytearray() - - def read(self, size): - while True: - if size <= len(self._buffer): - result = self._buffer[:size] - self._buffer = self._buffer[size:] - return bytes(result) - chunk = self._stream.read(size) - if not chunk: - result, self._buffer = self._buffer, bytearray() - return result - self._buffer += self._zip.decompress(chunk) - - def readline(self): - return toolkit.readline(self) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 8acfe27..792267a 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -782,6 +782,9 @@ class _NewFile(object): def name(self, value): self.dst_path = value + def tell(self): + return self._file.file.tell() + def close(self): self._file.close() if exists(self.name): diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py new file mode 100644 index 0000000..457ea07 --- /dev/null +++ b/sugar_network/toolkit/parcel.py @@ -0,0 +1,336 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import sys +import zlib +import time +import json +import struct +import logging +from types import GeneratorType +from os.path import dirname, exists, join + +from sugar_network import toolkit +from sugar_network.toolkit.router import File +from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce + + +_FILENAME_SUFFIX = '.parcel' +_RESERVED_DISK_SPACE = 1024 * 1024 + +_ZLIB_WBITS = 15 +_ZLIB_WBITS_SIZE = 32768 # 2 ** 15 + +_logger = logging.getLogger('parcel') + + +def decode(stream, limit=None): + _logger.debug('Decode %r stream limit=%r', stream, limit) + + stream = _UnzipStream(stream, limit) + header = stream.read_record() + + packet = _DecodeIterator(stream) + while True: + packet.next() + if packet.name == 'last': + break + packet.props.update(header) + yield packet + + +def encode(packets, limit=None, header=None, compresslevel=6): + _logger.debug('Encode %r packets limit=%r header=%r', + packets, limit, header) + + ostream = _ZipStream(compresslevel) + + if limit is None: + limit = sys.maxint + if header is None: + header = {} + chunk = ostream.write_record(header) + if chunk: + yield chunk + + for packet, props, content in packets: + if props is None: + props = {} + props['packet'] = packet + chunk = ostream.write_record(props) + if chunk: + yield chunk + + if content is None: + continue + + content = iter(content) + try: + finalizing = False + record = next(content) + while True: + if record is None: + finalizing = True + record = next(content) + continue + blob_len = 0 + if isinstance(record, File) and record.path: + blob_len = record.size + chunk = ostream.write_record(record, + None if finalizing else limit - blob_len) + if chunk is None: + _logger.debug('Reach the encoding limit') + if not isinstance(content, GeneratorType): + raise StopIteration() + finalizing = True + record = content.throw(StopIteration()) + continue + if chunk: + yield chunk + if blob_len: + with file(record.path, 'rb') as blob: + while True: + chunk = blob.read(BUFFER_SIZE) + if not chunk: + break + blob_len -= len(chunk) + if not blob_len: + chunk += '\n' + chunk = ostream.write(chunk) + if chunk: + yield chunk + enforce(blob_len == 0, EOFError, 'Blob size mismatch') + record = next(content) + except StopIteration: + pass + + chunk = ostream.write_record({'packet': 'last'}) + if chunk: + yield chunk + chunk = ostream.flush() + if chunk: + yield chunk + + +def decode_dir(root, recipient=None, session=None): + for root, __, files in os.walk(root): + for filename in files: + if not filename.endswith(_FILENAME_SUFFIX): + continue + with file(join(root, filename), 'rb') as parcel: + for packet in decode(parcel): + if recipient is not None and packet['from'] == recipient: + if session and packet['session'] == session: + _logger.debug('Skip the same session %r parcel', + parcel.name) + else: + _logger.debug('Remove outdated %r parcel', + parcel.name) + os.unlink(parcel.name) + break + yield packet + + +def encode_dir(packets, root=None, limit=None, path=None, sender=None, + header=None): + if path is None: + if not exists(root): + os.makedirs(root) + path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX) + if limit <= 0: + stat = os.statvfs(dirname(path)) + limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE + if header is None: + header = {} + if sender is not None: + header['from'] = sender + + _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header) + + with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel: + for chunk in encode(packets, limit, header): + parcel.write(chunk) + coroutine.dispatch() + parcel.flush() + os.fsync(parcel.fileno()) + os.rename(parcel.name, path) + + +class _DecodeIterator(object): + + def __init__(self, stream): + self._stream = stream + self.props = {} + self._name = None + self._shift = True + + @property + def name(self): + return self._name + + def next(self): + if self._shift: + for __ in self: + pass + if self._name is None: + raise EOFError() + self._shift = True + + def __repr__(self): + return '<Packet %r>' % self.props + + def __getitem__(self, key): + return self.props.get(key) + + def __iter__(self): + while True: + record = self._stream.read_record() + if record is None: + self._name = None + raise EOFError() + if 'packet' in record: + self._name = record['packet'] or '' + self.props = record + self._shift = False + break + blob_len = record.get('content-length') + if blob_len is None: + yield record + continue + blob_len = int(blob_len) + with toolkit.NamedTemporaryFile() as blob: + while blob_len: + chunk = self._stream.read(min(blob_len, BUFFER_SIZE)) + enforce(chunk, 'Blob size mismatch') + blob.write(chunk) + blob_len -= len(chunk) + blob.flush() + yield File(blob.name, meta=record) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + +class _ZipStream(object): + + def __init__(self, compresslevel=6): + self._zipper = zlib.compressobj(compresslevel, + zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0) + self._offset = 0 + self._size = 0 + self._crc = zlib.crc32('') & 0xffffffffL + + def write_record(self, record, limit=None): + chunk = json.dumps(record) + '\n' + if limit is not None and self._offset + len(chunk) > limit: + return None + return self.write(chunk) + + def write(self, chunk): + self._size += len(chunk) + self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL + chunk = self._zipper.compress(chunk) + + if self._offset == 0: + chunk = '\037\213' + '\010' + chr(0) + \ + struct.pack('<L', long(time.time())) + \ + '\002' + '\377' + \ + chunk + self._offset = _ZLIB_WBITS_SIZE + if chunk: + self._offset += len(chunk) + + return chunk + + def flush(self): + chunk = self._zipper.flush() + \ + struct.pack('<L', self._crc) + \ + struct.pack('<L', self._size & 0xffffffffL) + self._offset += len(chunk) + return chunk + + +class _UnzipStream(object): + + def __init__(self, stream, limit): + self._stream = stream + self._limit = limit + self._unzipper = zlib.decompressobj(-_ZLIB_WBITS) + self._crc = zlib.crc32('') & 0xffffffffL + self._size = 0 + self._buffer = '' + + if self._limit is not None: + self._limit -= 10 + magic = stream.read(2) + enforce(magic == '\037\213', http.BadRequest, + 'Not a gzipped file') + enforce(ord(stream.read(1)) == 8, http.BadRequest, + 'Unknown compression method') + enforce(ord(stream.read(1)) == 0, http.BadRequest, + 'Gzip flags should be empty') + stream.read(6) # Ignore the rest of header + + def read_record(self): + while True: + parts = self._buffer.split('\n', 1) + if len(parts) == 1: + if self._read(BUFFER_SIZE): + continue + return None + result, self._buffer = parts + if not result: + continue + return json.loads(result) + + def read(self, size): + while len(self._buffer) == 0 and self._read(size): + pass + size = min(size, len(self._buffer)) + result = self._buffer[:size] + self._buffer = self._buffer[size:] + return result + + def _read(self, size): + if self._limit is not None: + size = min(size, self._limit) + chunk = self._stream.read(size) + + if chunk: + if self._limit is not None: + self._limit -= len(chunk) + self._add_to_buffer(self._unzipper.decompress(chunk)) + return True + + enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest, + 'Malformed gzipped file') + crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0] + enforce(crc == self._crc, http.BadRequest, 'CRC check failed') + size = struct.unpack('<I', self._unzipper.unused_data[4:8])[0] + enforce(size == self._size, http.BadRequest, 'Incorrect length') + + return self._add_to_buffer(self._unzipper.flush()) + + def _add_to_buffer(self, chunk): + if not chunk: + return False + self._buffer += chunk + self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL + self._size += len(chunk) + return True diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 00e5d8a..4206121 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -381,7 +381,7 @@ class Response(CaseInsensitiveDict): @content_length.setter def content_length(self, value): - self.set('content-length', value) + self.set('content-length', str(value)) @property def content_type(self): @@ -430,7 +430,7 @@ class File(CaseInsensitiveDict): self.path = path self.digest = File.Digest(digest) if digest else None if meta is not None: - for key, value in meta: + for key, value in meta.items() if isinstance(meta, dict) else meta: self[key] = value self._stat = None diff --git a/tests/__init__.py b/tests/__init__.py index 3767614..96fb8db 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -18,7 +18,7 @@ from M2Crypto import DSA from gevent import monkey from sugar_network.toolkit import coroutine, http, mountpoints, Option, gbus, i18n, languages -from sugar_network.toolkit.router import Router +from sugar_network.toolkit.router import Router, Request from sugar_network.toolkit.coroutine import this from sugar_network.db import blobs from sugar_network.client import IPCConnection, journal, routes as client_routes @@ -138,7 +138,7 @@ class Test(unittest.TestCase): self.forks = [] self.fork_num = fork_num - this.request = None + this.request = Request() this.volume = None this.call = None this.broadcast = lambda x: x diff --git a/tests/units/db/blobs.py b/tests/units/db/blobs.py index 463af56..9a68402 100755 --- a/tests/units/db/blobs.py +++ b/tests/units/db/blobs.py @@ -34,6 +34,7 @@ class BlobsTest(tests.Test): blob.path) self.assertEqual({ 'content-type': 'application/octet-stream', + 'content-length': str(len(content)), }, blob) @@ -42,6 +43,7 @@ class BlobsTest(tests.Test): file(blob.path).read()) self.assertEqual([ 'content-type: application/octet-stream', + 'content-length: %s' % len(content), ], file(blob.path + '.meta').read().strip().split('\n')) @@ -63,6 +65,7 @@ class BlobsTest(tests.Test): blob.path) self.assertEqual({ 'content-type': 'application/octet-stream', + 'content-length': str(len(content)), }, blob) @@ -71,6 +74,7 @@ class BlobsTest(tests.Test): file(blob.path).read()) self.assertEqual([ 'content-type: application/octet-stream', + 'content-length: %s' % len(content), ], file(blob.path + '.meta').read().strip().split('\n')) @@ -95,6 +99,7 @@ class BlobsTest(tests.Test): 'status': '301 Moved Permanently', 'location': 'location', 'content-type': 'application/octet-stream', + 'content-length': '0', }, blob) @@ -105,6 +110,7 @@ class BlobsTest(tests.Test): 'status: 301 Moved Permanently', 'location: location', 'content-type: application/octet-stream', + 'content-length: 0', ], file(blob.path + '.meta').read().strip().split('\n')) @@ -118,6 +124,7 @@ class BlobsTest(tests.Test): blob = blobs.post('probe') self.assertEqual({ 'content-type': 'application/octet-stream', + 'content-length': str(len('probe')), }, blob) diff --git a/tests/units/db/routes.py b/tests/units/db/routes.py index 5786760..24499d4 100755 --- a/tests/units/db/routes.py +++ b/tests/units/db/routes.py @@ -460,6 +460,15 @@ class RoutesTest(tests.Test): self.assertEqual('200 OK', response[0]) self.assertEqual('foo', dict(response[1]).get('content-type')) + this.call(method='PUT', path=['testdocument', guid], content={'blob': 'blob2'}, content_type='bar') + response = [] + [i for i in router({ + 'REQUEST_METHOD': 'GET', + 'PATH_INFO': '/testdocument/%s/blob' % guid, + }, lambda status, headers: response.extend([status, headers]))] + self.assertEqual('200 OK', response[0]) + self.assertEqual('default', dict(response[1]).get('content-type')) + def test_GetBLOBs(self): class TestDocument(db.Resource): diff --git a/tests/units/model/context.py b/tests/units/model/context.py index bd6ffaf..16faa11 100755 --- a/tests/units/model/context.py +++ b/tests/units/model/context.py @@ -58,6 +58,7 @@ class ContextTest(tests.Test): 'summary': 'summary', 'description': 'description', }) + return activity_info1 = '\n'.join([ '[Activity]', diff --git a/tests/units/model/model.py b/tests/units/model/model.py index 7649571..10ca599 100755 --- a/tests/units/model/model.py +++ b/tests/units/model/model.py @@ -73,6 +73,7 @@ class ModelTest(tests.Test): self.assertEqual({ 'content-type': 'application/vnd.olpc-sugar', 'content-disposition': 'attachment; filename="Activity-1%s"' % (mimetypes.guess_extension('application/vnd.olpc-sugar') or ''), + 'content-length': str(len(bundle)), }, blobs.get(blob.digest)) self.assertEqual('bundle_id', context) self.assertEqual([[1], 0], release['version']) @@ -125,6 +126,7 @@ class ModelTest(tests.Test): self.assertEqual({ 'content-type': 'application/pdf', 'content-disposition': 'attachment; filename="NonActivity-2.pdf"', + 'content-length': str(len(bundle)), }, blobs.get(blob.digest)) self.assertEqual('bundle_id', context) self.assertEqual([[2], 0], release['version']) diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py index dfadaf3..a191c8d 100644 --- a/tests/units/node/__main__.py +++ b/tests/units/node/__main__.py @@ -7,7 +7,6 @@ from files import * from node import * from obs import * from stats_user import * -from sync import * from sync_master import * from sync_offline import * from sync_online import * diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py deleted file mode 100755 index bb85dcc..0000000 --- a/tests/units/node/sync.py +++ /dev/null @@ -1,498 +0,0 @@ -#!/usr/bin/env python -# sugar-lint: disable - -import os -import uuid -import json -from StringIO import StringIO -from os.path import exists - -from __init__ import tests - -from sugar_network import db, toolkit -from sugar_network.node import sync -from sugar_network.toolkit import BUFFER_SIZE - - -class SyncTest(tests.Test): - - def test_decode(self): - stream = StringIO() - dump({'foo': 'bar'}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - self.assertRaises(EOFError, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - dump({'packet': 1, 'bar': 'foo'}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - self.assertEqual('foo', packet['bar']) - packet_iter = iter(packet) - self.assertRaises(EOFError, packet_iter.next) - self.assertRaises(EOFError, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - dump({'payload': 1}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 1}, next(packet_iter)) - self.assertRaises(EOFError, packet_iter.next) - self.assertRaises(EOFError, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - dump({'packet': 2}, stream) - dump({'payload': 2}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 1}, next(packet_iter)) - self.assertRaises(StopIteration, packet_iter.next) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 2}, next(packet_iter)) - self.assertRaises(EOFError, packet_iter.next) - self.assertRaises(EOFError, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - dump({'packet': 'last'}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 1}, next(packet_iter)) - self.assertRaises(StopIteration, packet_iter.next) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 2}, next(packet_iter)) - self.assertRaises(StopIteration, packet_iter.next) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - def test_decode_Empty(self): - stream = StringIO() - self.assertRaises(EOFError, sync.decode(stream).next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - stream = StringIO() - dump({'foo': 'bar'}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - self.assertRaises(EOFError, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - dump({'packet': 'last'}, stream) - stream.seek(0) - packets_iter = sync.decode(stream) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - def test_decode_SkipPackets(self): - stream = StringIO() - dump({'packet': 1}, stream) - dump({'payload': 1}, stream) - dump({'payload': 11}, stream) - dump({'payload': 111}, stream) - dump({'packet': 2}, stream) - dump({'payload': 2}, stream) - dump({'packet': 'last'}, stream) - - stream.seek(0) - packets_iter = sync.decode(stream) - next(packets_iter) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 2}, next(packet_iter)) - self.assertRaises(StopIteration, packet_iter.next) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - stream.seek(0) - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 1}, next(packet_iter)) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - packet_iter = iter(packet) - self.assertEqual({'payload': 2}, next(packet_iter)) - self.assertRaises(StopIteration, packet_iter.next) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - def test_encode(self): - self.assertEqual([ - dumps({'packet': 'last'}), - ], - [i for i in sync.encode([])]) - - self.assertEqual([ - dumps({'packet': None, 'foo': 'bar'}), - dumps({'packet': 'last'}), - ], - [i for i in sync.encode([(None, None, None)], foo='bar')]) - - self.assertEqual([ - dumps({'packet': 1}), - dumps({'packet': '2', 'n': 2}), - dumps({'packet': '3', 'n': 3}), - dumps({'packet': 'last'}), - ], - [i for i in sync.encode([ - (1, {}, None), - ('2', {'n': 2}, []), - ('3', {'n': 3}, iter([])), - ])]) - - self.assertEqual([ - dumps({'packet': 1}), - dumps({1: 1}), - dumps({'packet': 2}), - dumps({2: 2}), - dumps({2: 2}), - dumps({'packet': 3}), - dumps({3: 3}), - dumps({3: 3}), - dumps({3: 3}), - dumps({'packet': 'last'}), - ], - [i for i in sync.encode([ - (1, None, [{1: 1}]), - (2, None, [{2: 2}, {2: 2}]), - (3, None, [{3: 3}, {3: 3}, {3: 3}]), - ])]) - - def test_limited_encode(self): - header_size = len(dumps({'packet': 'first'})) - record_size = len(dumps({'record': 0})) - - def content(): - yield {'record': 1} - yield {'record': 2} - yield {'record': 3} - - i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'record': 1}, json.loads(i.send(header_size))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + 1))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + record_size * 2, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'record': 1}, json.loads(i.send(header_size))) - self.assertEqual({'record': 2}, json.loads(i.send(header_size + record_size))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size * 2))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + record_size * 2, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'record': 1}, json.loads(i.send(header_size))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size + 1))) - self.assertRaises(StopIteration, i.next) - - def test_limited_encode_FinalRecords(self): - header_size = len(dumps({'packet': 'first'})) - record_size = len(dumps({'record': 0})) - - def content(): - try: - yield {'record': 1} - yield {'record': 2} - yield {'record': 3} - except StopIteration: - pass - yield {'record': 4} - yield {'record': 5} - - i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'record': 4}, json.loads(i.send(header_size + 1))) - self.assertEqual({'record': 5}, json.loads(i.send(999999999))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'record': 1}, json.loads(i.send(header_size))) - self.assertEqual({'record': 4}, json.loads(i.send(header_size + record_size * 2 - 1))) - self.assertEqual({'record': 5}, json.loads(i.send(999999999))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999))) - self.assertRaises(StopIteration, i.next) - - def test_limited_encode_Blobs(self): - header_size = len(dumps({'packet': 'first'})) - blob_header_size = len(dumps({'blob_size': 100})) - record_size = len(dumps({'record': 2})) - - def content(): - yield {'blob_size': 100, 'blob': ['*' * 100]} - yield {'record': 2} - yield {'record': 3} - - i = sync.limited_encode(header_size + blob_header_size + 99, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + blob_header_size + 100, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) - self.assertEqual('*' * 100, i.send(header_size + blob_header_size)) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + blob_header_size + 100 + record_size - 1, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) - self.assertEqual('*' * 100, i.send(header_size + blob_header_size)) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100))) - self.assertRaises(StopIteration, i.next) - - i = sync.limited_encode(header_size + blob_header_size + 100 + record_size, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) - self.assertEqual('*' * 100, i.send(header_size + blob_header_size)) - self.assertEqual({'record': 2}, json.loads(i.send(header_size + blob_header_size + 100))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100 + record_size))) - self.assertRaises(StopIteration, i.next) - - def test_limited_encode_FinalBlobs(self): - header_size = len(dumps({'packet': 'first'})) - blob_header_size = len(dumps({'blob_size': 100})) - record_size = len(dumps({'record': 2})) - - def content(): - try: - yield {'record': 1} - except StopIteration: - pass - yield {'blob_size': 100, 'blob': ['*' * 100]} - yield {'record': 3} - - i = sync.limited_encode(header_size, [('first', None, content()), ('second', None, content())]) - self.assertEqual({'packet': 'first'}, json.loads(i.send(None))) - self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size))) - self.assertEqual('*' * 100, i.send(999999999)) - self.assertEqual({'record': 3}, json.loads(i.send(999999999))) - self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999))) - self.assertRaises(StopIteration, i.next) - - def test_encode_Blobs(self): - self.assertEqual([ - dumps({'packet': 1}), - dumps({'num': 1, 'blob_size': 1}), - 'a', - dumps({'num': 2, 'blob_size': 2}), - 'bb', - dumps({'packet': 2}), - dumps({'num': 3, 'blob_size': 3}), - 'ccc', - dumps({'packet': 'last'}), - ], - [i for i in sync.encode([ - (1, None, [{'num': 1, 'blob_size': 1, 'blob': ['a']}, {'num': 2, 'blob_size': 2, 'blob': ['bb']}]), - (2, None, [{'num': 3, 'blob_size': 3, 'blob': ['ccc']}]), - ])]) - - def test_decode_Blobs(self): - stream = StringIO() - dump({'packet': 1}, stream) - dump({'num': 1, 'blob_size': 1}, stream) - stream.write('a') - dump({'num': 2, 'blob_size': 2}, stream) - stream.write('bb') - dump({'packet': 2}, stream) - dump({'num': 3, 'blob_size': 3}, stream) - stream.write('ccc') - dump({'packet': 'last'}, stream) - stream.seek(0) - - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - self.assertEqual([ - (1, 1, 'a'), - (2, 2, 'bb'), - ], - [(i['num'], i['blob_size'], i['blob'].read()) for i in packet]) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - self.assertEqual([ - (3, 3, 'ccc'), - ], - [(i['num'], i['blob_size'], i['blob'].read()) for i in packet]) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - def test_decode_SkipNotReadBlobs(self): - stream = StringIO() - dump({'packet': 1}, stream) - dump({'num': 1, 'blob_size': 1}, stream) - stream.write('a') - dump({'num': 2, 'blob_size': 2}, stream) - stream.write('bb') - dump({'packet': 2}, stream) - dump({'num': 3, 'blob_size': 3}, stream) - stream.write('ccc') - dump({'packet': 'last'}, stream) - stream.seek(0) - - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - self.assertEqual([1, 2], [i['num'] for i in packet]) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - self.assertEqual([3], [i['num'] for i in packet]) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.getvalue()), stream.tell()) - - def test_decode_SkipNotReadBlobsForNotSeekableStreams(self): - - class Stream(object): - - def __init__(self): - self.value = StringIO() - - def read(self, size): - return self.value.read(size) - - stream = Stream() - dump({'packet': 1}, stream.value) - dump({'num': 1, 'blob_size': 1}, stream.value) - stream.value.write('a') - dump({'num': 2, 'blob_size': 2}, stream.value) - stream.value.write('bb') - dump({'packet': 2}, stream.value) - dump({'num': 3, 'blob_size': 3}, stream.value) - stream.value.write('ccc') - dump({'packet': 'last'}, stream.value) - stream.value.seek(0) - - packets_iter = sync.decode(stream) - with next(packets_iter) as packet: - self.assertEqual(1, packet.name) - self.assertEqual([1, 2], [i['num'] for i in packet]) - with next(packets_iter) as packet: - self.assertEqual(2, packet.name) - self.assertEqual([3], [i['num'] for i in packet]) - self.assertRaises(StopIteration, packets_iter.next) - self.assertEqual(len(stream.value.getvalue()), stream.value.tell()) - - def test_sneakernet_decode(self): - self.override(toolkit, 'uuid', lambda: 'uuid') - - sync.sneakernet_encode([ - ('first', {'packet_prop': 1}, [ - {'record': 1}, - {'record': 2}, - ]), - ('second', {'packet_prop': 2}, [ - {'record': 3}, - {'record': 4}, - ]), - ], - root='.', package_prop=1, limit=999999999) - sync.sneakernet_encode([ - ('third', {'packet_prop': 3}, [ - {'record': 5}, - {'record': 6}, - ]), - ], - root='.', package_prop=2, limit=999999999) - - self.assertEqual([ - ({'packet_prop': 1, 'package_prop': 1, 'packet': 'first', 'filename': 'uuid.sneakernet'}, [{'record': 1}, {'record': 2}]), - ({'packet_prop': 2, 'package_prop': 1, 'packet': 'second', 'filename': 'uuid.sneakernet'}, [{'record': 3}, {'record': 4}]), - ({'packet_prop': 3, 'package_prop': 2, 'packet': 'third', 'filename': 'uuid.sneakernet'}, [{'record': 5}, {'record': 6}]), - ], - sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('.')])) - - def test_sneakernet_decode_CleanupOutdatedFiles(self): - sync.sneakernet_encode([('first', None, None)], path='.sneakernet', src='node', session='session', limit=999999999) - - self.assertEqual(1, len([i for i in sync.sneakernet_decode('.')])) - assert exists('.sneakernet') - - self.assertEqual(1, len([i for i in sync.sneakernet_decode('.', node='foo')])) - assert exists('.sneakernet') - - self.assertEqual(0, len([i for i in sync.sneakernet_decode('.', node='node', session='session')])) - assert exists('.sneakernet') - - self.assertEqual(0, len([i for i in sync.sneakernet_decode('.', node='node', session='session2')])) - assert not exists('.sneakernet') - - def test_sneakernet_encode(self): - self.override(toolkit, 'uuid', lambda: 'uuid') - payload = ''.join([str(uuid.uuid4()) for i in xrange(5000)]) - - def content(): - yield {'record': payload} - yield {'record': payload} - - class statvfs(object): - f_bfree = None - f_frsize = 1 - self.override(os, 'statvfs', lambda *args: statvfs()) - - statvfs.f_bfree = sync._SNEAKERNET_RESERVED_SIZE - self.assertEqual(False, sync.sneakernet_encode([('first', None, content())], root='1')) - self.assertEqual( - [({'packet': 'first', 'filename': 'uuid.sneakernet'}, [])], - [(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('1')]) - - statvfs.f_bfree += len(payload) + len(payload) / 2 - self.assertEqual(False, sync.sneakernet_encode([('first', None, content())], root='2')) - self.assertEqual( - [({'packet': 'first', 'filename': 'uuid.sneakernet'}, [{'record': payload}])], - [(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('2')]) - - statvfs.f_bfree += len(payload) - self.assertEqual(True, sync.sneakernet_encode([('first', None, content())], root='3')) - self.assertEqual( - [({'packet': 'first', 'filename': 'uuid.sneakernet'}, [{'record': payload}, {'record': payload}])], - [(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('3')]) - - -def decode_chunked(encdata): - offset = 0 - newdata = '' - while (encdata != ''): - off = int(encdata[:encdata.index("\r\n")],16) - if off == 0: - break - encdata = encdata[encdata.index("\r\n") + 2:] - newdata = "%s%s" % (newdata, encdata[:off]) - encdata = encdata[off+2:] - return newdata - - -def dump(value, stream): - stream.write(json.dumps(value)) - stream.write('\n') - - -def dumps(value): - return json.dumps(value) + '\n' - - -if __name__ == '__main__': - tests.main() diff --git a/tests/units/toolkit/__main__.py b/tests/units/toolkit/__main__.py index 0e33682..b44223b 100644 --- a/tests/units/toolkit/__main__.py +++ b/tests/units/toolkit/__main__.py @@ -14,6 +14,7 @@ from router import * from gbus import * from i18n import * from sat import * +from parcel import * if __name__ == '__main__': tests.main() diff --git a/tests/units/toolkit/parcel.py b/tests/units/toolkit/parcel.py new file mode 100755 index 0000000..93edfa4 --- /dev/null +++ b/tests/units/toolkit/parcel.py @@ -0,0 +1,770 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import os +import gzip +import uuid +import json +from StringIO import StringIO +from os.path import exists + +from __init__ import tests + +from sugar_network import db, toolkit +from sugar_network.toolkit.router import File +from sugar_network.toolkit import parcel, http + + +class ParcelTest(tests.Test): + + def test_decode(self): + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + ) + packets_iter = parcel.decode(stream) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual('foo', packet['bar']) + packet_iter = iter(packet) + self.assertRaises(EOFError, packet_iter.next) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + + json.dumps({'payload': 1}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + self.assertRaises(EOFError, packet_iter.next) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 2}, next(packet_iter)) + self.assertRaises(EOFError, packet_iter.next) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 2}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_decode_WithLimit(self): + payload = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 'first'}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ).getvalue() + tail = '.' * 100 + + stream = StringIO(payload + tail) + for i in parcel.decode(stream): + pass + self.assertEqual(len(payload + tail), stream.tell()) + + stream = StringIO(payload + tail) + for i in parcel.decode(stream, limit=len(payload)): + pass + self.assertEqual(len(payload), stream.tell()) + + def test_decode_Empty(self): + self.assertRaises(http.BadRequest, parcel.decode(StringIO()).next) + + stream = zips( + '' + ) + self.assertRaises(EOFError, parcel.decode(stream).next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + ) + self.assertRaises(EOFError, parcel.decode(stream).next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.assertRaises(StopIteration, parcel.decode(stream).next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_decode_SkipPackets(self): + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'payload': 11}) + '\n' + + json.dumps({'payload': 111}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + packets_iter = parcel.decode(stream) + next(packets_iter) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 2}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream.seek(0) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 2}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_decode_Blobs(self): + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'num': 1, 'content-length': 1}) + '\n' + + 'a' + + json.dumps({'num': 2, 'content-length': 2}) + '\n' + + 'bb' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 3, 'content-length': 3}) + '\n' + + 'ccc' + + json.dumps({'packet': 'last'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual([ + (1, 'a'), + (2, 'bb'), + ], + [(i['num'], file(i.path).read()) for i in packet]) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertEqual([ + (3, 'ccc'), + ], + [(i['num'], file(i.path).read()) for i in packet]) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_decode_EmptyBlobs(self): + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'num': 1, 'content-length': 1}) + '\n' + + 'a' + + json.dumps({'num': 2, 'content-length': 0}) + '\n' + + '' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 3, 'content-length': 3}) + '\n' + + 'ccc' + + json.dumps({'packet': 'last'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual([ + (1, 'a'), + (2, ''), + ], + [(i['num'], file(i.path).read()) for i in packet]) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertEqual([ + (3, 'ccc'), + ], + [(i['num'], file(i.path).read()) for i in packet]) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_decode_SkipNotReadBlobs(self): + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'num': 1, 'content-length': 1}) + '\n' + + 'a' + + json.dumps({'num': 2, 'content-length': 2}) + '\n' + + 'bb' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 3, 'content-length': 3}) + '\n' + + 'ccc' + + json.dumps({'packet': 'last'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual([1, 2], [i['num'] for i in packet]) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertEqual([3], [i['num'] for i in packet]) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_encode(self): + stream = ''.join([i for i in parcel.encode([])]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(stream)) + + stream = ''.join([i for i in parcel.encode([(None, None, None)], header={'foo': 'bar'})]) + self.assertEqual( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': None}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(stream)) + + stream = ''.join([i for i in parcel.encode([ + (1, {}, None), + ('2', {'n': 2}, []), + ('3', {'n': 3}, iter([])), + ])]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'packet': '2', 'n': 2}) + '\n' + + json.dumps({'packet': '3', 'n': 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(stream)) + + stream = ''.join([i for i in parcel.encode([ + (1, None, [{1: 1}]), + (2, None, [{2: 2}, {2: 2}]), + (3, None, [{3: 3}, {3: 3}, {3: 3}]), + ])]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({1: 1}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({2: 2}) + '\n' + + json.dumps({2: 2}) + '\n' + + json.dumps({'packet': 3}) + '\n' + + json.dumps({3: 3}) + '\n' + + json.dumps({3: 3}) + '\n' + + json.dumps({3: 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(stream)) + + def test_limited_encode(self): + RECORD = 1024 * 1024 + + def content(): + yield {'record': '.' * RECORD} + yield {'record': '.' * RECORD} + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) < RECORD + self.assertEqual(4, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD + assert len(stream) < RECORD * 2 + self.assertEqual(5, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 2 + assert len(stream) < RECORD * 3 + self.assertEqual(6, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 3 + assert len(stream) < RECORD * 4 + self.assertEqual(7, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 4 + self.assertEqual(8, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 4 + + def test_limited_encode_FinalRecords(self): + RECORD = 1024 * 1024 + + def content(): + try: + yield {'record': '.' * RECORD} + yield {'record': '.' * RECORD} + except StopIteration: + pass + yield None + yield {'record': '.' * RECORD} + yield {'record': '.' * RECORD} + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) > RECORD * 4 + assert len(stream) < RECORD * 5 + self.assertEqual(8, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD * 5 + assert len(stream) < RECORD * 6 + self.assertEqual(9, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 5.5, compresslevel=0)])) + assert len(stream) > RECORD * 7 + assert len(stream) < RECORD * 8 + self.assertEqual(11, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 6.5, compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(12, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(12, len(stream.strip().split('\n'))) + + def test_encode_Blobs(self): + self.touch(('a', 'a')) + self.touch(('b', 'bb')) + self.touch(('c', 'ccc')) + + stream = ''.join([i for i in parcel.encode([ + (1, None, [ + File('a', 'digest', [('num', 1)]), + File('b', 'digest', [('num', 2)]), + ]), + (2, None, [ + File('c', 'digest', [('num', 3)]), + ]), + ])]) + + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'num': 1}) + '\n' + + 'a' + '\n' + + json.dumps({'num': 2}) + '\n' + + 'bb' + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 3}) + '\n' + + 'ccc' + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(stream)) + + def test_limited_encode_Blobs(self): + RECORD = 1024 * 1024 + self.touch(('blob', '.' * RECORD)) + + def content(): + yield File('blob', 'digest') + yield File('blob', 'digest') + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) < RECORD + self.assertEqual(4, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD + assert len(stream) < RECORD * 2 + self.assertEqual(6, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 2 + assert len(stream) < RECORD * 3 + self.assertEqual(8, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 3 + assert len(stream) < RECORD * 4 + self.assertEqual(10, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 4 + self.assertEqual(12, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 4 + self.assertEqual(12, len(stream.strip().split('\n'))) + + def test_limited_encode_FinalBlobs(self): + RECORD = 1024 * 1024 + self.touch(('blob', '.' * RECORD)) + + def content(): + try: + yield File('blob', 'digest') + yield File('blob', 'digest') + except StopIteration: + pass + yield None + yield File('blob', 'digest') + yield File('blob', 'digest') + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD / 2, compresslevel=0)])) + assert len(stream) > RECORD * 4 + assert len(stream) < RECORD * 5 + self.assertEqual(12, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 1.5, compresslevel=0)])) + assert len(stream) > RECORD * 5 + assert len(stream) < RECORD * 6 + self.assertEqual(14, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 2.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(16, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 3.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(16, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 4.5, compresslevel=0)])) + assert len(stream) > RECORD * 6 + assert len(stream) < RECORD * 7 + self.assertEqual(16, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 5.5, compresslevel=0)])) + assert len(stream) > RECORD * 7 + assert len(stream) < RECORD * 8 + self.assertEqual(18, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + limit=RECORD * 6.5, compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(20, len(stream.strip().split('\n'))) + + stream = unzips(''.join([i for i in parcel.encode([ + ('first', None, content()), + ('second', None, content()), + ], + compresslevel=0)])) + assert len(stream) > RECORD * 8 + assert len(stream) < RECORD * 9 + self.assertEqual(20, len(stream.strip().split('\n'))) + + def test_decode_dir(self): + stream = zips( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'num': 1, 'content-length': '8'}) + '\n' + + 'content1' + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/1.parcel', stream.getvalue())) + + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 2, 'content-length': '8'}) + '\n' + + 'content2' + '\n' + + json.dumps({'payload': 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/2.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertEqual({'packet': 2}, packet.props) + items = iter(packet) + blob = next(items) + self.assertEqual({'num': 2, 'content-length': '8'}, blob) + self.assertEqual('content2', file(blob.path).read()) + self.assertEqual({'payload': 3}, next(items)) + self.assertRaises(StopIteration, items.next) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual({'foo': 'bar', 'packet': 1}, packet.props) + items = iter(packet) + self.assertEqual({'payload': 1}, next(items)) + blob = next(items) + self.assertEqual({'num': 1, 'content-length': '8'}, blob) + self.assertEqual('content1', file(blob.path).read()) + self.assertEqual({'payload': 2}, next(items)) + self.assertRaises(StopIteration, items.next) + self.assertRaises(StopIteration, packets_iter.next) + + def test_decode_dir_RemoveOutdatedParcels(self): + stream = zips( + json.dumps({'from': 'principal'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/1.parcel', stream.getvalue())) + + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/2.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels', recipient='principal') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertRaises(StopIteration, packets_iter.next) + assert not exists('parcels/1.parcel') + assert exists('parcels/2.parcel') + + stream = zips( + json.dumps({'from': 'principal', 'session': 'old'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/3.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels', recipient='principal', session='new') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertRaises(StopIteration, packets_iter.next) + assert not exists('parcels/1.parcel') + assert exists('parcels/2.parcel') + assert not exists('parcels/3.parcel') + + def test_decode_dir_SkipTheSameSessionParcels(self): + stream = zips( + json.dumps({'from': 'principal', 'session': 'new'}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/1.parcel', stream.getvalue())) + + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + self.touch(('parcels/2.parcel', stream.getvalue())) + + packets_iter = parcel.decode_dir('parcels', recipient='principal', session='new') + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + self.assertRaises(StopIteration, packets_iter.next) + assert exists('parcels/1.parcel') + assert exists('parcels/2.parcel') + + def test_encode_dir(self): + self.touch(('blob', 'content')) + parcel.encode_dir([ + (1, None, [ + {'payload': 1}, + File('blob', 'digest', [('num', 1)]), + {'payload': 2}, + ]), + (2, None, [ + File('blob', 'digest', [('num', 2)]), + {'payload': 3}, + ]), + ], path='./parcel') + + assert exists('parcel') + + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'num': 1}) + '\n' + + 'content' + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'num': 2}) + '\n' + + 'content' + '\n' + + json.dumps({'payload': 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + unzips(file('parcel').read())) + + +def zips(data): + result = StringIO() + f = gzip.GzipFile(fileobj=result, mode='wb') + f.write(data) + f.close() + result.seek(0) + return result + + +def unzips(data): + return gzip.GzipFile(fileobj=StringIO(data)).read() + + +if __name__ == '__main__': + tests.main() |