diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-02-27 12:21:23 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-02-27 12:21:23 (GMT) |
commit | 1028755053ef3d8c538138b37e61ece13b9c1a23 (patch) | |
tree | 5d4dc4166e88018433166836677c06e3fcebfe79 /sugar_network | |
parent | 2aed09c3b60188063623eecee4a0f79592a4719e (diff) |
Zip sync stream
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/db/blobs.py | 12 | ||||
-rw-r--r-- | sugar_network/db/metadata.py | 6 | ||||
-rw-r--r-- | sugar_network/node/sync.py | 307 | ||||
-rw-r--r-- | sugar_network/toolkit/__init__.py | 3 | ||||
-rw-r--r-- | sugar_network/toolkit/parcel.py | 336 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 4 |
6 files changed, 355 insertions(+), 313 deletions(-)
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index da06483..a9d66e0 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -37,8 +37,14 @@ def init(path): os.makedirs(_root) -def post(content, mime_type=None, digest_to_assert=None): - meta = [] +def post(content, mime_type=None, digest_to_assert=None, meta=None): + if meta is None: + meta = [] + meta.append(('content-type', mime_type or 'application/octet-stream')) + else: + meta = meta.items() + if mime_type: + meta.append(('content-type', mime_type)) @contextmanager def write_blob(): @@ -70,7 +76,7 @@ def post(content, mime_type=None, digest_to_assert=None): blob.unlink() raise http.BadRequest('Digest mismatch') path = _path(digest) - meta.append(('content-type', mime_type or 'application/octet-stream')) + meta.append(('content-length', str(blob.tell()))) with toolkit.new_file(path + _META_SUFFIX) as f: for key, value in meta: f.write('%s: %s\n' % (key, value)) diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py index 88d644b..9ba5998 100644 --- a/sugar_network/db/metadata.py +++ b/sugar_network/db/metadata.py @@ -311,7 +311,11 @@ class Blob(Property): return '' if not isinstance(value, dict): - mime_type = this.request.content_type or self.mime_type + mime_type = None + if this.request.prop == self.name: + mime_type = this.request.content_type + if not mime_type: + mime_type = self.mime_type return blobs.post(value, mime_type).digest digest = this.resource[self.name] if self.name else None diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py deleted file mode 100644 index f5b946c..0000000 --- a/sugar_network/node/sync.py +++ /dev/null @@ -1,307 +0,0 @@ -# Copyright (C) 2012-2013 Aleksey Lim -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later 
version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -import os -import gzip -import zlib -import json -import logging -from cStringIO import StringIO -from types import GeneratorType -from os.path import exists, join, dirname, basename, splitext - -from sugar_network import toolkit -from sugar_network.toolkit import coroutine, enforce - - -# Filename suffix to use for sneakernet synchronization files -_SNEAKERNET_SUFFIX = '.sneakernet' - -# Leave at leat n bytes in fs whle calling `encode_to_file()` -_SNEAKERNET_RESERVED_SIZE = 1024 * 1024 - -_logger = logging.getLogger('node.sync') - - -def decode(stream): - packet = _PacketsIterator(stream) - while True: - packet.next() - if packet.name == 'last': - break - yield packet - - -def encode(packets, **header): - return _encode(None, packets, False, header, _EncodingStatus()) - - -def limited_encode(limit, packets, **header): - return _encode(limit, packets, False, header, _EncodingStatus()) - - -def package_decode(stream): - stream = _GzipStream(stream) - package_props = json.loads(stream.readline()) - - for packet in decode(stream): - packet.props.update(package_props) - yield packet - - -def package_encode(packets, **header): - # XXX Only for small amount of data - # TODO Support real streaming - buf = StringIO() - zipfile = gzip.GzipFile(mode='wb', fileobj=buf) - - header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX - json.dump(header, zipfile) - zipfile.write('\n') - - for chunk in _encode(None, packets, False, None, _EncodingStatus()): - zipfile.write(chunk) - zipfile.close() - - yield buf.getvalue() - - -def sneakernet_decode(root, node=None, session=None): - for 
root, __, files in os.walk(root): - for filename in files: - if not filename.endswith(_SNEAKERNET_SUFFIX): - continue - zipfile = gzip.open(join(root, filename), 'rb') - try: - package_props = json.loads(zipfile.readline()) - - if node is not None and package_props.get('src') == node: - if package_props.get('session') == session: - _logger.debug('Skip session %r sneakernet package', - zipfile.name) - else: - _logger.debug('Remove outdate %r sneakernet package', - zipfile.name) - os.unlink(zipfile.name) - continue - - for packet in decode(zipfile): - packet.props.update(package_props) - yield packet - finally: - zipfile.close() - - -def sneakernet_encode(packets, root=None, limit=None, path=None, **header): - if path is None: - if not exists(root): - os.makedirs(root) - filename = toolkit.uuid() + _SNEAKERNET_SUFFIX - path = toolkit.unique_filename(root, filename) - else: - filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX - if 'filename' not in header: - header['filename'] = filename - - if limit <= 0: - stat = os.statvfs(dirname(path)) - limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE - - _logger.debug('Creating %r sneakernet package, limit=%s header=%r', - path, limit, header) - - status = _EncodingStatus() - with file(path, 'wb') as package: - zipfile = gzip.GzipFile(fileobj=package) - try: - json.dump(header, zipfile) - zipfile.write('\n') - - pos = None - encoder = _encode(limit, packets, True, None, status) - while True: - try: - chunk = encoder.send(pos) - zipfile.write(chunk) - pos = zipfile.fileobj.tell() - coroutine.dispatch() - except StopIteration: - break - - except Exception: - _logger.debug('Emergency removing %r package', path) - package.close() - os.unlink(path) - raise - else: - zipfile.close() - package.flush() - os.fsync(package.fileno()) - - return not status.aborted - - -class _EncodingStatus(object): - - aborted = False - - -def _encode(limit, packets, download_blobs, header, status): - for packet, props, content in 
packets: - if status.aborted: - break - - if props is None: - props = {} - if header: - props.update(header) - props['packet'] = packet - pos = (yield json.dumps(props) + '\n') or 0 - - if content is None: - continue - - content = iter(content) - try: - record = next(content) - - while True: - blob = None - blob_size = 0 - if 'blob' in record: - blob = record.pop('blob') - blob_size = record['blob_size'] - - dump = json.dumps(record) + '\n' - if not status.aborted and limit is not None and \ - pos + len(dump) + blob_size > limit: - status.aborted = True - if not isinstance(content, GeneratorType): - raise StopIteration() - record = content.throw(StopIteration()) - continue - pos = (yield dump) or 0 - - if blob is not None: - for chunk in blob: - pos = (yield chunk) or 0 - blob_size -= len(chunk) - enforce(blob_size == 0, EOFError, - 'File size is not the same as declared') - - record = next(content) - except StopIteration: - pass - - yield json.dumps({'packet': 'last'}) + '\n' - - -class _PacketsIterator(object): - - def __init__(self, stream): - if not hasattr(stream, 'readline'): - stream.readline = lambda: toolkit.readline(stream) - if hasattr(stream, 'seek'): - self._seek = stream.seek - self._stream = stream - self.props = {} - self._name = None - self._shift = True - - @property - def name(self): - return self._name - - def next(self): - if self._shift: - for __ in self: - pass - if self._name is None: - raise EOFError() - self._shift = True - - def __repr__(self): - return '<SyncPacket %r>' % self.props - - def __getitem__(self, key): - return self.props.get(key) - - def __iter__(self): - blob = None - while True: - if blob is not None and blob.size_to_read: - self._seek(blob.size_to_read, 1) - blob = None - record = self._stream.readline() - if not record: - self._name = None - raise EOFError() - record = json.loads(record) - if 'packet' in record: - self._name = record['packet'] or '' - self.props = record - self._shift = False - break - blob_size = 
record.get('blob_size') - if blob_size: - blob = record['blob'] = _Blob(self._stream, blob_size) - yield record - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, traceback): - pass - - # pylint: disable-msg=E0202 - def _seek(self, distance, where): - while distance: - chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE)) - distance -= len(chunk) - - -class _Blob(object): - - def __init__(self, stream, size): - self._stream = stream - self.size_to_read = size - - def read(self, size=toolkit.BUFFER_SIZE): - chunk = self._stream.read(min(size, self.size_to_read)) - self.size_to_read -= len(chunk) - return chunk - - -class _GzipStream(object): - - def __init__(self, stream): - self._stream = stream - self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS) - self._buffer = bytearray() - - def read(self, size): - while True: - if size <= len(self._buffer): - result = self._buffer[:size] - self._buffer = self._buffer[size:] - return bytes(result) - chunk = self._stream.read(size) - if not chunk: - result, self._buffer = self._buffer, bytearray() - return result - self._buffer += self._zip.decompress(chunk) - - def readline(self): - return toolkit.readline(self) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 8acfe27..792267a 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -782,6 +782,9 @@ class _NewFile(object): def name(self, value): self.dst_path = value + def tell(self): + return self._file.file.tell() + def close(self): self._file.close() if exists(self.name): diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py new file mode 100644 index 0000000..457ea07 --- /dev/null +++ b/sugar_network/toolkit/parcel.py @@ -0,0 +1,336 @@ +# Copyright (C) 2012-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free 
Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import sys +import zlib +import time +import json +import struct +import logging +from types import GeneratorType +from os.path import dirname, exists, join + +from sugar_network import toolkit +from sugar_network.toolkit.router import File +from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce + + +_FILENAME_SUFFIX = '.parcel' +_RESERVED_DISK_SPACE = 1024 * 1024 + +_ZLIB_WBITS = 15 +_ZLIB_WBITS_SIZE = 32768 # 2 ** 15 + +_logger = logging.getLogger('parcel') + + +def decode(stream, limit=None): + _logger.debug('Decode %r stream limit=%r', stream, limit) + + stream = _UnzipStream(stream, limit) + header = stream.read_record() + + packet = _DecodeIterator(stream) + while True: + packet.next() + if packet.name == 'last': + break + packet.props.update(header) + yield packet + + +def encode(packets, limit=None, header=None, compresslevel=6): + _logger.debug('Encode %r packets limit=%r header=%r', + packets, limit, header) + + ostream = _ZipStream(compresslevel) + + if limit is None: + limit = sys.maxint + if header is None: + header = {} + chunk = ostream.write_record(header) + if chunk: + yield chunk + + for packet, props, content in packets: + if props is None: + props = {} + props['packet'] = packet + chunk = ostream.write_record(props) + if chunk: + yield chunk + + if content is None: + continue + + content = iter(content) + try: + finalizing = False + record = next(content) + while True: + if record is None: + finalizing = True + record = next(content) + 
continue + blob_len = 0 + if isinstance(record, File) and record.path: + blob_len = record.size + chunk = ostream.write_record(record, + None if finalizing else limit - blob_len) + if chunk is None: + _logger.debug('Reach the encoding limit') + if not isinstance(content, GeneratorType): + raise StopIteration() + finalizing = True + record = content.throw(StopIteration()) + continue + if chunk: + yield chunk + if blob_len: + with file(record.path, 'rb') as blob: + while True: + chunk = blob.read(BUFFER_SIZE) + if not chunk: + break + blob_len -= len(chunk) + if not blob_len: + chunk += '\n' + chunk = ostream.write(chunk) + if chunk: + yield chunk + enforce(blob_len == 0, EOFError, 'Blob size mismatch') + record = next(content) + except StopIteration: + pass + + chunk = ostream.write_record({'packet': 'last'}) + if chunk: + yield chunk + chunk = ostream.flush() + if chunk: + yield chunk + + +def decode_dir(root, recipient=None, session=None): + for root, __, files in os.walk(root): + for filename in files: + if not filename.endswith(_FILENAME_SUFFIX): + continue + with file(join(root, filename), 'rb') as parcel: + for packet in decode(parcel): + if recipient is not None and packet['from'] == recipient: + if session and packet['session'] == session: + _logger.debug('Skip the same session %r parcel', + parcel.name) + else: + _logger.debug('Remove outdated %r parcel', + parcel.name) + os.unlink(parcel.name) + break + yield packet + + +def encode_dir(packets, root=None, limit=None, path=None, sender=None, + header=None): + if path is None: + if not exists(root): + os.makedirs(root) + path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX) + if limit <= 0: + stat = os.statvfs(dirname(path)) + limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE + if header is None: + header = {} + if sender is not None: + header['from'] = sender + + _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header) + + with 
toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel: + for chunk in encode(packets, limit, header): + parcel.write(chunk) + coroutine.dispatch() + parcel.flush() + os.fsync(parcel.fileno()) + os.rename(parcel.name, path) + + +class _DecodeIterator(object): + + def __init__(self, stream): + self._stream = stream + self.props = {} + self._name = None + self._shift = True + + @property + def name(self): + return self._name + + def next(self): + if self._shift: + for __ in self: + pass + if self._name is None: + raise EOFError() + self._shift = True + + def __repr__(self): + return '<Packet %r>' % self.props + + def __getitem__(self, key): + return self.props.get(key) + + def __iter__(self): + while True: + record = self._stream.read_record() + if record is None: + self._name = None + raise EOFError() + if 'packet' in record: + self._name = record['packet'] or '' + self.props = record + self._shift = False + break + blob_len = record.get('content-length') + if blob_len is None: + yield record + continue + blob_len = int(blob_len) + with toolkit.NamedTemporaryFile() as blob: + while blob_len: + chunk = self._stream.read(min(blob_len, BUFFER_SIZE)) + enforce(chunk, 'Blob size mismatch') + blob.write(chunk) + blob_len -= len(chunk) + blob.flush() + yield File(blob.name, meta=record) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + +class _ZipStream(object): + + def __init__(self, compresslevel=6): + self._zipper = zlib.compressobj(compresslevel, + zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0) + self._offset = 0 + self._size = 0 + self._crc = zlib.crc32('') & 0xffffffffL + + def write_record(self, record, limit=None): + chunk = json.dumps(record) + '\n' + if limit is not None and self._offset + len(chunk) > limit: + return None + return self.write(chunk) + + def write(self, chunk): + self._size += len(chunk) + self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL + chunk = self._zipper.compress(chunk) + + 
if self._offset == 0: + chunk = '\037\213' + '\010' + chr(0) + \ + struct.pack('<L', long(time.time())) + \ + '\002' + '\377' + \ + chunk + self._offset = _ZLIB_WBITS_SIZE + if chunk: + self._offset += len(chunk) + + return chunk + + def flush(self): + chunk = self._zipper.flush() + \ + struct.pack('<L', self._crc) + \ + struct.pack('<L', self._size & 0xffffffffL) + self._offset += len(chunk) + return chunk + + +class _UnzipStream(object): + + def __init__(self, stream, limit): + self._stream = stream + self._limit = limit + self._unzipper = zlib.decompressobj(-_ZLIB_WBITS) + self._crc = zlib.crc32('') & 0xffffffffL + self._size = 0 + self._buffer = '' + + if self._limit is not None: + self._limit -= 10 + magic = stream.read(2) + enforce(magic == '\037\213', http.BadRequest, + 'Not a gzipped file') + enforce(ord(stream.read(1)) == 8, http.BadRequest, + 'Unknown compression method') + enforce(ord(stream.read(1)) == 0, http.BadRequest, + 'Gzip flags should be empty') + stream.read(6) # Ignore the rest of header + + def read_record(self): + while True: + parts = self._buffer.split('\n', 1) + if len(parts) == 1: + if self._read(BUFFER_SIZE): + continue + return None + result, self._buffer = parts + if not result: + continue + return json.loads(result) + + def read(self, size): + while len(self._buffer) == 0 and self._read(size): + pass + size = min(size, len(self._buffer)) + result = self._buffer[:size] + self._buffer = self._buffer[size:] + return result + + def _read(self, size): + if self._limit is not None: + size = min(size, self._limit) + chunk = self._stream.read(size) + + if chunk: + if self._limit is not None: + self._limit -= len(chunk) + self._add_to_buffer(self._unzipper.decompress(chunk)) + return True + + enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest, + 'Malformed gzipped file') + crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0] + enforce(crc == self._crc, http.BadRequest, 'CRC check failed') + size = struct.unpack('<I', 
self._unzipper.unused_data[4:8])[0] + enforce(size == self._size, http.BadRequest, 'Incorrect length') + + return self._add_to_buffer(self._unzipper.flush()) + + def _add_to_buffer(self, chunk): + if not chunk: + return False + self._buffer += chunk + self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL + self._size += len(chunk) + return True diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 00e5d8a..4206121 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -381,7 +381,7 @@ class Response(CaseInsensitiveDict): @content_length.setter def content_length(self, value): - self.set('content-length', value) + self.set('content-length', str(value)) @property def content_type(self): @@ -430,7 +430,7 @@ class File(CaseInsensitiveDict): self.path = path self.digest = File.Digest(digest) if digest else None if meta is not None: - for key, value in meta: + for key, value in meta.items() if isinstance(meta, dict) else meta: self[key] = value self._stat = None |