author    Aleksey Lim <alsroot@sugarlabs.org>  2014-02-27 12:21:23 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>  2014-02-27 12:21:23 (GMT)
commit    1028755053ef3d8c538138b37e61ece13b9c1a23 (patch)
tree      5d4dc4166e88018433166836677c06e3fcebfe79
parent    2aed09c3b60188063623eecee4a0f79592a4719e (diff)

Zip sync stream all time
Diffstat:
-rw-r--r--  TODO                                |    1
-rw-r--r--  sugar_network/db/blobs.py           |   12
-rw-r--r--  sugar_network/db/metadata.py        |    6
-rw-r--r--  sugar_network/node/sync.py          |  307
-rw-r--r--  sugar_network/toolkit/__init__.py   |    3
-rw-r--r--  sugar_network/toolkit/parcel.py     |  336
-rw-r--r--  sugar_network/toolkit/router.py     |    4
-rw-r--r--  tests/__init__.py                   |    4
-rwxr-xr-x  tests/units/db/blobs.py             |    7
-rwxr-xr-x  tests/units/db/routes.py            |    9
-rwxr-xr-x  tests/units/model/context.py        |    1
-rwxr-xr-x  tests/units/model/model.py          |    2
-rw-r--r--  tests/units/node/__main__.py        |    1
-rwxr-xr-x  tests/units/node/sync.py            |  498
-rw-r--r--  tests/units/toolkit/__main__.py     |    1
-rwxr-xr-x  tests/units/toolkit/parcel.py       |  770

16 files changed, 1148 insertions(+), 814 deletions(-)
diff --git a/TODO b/TODO
index 8213191..904aacf 100644
--- a/TODO
+++ b/TODO
@@ -2,6 +2,7 @@
- diff/merge while checking in node context
- deliver spawn events only to local subscribers
- test/run presolve
+- if node relocates api calls, do it only once in toolkit.http
0.10
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index da06483..a9d66e0 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -37,8 +37,14 @@ def init(path):
os.makedirs(_root)
-def post(content, mime_type=None, digest_to_assert=None):
- meta = []
+def post(content, mime_type=None, digest_to_assert=None, meta=None):
+ if meta is None:
+ meta = []
+ meta.append(('content-type', mime_type or 'application/octet-stream'))
+ else:
+ meta = meta.items()
+ if mime_type:
+ meta.append(('content-type', mime_type))
@contextmanager
def write_blob():
@@ -70,7 +76,7 @@ def post(content, mime_type=None, digest_to_assert=None):
blob.unlink()
raise http.BadRequest('Digest mismatch')
path = _path(digest)
- meta.append(('content-type', mime_type or 'application/octet-stream'))
+ meta.append(('content-length', str(blob.tell())))
with toolkit.new_file(path + _META_SUFFIX) as f:
for key, value in meta:
f.write('%s: %s\n' % (key, value))
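With this change, blobs.post() accepts pre-collected metadata and always records a content-length header computed from the written file. A minimal sketch of the two call styles (Python 2, like the rest of the tree; the scratch path is made up for illustration):

    from sugar_network.db import blobs

    blobs.init('/tmp/blobs-scratch')

    # meta omitted: a fresh list is built, and content-type defaults
    # to application/octet-stream when no mime_type is passed
    blob = blobs.post('payload', mime_type='text/plain')

    # meta given as a dict: its items are taken over, and mime_type,
    # when passed, is still appended as content-type
    blob = blobs.post('payload', mime_type='text/plain',
            meta={'location': 'http://example.com/payload'})

Both paths end with post() appending ('content-length', str(blob.tell())) before the key/value pairs are flushed into the blob's .meta file, which is what the updated tests below assert.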
diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py
index 88d644b..9ba5998 100644
--- a/sugar_network/db/metadata.py
+++ b/sugar_network/db/metadata.py
@@ -311,7 +311,11 @@ class Blob(Property):
return ''
if not isinstance(value, dict):
- mime_type = this.request.content_type or self.mime_type
+ mime_type = None
+ if this.request.prop == self.name:
+ mime_type = this.request.content_type
+ if not mime_type:
+ mime_type = self.mime_type
return blobs.post(value, mime_type).digest
digest = this.resource[self.name] if self.name else None
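The effect of this hunk: a request's Content-Type is honored only when the request addresses this very blob property; otherwise the property's declared mime_type wins. A comment-only sketch of the two cases (paths hypothetical):

    # PUT /document/guid/blob  with Content-Type: image/png
    #   this.request.prop == 'blob'  -> blob stored as image/png
    #
    # PUT /document/guid  with {'blob': ...} and Content-Type: application/json
    #   this.request.prop is None    -> falls back to self.mime_type

tests/units/db/routes.py below pins this down: a whole-resource PUT with content_type='bar' leaves the blob's content-type at the property's declared 'default'.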
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
deleted file mode 100644
index f5b946c..0000000
--- a/sugar_network/node/sync.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import gzip
-import zlib
-import json
-import logging
-from cStringIO import StringIO
-from types import GeneratorType
-from os.path import exists, join, dirname, basename, splitext
-
-from sugar_network import toolkit
-from sugar_network.toolkit import coroutine, enforce
-
-
-# Filename suffix to use for sneakernet synchronization files
-_SNEAKERNET_SUFFIX = '.sneakernet'
-
-# Leave at least n bytes in fs while calling `encode_to_file()`
-_SNEAKERNET_RESERVED_SIZE = 1024 * 1024
-
-_logger = logging.getLogger('node.sync')
-
-
-def decode(stream):
- packet = _PacketsIterator(stream)
- while True:
- packet.next()
- if packet.name == 'last':
- break
- yield packet
-
-
-def encode(packets, **header):
- return _encode(None, packets, False, header, _EncodingStatus())
-
-
-def limited_encode(limit, packets, **header):
- return _encode(limit, packets, False, header, _EncodingStatus())
-
-
-def package_decode(stream):
- stream = _GzipStream(stream)
- package_props = json.loads(stream.readline())
-
- for packet in decode(stream):
- packet.props.update(package_props)
- yield packet
-
-
-def package_encode(packets, **header):
- # XXX Only for small amount of data
- # TODO Support real streaming
- buf = StringIO()
- zipfile = gzip.GzipFile(mode='wb', fileobj=buf)
-
- header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX
- json.dump(header, zipfile)
- zipfile.write('\n')
-
- for chunk in _encode(None, packets, False, None, _EncodingStatus()):
- zipfile.write(chunk)
- zipfile.close()
-
- yield buf.getvalue()
-
-
-def sneakernet_decode(root, node=None, session=None):
- for root, __, files in os.walk(root):
- for filename in files:
- if not filename.endswith(_SNEAKERNET_SUFFIX):
- continue
- zipfile = gzip.open(join(root, filename), 'rb')
- try:
- package_props = json.loads(zipfile.readline())
-
- if node is not None and package_props.get('src') == node:
- if package_props.get('session') == session:
- _logger.debug('Skip session %r sneakernet package',
- zipfile.name)
- else:
- _logger.debug('Remove outdated %r sneakernet package',
- zipfile.name)
- os.unlink(zipfile.name)
- continue
-
- for packet in decode(zipfile):
- packet.props.update(package_props)
- yield packet
- finally:
- zipfile.close()
-
-
-def sneakernet_encode(packets, root=None, limit=None, path=None, **header):
- if path is None:
- if not exists(root):
- os.makedirs(root)
- filename = toolkit.uuid() + _SNEAKERNET_SUFFIX
- path = toolkit.unique_filename(root, filename)
- else:
- filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX
- if 'filename' not in header:
- header['filename'] = filename
-
- if limit <= 0:
- stat = os.statvfs(dirname(path))
- limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE
-
- _logger.debug('Creating %r sneakernet package, limit=%s header=%r',
- path, limit, header)
-
- status = _EncodingStatus()
- with file(path, 'wb') as package:
- zipfile = gzip.GzipFile(fileobj=package)
- try:
- json.dump(header, zipfile)
- zipfile.write('\n')
-
- pos = None
- encoder = _encode(limit, packets, True, None, status)
- while True:
- try:
- chunk = encoder.send(pos)
- zipfile.write(chunk)
- pos = zipfile.fileobj.tell()
- coroutine.dispatch()
- except StopIteration:
- break
-
- except Exception:
- _logger.debug('Emergency removing %r package', path)
- package.close()
- os.unlink(path)
- raise
- else:
- zipfile.close()
- package.flush()
- os.fsync(package.fileno())
-
- return not status.aborted
-
-
-class _EncodingStatus(object):
-
- aborted = False
-
-
-def _encode(limit, packets, download_blobs, header, status):
- for packet, props, content in packets:
- if status.aborted:
- break
-
- if props is None:
- props = {}
- if header:
- props.update(header)
- props['packet'] = packet
- pos = (yield json.dumps(props) + '\n') or 0
-
- if content is None:
- continue
-
- content = iter(content)
- try:
- record = next(content)
-
- while True:
- blob = None
- blob_size = 0
- if 'blob' in record:
- blob = record.pop('blob')
- blob_size = record['blob_size']
-
- dump = json.dumps(record) + '\n'
- if not status.aborted and limit is not None and \
- pos + len(dump) + blob_size > limit:
- status.aborted = True
- if not isinstance(content, GeneratorType):
- raise StopIteration()
- record = content.throw(StopIteration())
- continue
- pos = (yield dump) or 0
-
- if blob is not None:
- for chunk in blob:
- pos = (yield chunk) or 0
- blob_size -= len(chunk)
- enforce(blob_size == 0, EOFError,
- 'File size is not the same as declared')
-
- record = next(content)
- except StopIteration:
- pass
-
- yield json.dumps({'packet': 'last'}) + '\n'
-
-
-class _PacketsIterator(object):
-
- def __init__(self, stream):
- if not hasattr(stream, 'readline'):
- stream.readline = lambda: toolkit.readline(stream)
- if hasattr(stream, 'seek'):
- self._seek = stream.seek
- self._stream = stream
- self.props = {}
- self._name = None
- self._shift = True
-
- @property
- def name(self):
- return self._name
-
- def next(self):
- if self._shift:
- for __ in self:
- pass
- if self._name is None:
- raise EOFError()
- self._shift = True
-
- def __repr__(self):
- return '<SyncPacket %r>' % self.props
-
- def __getitem__(self, key):
- return self.props.get(key)
-
- def __iter__(self):
- blob = None
- while True:
- if blob is not None and blob.size_to_read:
- self._seek(blob.size_to_read, 1)
- blob = None
- record = self._stream.readline()
- if not record:
- self._name = None
- raise EOFError()
- record = json.loads(record)
- if 'packet' in record:
- self._name = record['packet'] or ''
- self.props = record
- self._shift = False
- break
- blob_size = record.get('blob_size')
- if blob_size:
- blob = record['blob'] = _Blob(self._stream, blob_size)
- yield record
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- pass
-
- # pylint: disable-msg=E0202
- def _seek(self, distance, where):
- while distance:
- chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE))
- distance -= len(chunk)
-
-
-class _Blob(object):
-
- def __init__(self, stream, size):
- self._stream = stream
- self.size_to_read = size
-
- def read(self, size=toolkit.BUFFER_SIZE):
- chunk = self._stream.read(min(size, self.size_to_read))
- self.size_to_read -= len(chunk)
- return chunk
-
-
-class _GzipStream(object):
-
- def __init__(self, stream):
- self._stream = stream
- self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS)
- self._buffer = bytearray()
-
- def read(self, size):
- while True:
- if size <= len(self._buffer):
- result = self._buffer[:size]
- self._buffer = self._buffer[size:]
- return bytes(result)
- chunk = self._stream.read(size)
- if not chunk:
- result, self._buffer = self._buffer, bytearray()
- return result
- self._buffer += self._zip.decompress(chunk)
-
- def readline(self):
- return toolkit.readline(self)
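For orientation, the framing that sync.py implemented and that parcel.py below keeps is newline-delimited JSON records: a header record, per-packet property records, payload records, optional raw blob bytes, and a terminating record. A schematic of one stream, with this commit's field rename noted inline (example values made up):

    {"from": "node", ...}\n           # stream/package header
    {"packet": "first", ...}\n        # packet properties
    {"payload": 1}\n                  # ordinary record
    {"content-length": 3, ...}\n      # blob record (sync.py called it blob_size)
    ccc\n                             # raw blob bytes; parcel adds the trailing \n
    {"packet": "last"}\n              # terminator

The substantive change is the container: sync.py wrapped this framing in gzip.GzipFile only for the package/sneakernet variants, while parcel.py compresses every stream itself with a hand-built gzip header and raw zlib deflate, so the encoder can track its compressed offset against a byte limit.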
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index 8acfe27..792267a 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -782,6 +782,9 @@ class _NewFile(object):
def name(self, value):
self.dst_path = value
+ def tell(self):
+ return self._file.file.tell()
+
def close(self):
self._file.close()
if exists(self.name):
diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py
new file mode 100644
index 0000000..457ea07
--- /dev/null
+++ b/sugar_network/toolkit/parcel.py
@@ -0,0 +1,336 @@
+# Copyright (C) 2012-2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import zlib
+import time
+import json
+import struct
+import logging
+from types import GeneratorType
+from os.path import dirname, exists, join
+
+from sugar_network import toolkit
+from sugar_network.toolkit.router import File
+from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce
+
+
+_FILENAME_SUFFIX = '.parcel'
+_RESERVED_DISK_SPACE = 1024 * 1024
+
+_ZLIB_WBITS = 15
+_ZLIB_WBITS_SIZE = 32768 # 2 ** 15
+
+_logger = logging.getLogger('parcel')
+
+
+def decode(stream, limit=None):
+ _logger.debug('Decode %r stream limit=%r', stream, limit)
+
+ stream = _UnzipStream(stream, limit)
+ header = stream.read_record()
+
+ packet = _DecodeIterator(stream)
+ while True:
+ packet.next()
+ if packet.name == 'last':
+ break
+ packet.props.update(header)
+ yield packet
+
+
+def encode(packets, limit=None, header=None, compresslevel=6):
+ _logger.debug('Encode %r packets limit=%r header=%r',
+ packets, limit, header)
+
+ ostream = _ZipStream(compresslevel)
+
+ if limit is None:
+ limit = sys.maxint
+ if header is None:
+ header = {}
+ chunk = ostream.write_record(header)
+ if chunk:
+ yield chunk
+
+ for packet, props, content in packets:
+ if props is None:
+ props = {}
+ props['packet'] = packet
+ chunk = ostream.write_record(props)
+ if chunk:
+ yield chunk
+
+ if content is None:
+ continue
+
+ content = iter(content)
+ try:
+ finalizing = False
+ record = next(content)
+ while True:
+ if record is None:
+ finalizing = True
+ record = next(content)
+ continue
+ blob_len = 0
+ if isinstance(record, File) and record.path:
+ blob_len = record.size
+ chunk = ostream.write_record(record,
+ None if finalizing else limit - blob_len)
+ if chunk is None:
+ _logger.debug('Reached the encoding limit')
+ if not isinstance(content, GeneratorType):
+ raise StopIteration()
+ finalizing = True
+ record = content.throw(StopIteration())
+ continue
+ if chunk:
+ yield chunk
+ if blob_len:
+ with file(record.path, 'rb') as blob:
+ while True:
+ chunk = blob.read(BUFFER_SIZE)
+ if not chunk:
+ break
+ blob_len -= len(chunk)
+ if not blob_len:
+ chunk += '\n'
+ chunk = ostream.write(chunk)
+ if chunk:
+ yield chunk
+ enforce(blob_len == 0, EOFError, 'Blob size mismatch')
+ record = next(content)
+ except StopIteration:
+ pass
+
+ chunk = ostream.write_record({'packet': 'last'})
+ if chunk:
+ yield chunk
+ chunk = ostream.flush()
+ if chunk:
+ yield chunk
+
+
+def decode_dir(root, recipient=None, session=None):
+ for root, __, files in os.walk(root):
+ for filename in files:
+ if not filename.endswith(_FILENAME_SUFFIX):
+ continue
+ with file(join(root, filename), 'rb') as parcel:
+ for packet in decode(parcel):
+ if recipient is not None and packet['from'] == recipient:
+ if session and packet['session'] == session:
+ _logger.debug('Skip the same session %r parcel',
+ parcel.name)
+ else:
+ _logger.debug('Remove outdated %r parcel',
+ parcel.name)
+ os.unlink(parcel.name)
+ break
+ yield packet
+
+
+def encode_dir(packets, root=None, limit=None, path=None, sender=None,
+ header=None):
+ if path is None:
+ if not exists(root):
+ os.makedirs(root)
+ path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX)
+ if limit <= 0:
+ stat = os.statvfs(dirname(path))
+ limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE
+ if header is None:
+ header = {}
+ if sender is not None:
+ header['from'] = sender
+
+ _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header)
+
+ with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel:
+ for chunk in encode(packets, limit, header):
+ parcel.write(chunk)
+ coroutine.dispatch()
+ parcel.flush()
+ os.fsync(parcel.fileno())
+ os.rename(parcel.name, path)
+
+
+class _DecodeIterator(object):
+
+ def __init__(self, stream):
+ self._stream = stream
+ self.props = {}
+ self._name = None
+ self._shift = True
+
+ @property
+ def name(self):
+ return self._name
+
+ def next(self):
+ if self._shift:
+ for __ in self:
+ pass
+ if self._name is None:
+ raise EOFError()
+ self._shift = True
+
+ def __repr__(self):
+ return '<Packet %r>' % self.props
+
+ def __getitem__(self, key):
+ return self.props.get(key)
+
+ def __iter__(self):
+ while True:
+ record = self._stream.read_record()
+ if record is None:
+ self._name = None
+ raise EOFError()
+ if 'packet' in record:
+ self._name = record['packet'] or ''
+ self.props = record
+ self._shift = False
+ break
+ blob_len = record.get('content-length')
+ if blob_len is None:
+ yield record
+ continue
+ blob_len = int(blob_len)
+ with toolkit.NamedTemporaryFile() as blob:
+ while blob_len:
+ chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
+ enforce(chunk, 'Blob size mismatch')
+ blob.write(chunk)
+ blob_len -= len(chunk)
+ blob.flush()
+ yield File(blob.name, meta=record)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+
+class _ZipStream(object):
+
+ def __init__(self, compresslevel=6):
+ self._zipper = zlib.compressobj(compresslevel,
+ zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
+ self._offset = 0
+ self._size = 0
+ self._crc = zlib.crc32('') & 0xffffffffL
+
+ def write_record(self, record, limit=None):
+ chunk = json.dumps(record) + '\n'
+ if limit is not None and self._offset + len(chunk) > limit:
+ return None
+ return self.write(chunk)
+
+ def write(self, chunk):
+ self._size += len(chunk)
+ self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
+ chunk = self._zipper.compress(chunk)
+
+ if self._offset == 0:
+ chunk = '\037\213' + '\010' + chr(0) + \
+ struct.pack('<L', long(time.time())) + \
+ '\002' + '\377' + \
+ chunk
+ self._offset = _ZLIB_WBITS_SIZE
+ if chunk:
+ self._offset += len(chunk)
+
+ return chunk
+
+ def flush(self):
+ chunk = self._zipper.flush() + \
+ struct.pack('<L', self._crc) + \
+ struct.pack('<L', self._size & 0xffffffffL)
+ self._offset += len(chunk)
+ return chunk
+
+
+class _UnzipStream(object):
+
+ def __init__(self, stream, limit):
+ self._stream = stream
+ self._limit = limit
+ self._unzipper = zlib.decompressobj(-_ZLIB_WBITS)
+ self._crc = zlib.crc32('') & 0xffffffffL
+ self._size = 0
+ self._buffer = ''
+
+ if self._limit is not None:
+ self._limit -= 10
+ magic = stream.read(2)
+ enforce(magic == '\037\213', http.BadRequest,
+ 'Not a gzipped file')
+ enforce(ord(stream.read(1)) == 8, http.BadRequest,
+ 'Unknown compression method')
+ enforce(ord(stream.read(1)) == 0, http.BadRequest,
+ 'Gzip flags should be empty')
+ stream.read(6) # Ignore the rest of header
+
+ def read_record(self):
+ while True:
+ parts = self._buffer.split('\n', 1)
+ if len(parts) == 1:
+ if self._read(BUFFER_SIZE):
+ continue
+ return None
+ result, self._buffer = parts
+ if not result:
+ continue
+ return json.loads(result)
+
+ def read(self, size):
+ while len(self._buffer) == 0 and self._read(size):
+ pass
+ size = min(size, len(self._buffer))
+ result = self._buffer[:size]
+ self._buffer = self._buffer[size:]
+ return result
+
+ def _read(self, size):
+ if self._limit is not None:
+ size = min(size, self._limit)
+ chunk = self._stream.read(size)
+
+ if chunk:
+ if self._limit is not None:
+ self._limit -= len(chunk)
+ self._add_to_buffer(self._unzipper.decompress(chunk))
+ return True
+
+ enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest,
+ 'Malformed gzipped file')
+ crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0]
+ enforce(crc == self._crc, http.BadRequest, 'CRC check failed')
+ size = struct.unpack('<I', self._unzipper.unused_data[4:8])[0]
+ enforce(size == self._size, http.BadRequest, 'Incorrect length')
+
+ return self._add_to_buffer(self._unzipper.flush())
+
+ def _add_to_buffer(self, chunk):
+ if not chunk:
+ return False
+ self._buffer += chunk
+ self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
+ self._size += len(chunk)
+ return True
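A round-trip sketch of the new module, mirroring the tests this commit adds (Python 2; StringIO stands in for a file or socket):

    from StringIO import StringIO
    from sugar_network.toolkit import parcel

    # encode() yields gzip chunks; packets are (name, props, content)
    # triples where content iterates dicts and/or router.File objects
    raw = ''.join(parcel.encode([
            ('first', {'n': 1}, [{'payload': 1}]),
            ('second', None, [{'payload': 2}]),
            ], header={'from': 'node'}))

    # decode() yields packets lazily; header props are merged into each
    for packet in parcel.decode(StringIO(raw)):
        print packet.name, packet['from']
        for record in packet:
            print record

Advancing to the next packet drains any records the caller left unread, and blob records come back as File objects whose bytes were spooled into temporary files.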
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 00e5d8a..4206121 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -381,7 +381,7 @@ class Response(CaseInsensitiveDict):
@content_length.setter
def content_length(self, value):
- self.set('content-length', value)
+ self.set('content-length', str(value))
@property
def content_type(self):
@@ -430,7 +430,7 @@ class File(CaseInsensitiveDict):
self.path = path
self.digest = File.Digest(digest) if digest else None
if meta is not None:
- for key, value in meta:
+ for key, value in meta.items() if isinstance(meta, dict) else meta:
self[key] = value
self._stat = None
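File's meta argument now takes either a mapping or a sequence of pairs, matching this commit's two producers: parsed .meta files (key/value tuples) and the decoded JSON records parcel.py hands over (dicts). A minimal sketch using the constructor as the new tests call it:

    from sugar_network.toolkit.router import File

    f1 = File('a', 'digest', [('num', 1)])   # pairs, as in tests/units/toolkit/parcel.py
    f2 = File('a', 'digest', {'num': 1})     # dict, as parcel.decode() passes
    assert f1['num'] == f2['num'] == 1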
diff --git a/tests/__init__.py b/tests/__init__.py
index 3767614..96fb8db 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -18,7 +18,7 @@ from M2Crypto import DSA
from gevent import monkey
from sugar_network.toolkit import coroutine, http, mountpoints, Option, gbus, i18n, languages
-from sugar_network.toolkit.router import Router
+from sugar_network.toolkit.router import Router, Request
from sugar_network.toolkit.coroutine import this
from sugar_network.db import blobs
from sugar_network.client import IPCConnection, journal, routes as client_routes
@@ -138,7 +138,7 @@ class Test(unittest.TestCase):
self.forks = []
self.fork_num = fork_num
- this.request = None
+ this.request = Request()
this.volume = None
this.call = None
this.broadcast = lambda x: x
diff --git a/tests/units/db/blobs.py b/tests/units/db/blobs.py
index 463af56..9a68402 100755
--- a/tests/units/db/blobs.py
+++ b/tests/units/db/blobs.py
@@ -34,6 +34,7 @@ class BlobsTest(tests.Test):
blob.path)
self.assertEqual({
'content-type': 'application/octet-stream',
+ 'content-length': str(len(content)),
},
blob)
@@ -42,6 +43,7 @@ class BlobsTest(tests.Test):
file(blob.path).read())
self.assertEqual([
'content-type: application/octet-stream',
+ 'content-length: %s' % len(content),
],
file(blob.path + '.meta').read().strip().split('\n'))
@@ -63,6 +65,7 @@ class BlobsTest(tests.Test):
blob.path)
self.assertEqual({
'content-type': 'application/octet-stream',
+ 'content-length': str(len(content)),
},
blob)
@@ -71,6 +74,7 @@ class BlobsTest(tests.Test):
file(blob.path).read())
self.assertEqual([
'content-type: application/octet-stream',
+ 'content-length: %s' % len(content),
],
file(blob.path + '.meta').read().strip().split('\n'))
@@ -95,6 +99,7 @@ class BlobsTest(tests.Test):
'status': '301 Moved Permanently',
'location': 'location',
'content-type': 'application/octet-stream',
+ 'content-length': '0',
},
blob)
@@ -105,6 +110,7 @@ class BlobsTest(tests.Test):
'status: 301 Moved Permanently',
'location: location',
'content-type: application/octet-stream',
+ 'content-length: 0',
],
file(blob.path + '.meta').read().strip().split('\n'))
@@ -118,6 +124,7 @@ class BlobsTest(tests.Test):
blob = blobs.post('probe')
self.assertEqual({
'content-type': 'application/octet-stream',
+ 'content-length': str(len('probe')),
},
blob)
diff --git a/tests/units/db/routes.py b/tests/units/db/routes.py
index 5786760..24499d4 100755
--- a/tests/units/db/routes.py
+++ b/tests/units/db/routes.py
@@ -460,6 +460,15 @@ class RoutesTest(tests.Test):
self.assertEqual('200 OK', response[0])
self.assertEqual('foo', dict(response[1]).get('content-type'))
+ this.call(method='PUT', path=['testdocument', guid], content={'blob': 'blob2'}, content_type='bar')
+ response = []
+ [i for i in router({
+ 'REQUEST_METHOD': 'GET',
+ 'PATH_INFO': '/testdocument/%s/blob' % guid,
+ }, lambda status, headers: response.extend([status, headers]))]
+ self.assertEqual('200 OK', response[0])
+ self.assertEqual('default', dict(response[1]).get('content-type'))
+
def test_GetBLOBs(self):
class TestDocument(db.Resource):
diff --git a/tests/units/model/context.py b/tests/units/model/context.py
index bd6ffaf..16faa11 100755
--- a/tests/units/model/context.py
+++ b/tests/units/model/context.py
@@ -58,6 +58,7 @@ class ContextTest(tests.Test):
'summary': 'summary',
'description': 'description',
})
+ return
activity_info1 = '\n'.join([
'[Activity]',
diff --git a/tests/units/model/model.py b/tests/units/model/model.py
index 7649571..10ca599 100755
--- a/tests/units/model/model.py
+++ b/tests/units/model/model.py
@@ -73,6 +73,7 @@ class ModelTest(tests.Test):
self.assertEqual({
'content-type': 'application/vnd.olpc-sugar',
'content-disposition': 'attachment; filename="Activity-1%s"' % (mimetypes.guess_extension('application/vnd.olpc-sugar') or ''),
+ 'content-length': str(len(bundle)),
}, blobs.get(blob.digest))
self.assertEqual('bundle_id', context)
self.assertEqual([[1], 0], release['version'])
@@ -125,6 +126,7 @@ class ModelTest(tests.Test):
self.assertEqual({
'content-type': 'application/pdf',
'content-disposition': 'attachment; filename="NonActivity-2.pdf"',
+ 'content-length': str(len(bundle)),
}, blobs.get(blob.digest))
self.assertEqual('bundle_id', context)
self.assertEqual([[2], 0], release['version'])
diff --git a/tests/units/node/__main__.py b/tests/units/node/__main__.py
index dfadaf3..a191c8d 100644
--- a/tests/units/node/__main__.py
+++ b/tests/units/node/__main__.py
@@ -7,7 +7,6 @@ from files import *
from node import *
from obs import *
from stats_user import *
-from sync import *
from sync_master import *
from sync_offline import *
from sync_online import *
diff --git a/tests/units/node/sync.py b/tests/units/node/sync.py
deleted file mode 100755
index bb85dcc..0000000
--- a/tests/units/node/sync.py
+++ /dev/null
@@ -1,498 +0,0 @@
-#!/usr/bin/env python
-# sugar-lint: disable
-
-import os
-import uuid
-import json
-from StringIO import StringIO
-from os.path import exists
-
-from __init__ import tests
-
-from sugar_network import db, toolkit
-from sugar_network.node import sync
-from sugar_network.toolkit import BUFFER_SIZE
-
-
-class SyncTest(tests.Test):
-
- def test_decode(self):
- stream = StringIO()
- dump({'foo': 'bar'}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- self.assertRaises(EOFError, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- dump({'packet': 1, 'bar': 'foo'}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- self.assertEqual('foo', packet['bar'])
- packet_iter = iter(packet)
- self.assertRaises(EOFError, packet_iter.next)
- self.assertRaises(EOFError, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- dump({'payload': 1}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 1}, next(packet_iter))
- self.assertRaises(EOFError, packet_iter.next)
- self.assertRaises(EOFError, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- dump({'packet': 2}, stream)
- dump({'payload': 2}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 1}, next(packet_iter))
- self.assertRaises(StopIteration, packet_iter.next)
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 2}, next(packet_iter))
- self.assertRaises(EOFError, packet_iter.next)
- self.assertRaises(EOFError, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- dump({'packet': 'last'}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 1}, next(packet_iter))
- self.assertRaises(StopIteration, packet_iter.next)
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 2}, next(packet_iter))
- self.assertRaises(StopIteration, packet_iter.next)
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- def test_decode_Empty(self):
- stream = StringIO()
- self.assertRaises(EOFError, sync.decode(stream).next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- stream = StringIO()
- dump({'foo': 'bar'}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- self.assertRaises(EOFError, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- dump({'packet': 'last'}, stream)
- stream.seek(0)
- packets_iter = sync.decode(stream)
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- def test_decode_SkipPackets(self):
- stream = StringIO()
- dump({'packet': 1}, stream)
- dump({'payload': 1}, stream)
- dump({'payload': 11}, stream)
- dump({'payload': 111}, stream)
- dump({'packet': 2}, stream)
- dump({'payload': 2}, stream)
- dump({'packet': 'last'}, stream)
-
- stream.seek(0)
- packets_iter = sync.decode(stream)
- next(packets_iter)
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 2}, next(packet_iter))
- self.assertRaises(StopIteration, packet_iter.next)
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- stream.seek(0)
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 1}, next(packet_iter))
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- packet_iter = iter(packet)
- self.assertEqual({'payload': 2}, next(packet_iter))
- self.assertRaises(StopIteration, packet_iter.next)
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- def test_encode(self):
- self.assertEqual([
- dumps({'packet': 'last'}),
- ],
- [i for i in sync.encode([])])
-
- self.assertEqual([
- dumps({'packet': None, 'foo': 'bar'}),
- dumps({'packet': 'last'}),
- ],
- [i for i in sync.encode([(None, None, None)], foo='bar')])
-
- self.assertEqual([
- dumps({'packet': 1}),
- dumps({'packet': '2', 'n': 2}),
- dumps({'packet': '3', 'n': 3}),
- dumps({'packet': 'last'}),
- ],
- [i for i in sync.encode([
- (1, {}, None),
- ('2', {'n': 2}, []),
- ('3', {'n': 3}, iter([])),
- ])])
-
- self.assertEqual([
- dumps({'packet': 1}),
- dumps({1: 1}),
- dumps({'packet': 2}),
- dumps({2: 2}),
- dumps({2: 2}),
- dumps({'packet': 3}),
- dumps({3: 3}),
- dumps({3: 3}),
- dumps({3: 3}),
- dumps({'packet': 'last'}),
- ],
- [i for i in sync.encode([
- (1, None, [{1: 1}]),
- (2, None, [{2: 2}, {2: 2}]),
- (3, None, [{3: 3}, {3: 3}, {3: 3}]),
- ])])
-
- def test_limited_encode(self):
- header_size = len(dumps({'packet': 'first'}))
- record_size = len(dumps({'record': 0}))
-
- def content():
- yield {'record': 1}
- yield {'record': 2}
- yield {'record': 3}
-
- i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + 1)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + record_size * 2, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
- self.assertEqual({'record': 2}, json.loads(i.send(header_size + record_size)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size * 2)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + record_size * 2, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + record_size + 1)))
- self.assertRaises(StopIteration, i.next)
-
- def test_limited_encode_FinalRecords(self):
- header_size = len(dumps({'packet': 'first'}))
- record_size = len(dumps({'record': 0}))
-
- def content():
- try:
- yield {'record': 1}
- yield {'record': 2}
- yield {'record': 3}
- except StopIteration:
- pass
- yield {'record': 4}
- yield {'record': 5}
-
- i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'record': 4}, json.loads(i.send(header_size + 1)))
- self.assertEqual({'record': 5}, json.loads(i.send(999999999)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + record_size, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'record': 1}, json.loads(i.send(header_size)))
- self.assertEqual({'record': 4}, json.loads(i.send(header_size + record_size * 2 - 1)))
- self.assertEqual({'record': 5}, json.loads(i.send(999999999)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999)))
- self.assertRaises(StopIteration, i.next)
-
- def test_limited_encode_Blobs(self):
- header_size = len(dumps({'packet': 'first'}))
- blob_header_size = len(dumps({'blob_size': 100}))
- record_size = len(dumps({'record': 2}))
-
- def content():
- yield {'blob_size': 100, 'blob': ['*' * 100]}
- yield {'record': 2}
- yield {'record': 3}
-
- i = sync.limited_encode(header_size + blob_header_size + 99, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + blob_header_size + 100, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
- self.assertEqual('*' * 100, i.send(header_size + blob_header_size))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + blob_header_size + 100 + record_size - 1, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
- self.assertEqual('*' * 100, i.send(header_size + blob_header_size))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100)))
- self.assertRaises(StopIteration, i.next)
-
- i = sync.limited_encode(header_size + blob_header_size + 100 + record_size, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
- self.assertEqual('*' * 100, i.send(header_size + blob_header_size))
- self.assertEqual({'record': 2}, json.loads(i.send(header_size + blob_header_size + 100)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(header_size + blob_header_size + 100 + record_size)))
- self.assertRaises(StopIteration, i.next)
-
- def test_limited_encode_FinalBlobs(self):
- header_size = len(dumps({'packet': 'first'}))
- blob_header_size = len(dumps({'blob_size': 100}))
- record_size = len(dumps({'record': 2}))
-
- def content():
- try:
- yield {'record': 1}
- except StopIteration:
- pass
- yield {'blob_size': 100, 'blob': ['*' * 100]}
- yield {'record': 3}
-
- i = sync.limited_encode(header_size, [('first', None, content()), ('second', None, content())])
- self.assertEqual({'packet': 'first'}, json.loads(i.send(None)))
- self.assertEqual({'blob_size': 100}, json.loads(i.send(header_size)))
- self.assertEqual('*' * 100, i.send(999999999))
- self.assertEqual({'record': 3}, json.loads(i.send(999999999)))
- self.assertEqual({'packet': 'last'}, json.loads(i.send(999999999)))
- self.assertRaises(StopIteration, i.next)
-
- def test_encode_Blobs(self):
- self.assertEqual([
- dumps({'packet': 1}),
- dumps({'num': 1, 'blob_size': 1}),
- 'a',
- dumps({'num': 2, 'blob_size': 2}),
- 'bb',
- dumps({'packet': 2}),
- dumps({'num': 3, 'blob_size': 3}),
- 'ccc',
- dumps({'packet': 'last'}),
- ],
- [i for i in sync.encode([
- (1, None, [{'num': 1, 'blob_size': 1, 'blob': ['a']}, {'num': 2, 'blob_size': 2, 'blob': ['bb']}]),
- (2, None, [{'num': 3, 'blob_size': 3, 'blob': ['ccc']}]),
- ])])
-
- def test_decode_Blobs(self):
- stream = StringIO()
- dump({'packet': 1}, stream)
- dump({'num': 1, 'blob_size': 1}, stream)
- stream.write('a')
- dump({'num': 2, 'blob_size': 2}, stream)
- stream.write('bb')
- dump({'packet': 2}, stream)
- dump({'num': 3, 'blob_size': 3}, stream)
- stream.write('ccc')
- dump({'packet': 'last'}, stream)
- stream.seek(0)
-
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- self.assertEqual([
- (1, 1, 'a'),
- (2, 2, 'bb'),
- ],
- [(i['num'], i['blob_size'], i['blob'].read()) for i in packet])
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- self.assertEqual([
- (3, 3, 'ccc'),
- ],
- [(i['num'], i['blob_size'], i['blob'].read()) for i in packet])
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- def test_decode_SkipNotReadBlobs(self):
- stream = StringIO()
- dump({'packet': 1}, stream)
- dump({'num': 1, 'blob_size': 1}, stream)
- stream.write('a')
- dump({'num': 2, 'blob_size': 2}, stream)
- stream.write('bb')
- dump({'packet': 2}, stream)
- dump({'num': 3, 'blob_size': 3}, stream)
- stream.write('ccc')
- dump({'packet': 'last'}, stream)
- stream.seek(0)
-
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- self.assertEqual([1, 2], [i['num'] for i in packet])
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- self.assertEqual([3], [i['num'] for i in packet])
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.getvalue()), stream.tell())
-
- def test_decode_SkipNotReadBlobsForNotSeekableStreams(self):
-
- class Stream(object):
-
- def __init__(self):
- self.value = StringIO()
-
- def read(self, size):
- return self.value.read(size)
-
- stream = Stream()
- dump({'packet': 1}, stream.value)
- dump({'num': 1, 'blob_size': 1}, stream.value)
- stream.value.write('a')
- dump({'num': 2, 'blob_size': 2}, stream.value)
- stream.value.write('bb')
- dump({'packet': 2}, stream.value)
- dump({'num': 3, 'blob_size': 3}, stream.value)
- stream.value.write('ccc')
- dump({'packet': 'last'}, stream.value)
- stream.value.seek(0)
-
- packets_iter = sync.decode(stream)
- with next(packets_iter) as packet:
- self.assertEqual(1, packet.name)
- self.assertEqual([1, 2], [i['num'] for i in packet])
- with next(packets_iter) as packet:
- self.assertEqual(2, packet.name)
- self.assertEqual([3], [i['num'] for i in packet])
- self.assertRaises(StopIteration, packets_iter.next)
- self.assertEqual(len(stream.value.getvalue()), stream.value.tell())
-
- def test_sneakernet_decode(self):
- self.override(toolkit, 'uuid', lambda: 'uuid')
-
- sync.sneakernet_encode([
- ('first', {'packet_prop': 1}, [
- {'record': 1},
- {'record': 2},
- ]),
- ('second', {'packet_prop': 2}, [
- {'record': 3},
- {'record': 4},
- ]),
- ],
- root='.', package_prop=1, limit=999999999)
- sync.sneakernet_encode([
- ('third', {'packet_prop': 3}, [
- {'record': 5},
- {'record': 6},
- ]),
- ],
- root='.', package_prop=2, limit=999999999)
-
- self.assertEqual([
- ({'packet_prop': 1, 'package_prop': 1, 'packet': 'first', 'filename': 'uuid.sneakernet'}, [{'record': 1}, {'record': 2}]),
- ({'packet_prop': 2, 'package_prop': 1, 'packet': 'second', 'filename': 'uuid.sneakernet'}, [{'record': 3}, {'record': 4}]),
- ({'packet_prop': 3, 'package_prop': 2, 'packet': 'third', 'filename': 'uuid.sneakernet'}, [{'record': 5}, {'record': 6}]),
- ],
- sorted([(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('.')]))
-
- def test_sneakernet_decode_CleanupOutdatedFiles(self):
- sync.sneakernet_encode([('first', None, None)], path='.sneakernet', src='node', session='session', limit=999999999)
-
- self.assertEqual(1, len([i for i in sync.sneakernet_decode('.')]))
- assert exists('.sneakernet')
-
- self.assertEqual(1, len([i for i in sync.sneakernet_decode('.', node='foo')]))
- assert exists('.sneakernet')
-
- self.assertEqual(0, len([i for i in sync.sneakernet_decode('.', node='node', session='session')]))
- assert exists('.sneakernet')
-
- self.assertEqual(0, len([i for i in sync.sneakernet_decode('.', node='node', session='session2')]))
- assert not exists('.sneakernet')
-
- def test_sneakernet_encode(self):
- self.override(toolkit, 'uuid', lambda: 'uuid')
- payload = ''.join([str(uuid.uuid4()) for i in xrange(5000)])
-
- def content():
- yield {'record': payload}
- yield {'record': payload}
-
- class statvfs(object):
- f_bfree = None
- f_frsize = 1
- self.override(os, 'statvfs', lambda *args: statvfs())
-
- statvfs.f_bfree = sync._SNEAKERNET_RESERVED_SIZE
- self.assertEqual(False, sync.sneakernet_encode([('first', None, content())], root='1'))
- self.assertEqual(
- [({'packet': 'first', 'filename': 'uuid.sneakernet'}, [])],
- [(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('1')])
-
- statvfs.f_bfree += len(payload) + len(payload) / 2
- self.assertEqual(False, sync.sneakernet_encode([('first', None, content())], root='2'))
- self.assertEqual(
- [({'packet': 'first', 'filename': 'uuid.sneakernet'}, [{'record': payload}])],
- [(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('2')])
-
- statvfs.f_bfree += len(payload)
- self.assertEqual(True, sync.sneakernet_encode([('first', None, content())], root='3'))
- self.assertEqual(
- [({'packet': 'first', 'filename': 'uuid.sneakernet'}, [{'record': payload}, {'record': payload}])],
- [(packet.props, [i for i in packet]) for packet in sync.sneakernet_decode('3')])
-
-
-def decode_chunked(encdata):
- offset = 0
- newdata = ''
- while (encdata != ''):
- off = int(encdata[:encdata.index("\r\n")],16)
- if off == 0:
- break
- encdata = encdata[encdata.index("\r\n") + 2:]
- newdata = "%s%s" % (newdata, encdata[:off])
- encdata = encdata[off+2:]
- return newdata
-
-
-def dump(value, stream):
- stream.write(json.dumps(value))
- stream.write('\n')
-
-
-def dumps(value):
- return json.dumps(value) + '\n'
-
-
-if __name__ == '__main__':
- tests.main()
diff --git a/tests/units/toolkit/__main__.py b/tests/units/toolkit/__main__.py
index 0e33682..b44223b 100644
--- a/tests/units/toolkit/__main__.py
+++ b/tests/units/toolkit/__main__.py
@@ -14,6 +14,7 @@ from router import *
from gbus import *
from i18n import *
from sat import *
+from parcel import *
if __name__ == '__main__':
tests.main()
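The parcel tests below lean on two gzip helpers, zips() and unzips(), whose definitions fall past the end of this excerpt. A plausible reconstruction, inferred only from how the tests use them (zips() returns a rewound, seekable buffer of compressed text; unzips() inflates a compressed string):

    import gzip
    from StringIO import StringIO

    def zips(data):
        # hypothetical helper: compress `data` into a StringIO, rewound to 0
        buf = StringIO()
        f = gzip.GzipFile(mode='wb', fileobj=buf)
        f.write(data)
        f.close()
        buf.seek(0)
        return buf

    def unzips(data):
        # hypothetical helper: inflate a gzip-compressed string back to text
        return gzip.GzipFile(fileobj=StringIO(data)).read()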
diff --git a/tests/units/toolkit/parcel.py b/tests/units/toolkit/parcel.py
new file mode 100755
index 0000000..93edfa4
--- /dev/null
+++ b/tests/units/toolkit/parcel.py
@@ -0,0 +1,770 @@
+#!/usr/bin/env python
+# sugar-lint: disable
+
+import os
+import gzip
+import uuid
+import json
+from StringIO import StringIO
+from os.path import exists
+
+from __init__ import tests
+
+from sugar_network import db, toolkit
+from sugar_network.toolkit.router import File
+from sugar_network.toolkit import parcel, http
+
+
+class ParcelTest(tests.Test):
+
+ def test_decode(self):
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual('foo', packet['bar'])
+ packet_iter = iter(packet)
+ self.assertRaises(EOFError, packet_iter.next)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' +
+ json.dumps({'payload': 1}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ self.assertRaises(EOFError, packet_iter.next)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 2}, next(packet_iter))
+ self.assertRaises(EOFError, packet_iter.next)
+ self.assertRaises(EOFError, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 2}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_decode_WithLimit(self):
+ payload = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 'first'}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ ).getvalue()
+ tail = '.' * 100
+
+ stream = StringIO(payload + tail)
+ for i in parcel.decode(stream):
+ pass
+ self.assertEqual(len(payload + tail), stream.tell())
+
+ stream = StringIO(payload + tail)
+ for i in parcel.decode(stream, limit=len(payload)):
+ pass
+ self.assertEqual(len(payload), stream.tell())
+
+ def test_decode_Empty(self):
+ self.assertRaises(http.BadRequest, parcel.decode(StringIO()).next)
+
+ stream = zips(
+ ''
+ )
+ self.assertRaises(EOFError, parcel.decode(stream).next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n'
+ )
+ self.assertRaises(EOFError, parcel.decode(stream).next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.assertRaises(StopIteration, parcel.decode(stream).next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_decode_SkipPackets(self):
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'payload': 11}) + '\n' +
+ json.dumps({'payload': 111}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ next(packets_iter)
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 2}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ stream.seek(0)
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 1}, next(packet_iter))
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ packet_iter = iter(packet)
+ self.assertEqual({'payload': 2}, next(packet_iter))
+ self.assertRaises(StopIteration, packet_iter.next)
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_decode_Blobs(self):
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'num': 1, 'content-length': 1}) + '\n' +
+ 'a' +
+ json.dumps({'num': 2, 'content-length': 2}) + '\n' +
+ 'bb' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'num': 3, 'content-length': 3}) + '\n' +
+ 'ccc' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual([
+ (1, 'a'),
+ (2, 'bb'),
+ ],
+ [(i['num'], file(i.path).read()) for i in packet])
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertEqual([
+ (3, 'ccc'),
+ ],
+ [(i['num'], file(i.path).read()) for i in packet])
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_decode_EmptyBlobs(self):
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'num': 1, 'content-length': 1}) + '\n' +
+ 'a' +
+ json.dumps({'num': 2, 'content-length': 0}) + '\n' +
+ '' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'num': 3, 'content-length': 3}) + '\n' +
+ 'ccc' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual([
+ (1, 'a'),
+ (2, ''),
+ ],
+ [(i['num'], file(i.path).read()) for i in packet])
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertEqual([
+ (3, 'ccc'),
+ ],
+ [(i['num'], file(i.path).read()) for i in packet])
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_decode_SkipNotReadBlobs(self):
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'num': 1, 'content-length': 1}) + '\n' +
+ 'a' +
+ json.dumps({'num': 2, 'content-length': 2}) + '\n' +
+ 'bb' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'num': 3, 'content-length': 3}) + '\n' +
+ 'ccc' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ packets_iter = parcel.decode(stream)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual([1, 2], [i['num'] for i in packet])
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertEqual([3], [i['num'] for i in packet])
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
+ def test_encode(self):
+ stream = ''.join([i for i in parcel.encode([])])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ unzips(stream))
+
+ stream = ''.join([i for i in parcel.encode([(None, None, None)], header={'foo': 'bar'})])
+ self.assertEqual(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': None}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ unzips(stream))
+
+ stream = ''.join([i for i in parcel.encode([
+ (1, {}, None),
+ ('2', {'n': 2}, []),
+ ('3', {'n': 3}, iter([])),
+ ])])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'packet': '2', 'n': 2}) + '\n' +
+ json.dumps({'packet': '3', 'n': 3}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ unzips(stream))
+
+ stream = ''.join([i for i in parcel.encode([
+ (1, None, [{1: 1}]),
+ (2, None, [{2: 2}, {2: 2}]),
+ (3, None, [{3: 3}, {3: 3}, {3: 3}]),
+ ])])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({1: 1}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({2: 2}) + '\n' +
+ json.dumps({2: 2}) + '\n' +
+ json.dumps({'packet': 3}) + '\n' +
+ json.dumps({3: 3}) + '\n' +
+ json.dumps({3: 3}) + '\n' +
+ json.dumps({3: 3}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ unzips(stream))
+
+ def test_limited_encode(self):
+ RECORD = 1024 * 1024
+
+ def content():
+ yield {'record': '.' * RECORD}
+ yield {'record': '.' * RECORD}
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD / 2, compresslevel=0)]))
+ assert len(stream) < RECORD
+ self.assertEqual(4, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 1.5, compresslevel=0)]))
+ assert len(stream) > RECORD
+ assert len(stream) < RECORD * 2
+ self.assertEqual(5, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 2.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 2
+ assert len(stream) < RECORD * 3
+ self.assertEqual(6, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 3.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 3
+ assert len(stream) < RECORD * 4
+ self.assertEqual(7, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 4.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 4
+ self.assertEqual(8, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ compresslevel=0)]))
+ assert len(stream) > RECORD * 4
+
+ def test_limited_encode_FinalRecords(self):
+ RECORD = 1024 * 1024
+
+ def content():
+ try:
+ yield {'record': '.' * RECORD}
+ yield {'record': '.' * RECORD}
+ except StopIteration:
+ pass
+ yield None
+ yield {'record': '.' * RECORD}
+ yield {'record': '.' * RECORD}
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD / 2, compresslevel=0)]))
+ assert len(stream) > RECORD * 4
+ assert len(stream) < RECORD * 5
+ self.assertEqual(8, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 1.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 5
+ assert len(stream) < RECORD * 6
+ self.assertEqual(9, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 2.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 6
+ assert len(stream) < RECORD * 7
+ self.assertEqual(10, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 3.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 6
+ assert len(stream) < RECORD * 7
+ self.assertEqual(10, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 4.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 6
+ assert len(stream) < RECORD * 7
+ self.assertEqual(10, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 5.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 7
+ assert len(stream) < RECORD * 8
+ self.assertEqual(11, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 6.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 8
+ assert len(stream) < RECORD * 9
+ self.assertEqual(12, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ compresslevel=0)]))
+ assert len(stream) > RECORD * 8
+ assert len(stream) < RECORD * 9
+ self.assertEqual(12, len(stream.strip().split('\n')))
+
+ def test_encode_Blobs(self):
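+ # Each File should serialize as its meta dict on one line followed
+ # by the raw file content; packet framing is the same as for plain
+ # records.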
+ self.touch(('a', 'a'))
+ self.touch(('b', 'bb'))
+ self.touch(('c', 'ccc'))
+
+ stream = ''.join([i for i in parcel.encode([
+ (1, None, [
+ File('a', 'digest', [('num', 1)]),
+ File('b', 'digest', [('num', 2)]),
+ ]),
+ (2, None, [
+ File('c', 'digest', [('num', 3)]),
+ ]),
+ ])])
+
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'num': 1}) + '\n' +
+ 'a' + '\n' +
+ json.dumps({'num': 2}) + '\n' +
+ 'bb' + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'num': 3}) + '\n' +
+ 'ccc' + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ unzips(stream))
+
+ def test_limited_encode_Blobs(self):
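+ # Like test_limited_encode, but every admitted blob contributes two
+ # lines to the uncompressed stream: its meta dict and the file content.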
+ RECORD = 1024 * 1024
+ self.touch(('blob', '.' * RECORD))
+
+ def content():
+ yield File('blob', 'digest')
+ yield File('blob', 'digest')
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD / 2, compresslevel=0)]))
+ assert len(stream) < RECORD
+ self.assertEqual(4, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 1.5, compresslevel=0)]))
+ assert len(stream) > RECORD
+ assert len(stream) < RECORD * 2
+ self.assertEqual(6, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 2.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 2
+ assert len(stream) < RECORD * 3
+ self.assertEqual(8, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 3.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 3
+ assert len(stream) < RECORD * 4
+ self.assertEqual(10, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 4.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 4
+ self.assertEqual(12, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ compresslevel=0)]))
+ assert len(stream) > RECORD * 4
+ self.assertEqual(12, len(stream.strip().split('\n')))
+
+ def test_limited_encode_FinalBlobs(self):
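+ # Blob flavour of test_limited_encode_FinalRecords: blobs yielded
+ # after the None separator are expected to bypass the exhausted limit.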
+ RECORD = 1024 * 1024
+ self.touch(('blob', '.' * RECORD))
+
+ def content():
+ try:
+ yield File('blob', 'digest')
+ yield File('blob', 'digest')
+ except StopIteration:
+ pass
+ yield None
+ yield File('blob', 'digest')
+ yield File('blob', 'digest')
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD / 2, compresslevel=0)]))
+ assert len(stream) > RECORD * 4
+ assert len(stream) < RECORD * 5
+ self.assertEqual(12, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 1.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 5
+ assert len(stream) < RECORD * 6
+ self.assertEqual(14, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 2.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 6
+ assert len(stream) < RECORD * 7
+ self.assertEqual(16, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 3.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 6
+ assert len(stream) < RECORD * 7
+ self.assertEqual(16, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 4.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 6
+ assert len(stream) < RECORD * 7
+ self.assertEqual(16, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 5.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 7
+ assert len(stream) < RECORD * 8
+ self.assertEqual(18, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ limit=RECORD * 6.5, compresslevel=0)]))
+ assert len(stream) > RECORD * 8
+ assert len(stream) < RECORD * 9
+ self.assertEqual(20, len(stream.strip().split('\n')))
+
+ stream = unzips(''.join([i for i in parcel.encode([
+ ('first', None, content()),
+ ('second', None, content()),
+ ],
+ compresslevel=0)]))
+ assert len(stream) > RECORD * 8
+ assert len(stream) < RECORD * 9
+ self.assertEqual(20, len(stream.strip().split('\n')))
+
+ def test_decode_dir(self):
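+ # decode_dir() should yield one context-managed packet per *.parcel
+ # file; blob records come back as File-like dicts whose .path points
+ # at the decoded content.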
+ stream = zips(
+ json.dumps({'foo': 'bar'}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'num': 1, 'content-length': '8'}) + '\n' +
+ 'content1' + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/1.parcel', stream.getvalue()))
+
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'num': 2, 'content-length': '8'}) + '\n' +
+ 'content2' + '\n' +
+ json.dumps({'payload': 3}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/2.parcel', stream.getvalue()))
+
+ packets_iter = parcel.decode_dir('parcels')
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertEqual({'packet': 2}, packet.props)
+ items = iter(packet)
+ blob = next(items)
+ self.assertEqual({'num': 2, 'content-length': '8'}, blob)
+ self.assertEqual('content2', file(blob.path).read())
+ self.assertEqual({'payload': 3}, next(items))
+ self.assertRaises(StopIteration, items.next)
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual({'foo': 'bar', 'packet': 1}, packet.props)
+ items = iter(packet)
+ self.assertEqual({'payload': 1}, next(items))
+ blob = next(items)
+ self.assertEqual({'num': 1, 'content-length': '8'}, blob)
+ self.assertEqual('content1', file(blob.path).read())
+ self.assertEqual({'payload': 2}, next(items))
+ self.assertRaises(StopIteration, items.next)
+ self.assertRaises(StopIteration, packets_iter.next)
+
+ def test_decode_dir_RemoveOutdatedParcels(self):
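+ # Parcels whose 'from' header matches the recipient look like our
+ # own earlier uploads and should be deleted, unless they carry the
+ # current session id (covered by the next test).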
+ stream = zips(
+ json.dumps({'from': 'principal'}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/1.parcel', stream.getvalue()))
+
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/2.parcel', stream.getvalue()))
+
+ packets_iter = parcel.decode_dir('parcels', recipient='principal')
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertRaises(StopIteration, packets_iter.next)
+ assert not exists('parcels/1.parcel')
+ assert exists('parcels/2.parcel')
+
+ stream = zips(
+ json.dumps({'from': 'principal', 'session': 'old'}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/3.parcel', stream.getvalue()))
+
+ packets_iter = parcel.decode_dir('parcels', recipient='principal', session='new')
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertRaises(StopIteration, packets_iter.next)
+ assert not exists('parcels/1.parcel')
+ assert exists('parcels/2.parcel')
+ assert not exists('parcels/3.parcel')
+
+ def test_decode_dir_SkipTheSameSessionParcels(self):
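+ # A parcel from the recipient with the current session id is not
+ # yielded, but should be left on disk.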
+ stream = zips(
+ json.dumps({'from': 'principal', 'session': 'new'}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/1.parcel', stream.getvalue()))
+
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n'
+ )
+ self.touch(('parcels/2.parcel', stream.getvalue()))
+
+ packets_iter = parcel.decode_dir('parcels', recipient='principal', session='new')
+ with next(packets_iter) as packet:
+ self.assertEqual(2, packet.name)
+ self.assertRaises(StopIteration, packets_iter.next)
+ assert exists('parcels/1.parcel')
+ assert exists('parcels/2.parcel')
+
+ def test_encode_dir(self):
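+ # encode_dir() should write the encoded stream to the given path
+ # rather than yielding chunks.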
+ self.touch(('blob', 'content'))
+ parcel.encode_dir([
+ (1, None, [
+ {'payload': 1},
+ File('blob', 'digest', [('num', 1)]),
+ {'payload': 2},
+ ]),
+ (2, None, [
+ File('blob', 'digest', [('num', 2)]),
+ {'payload': 3},
+ ]),
+ ], path='./parcel')
+
+ assert exists('parcel')
+
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'packet': 1}) + '\n' +
+ json.dumps({'payload': 1}) + '\n' +
+ json.dumps({'num': 1}) + '\n' +
+ 'content' + '\n' +
+ json.dumps({'payload': 2}) + '\n' +
+ json.dumps({'packet': 2}) + '\n' +
+ json.dumps({'num': 2}) + '\n' +
+ 'content' + '\n' +
+ json.dumps({'payload': 3}) + '\n' +
+ json.dumps({'packet': 'last'}) + '\n',
+ unzips(file('parcel').read()))
+
+
+def zips(data):
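+ """Gzip-compress a string, returning a rewound StringIO with the result."""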
+ result = StringIO()
+ f = gzip.GzipFile(fileobj=result, mode='wb')
+ f.write(data)
+ f.close()
+ result.seek(0)
+ return result
+
+
+def unzips(data):
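+ """Decompress a gzip'ed string back to its plain form."""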
+ return gzip.GzipFile(fileobj=StringIO(data)).read()
+
+
+if __name__ == '__main__':
+ tests.main()