# -*- coding: utf-8 -*-
#
# Copyright (C) 2012-2014 Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import zlib
import time
import json
import struct
import hashlib
import logging
from types import GeneratorType
from os.path import dirname, exists, join

from sugar_network import toolkit
from sugar_network.toolkit.router import File
from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce


DEFAULT_COMPRESSLEVEL = 6

_FILENAME_SUFFIX = '.packet'
_RESERVED_DISK_SPACE = 1024 * 1024

_ZLIB_WBITS = 15
_ZLIB_WBITS_SIZE = 32768  # 2 ** 15

_logger = logging.getLogger('packets')


def decode(stream, limit=None):
    _logger.debug('Decode %r stream limit=%r', stream, limit)

    if limit is not None:
        limit -= 2
    magic = stream.read(2)
    enforce(len(magic) == 2, http.BadRequest, 'Malformed packet')
    if magic == '\037\213':
        stream = _ZippedDecoder(stream, limit)
    else:
        stream = _Decoder(magic, stream, limit)
    header = stream.read_record()

    return _DecodeIterator(stream, header)


def encode(items, limit=None, header=None, compresslevel=None,
        on_complete=None, download_blobs=False, **kwargs):
    _logger.debug('Encode %r limit=%r header=%r', items, limit, header)

    if compresslevel == 0:
        ostream = _Encoder()
    else:
        ostream = _ZippedEncoder(compresslevel)
    connection = http.Connection()

    if limit is None:
        limit = sys.maxint
    if header is None:
        header = kwargs
    else:
        header.update(kwargs)
    chunk = ostream.write_record(header)
    if chunk:
        yield chunk

    try:
        items = iter(items)
        record = next(items)

        multisegments = type(record) in (tuple, list)
        while True:
            if multisegments:
                # Each item is a (packet, props, content) tuple that
                # describes a separate segment within the same stream
                packet, props, content = record
                if props is None:
                    props = {}
                props['segment'] = packet
                chunk = ostream.write_record(props)
                if chunk:
                    yield chunk
                if content:
                    content = iter(content)
                    record = next(content)
                else:
                    content = iter([])
                    record = None
            else:
                content = items

            try:
                finalizing = False
                while True:
                    if record is None:
                        finalizing = True
                        record = next(content)
                        continue
                    blob_len = 0
                    if isinstance(record, File):
                        blob_len = record.size
                        chunk = record.meta
                        if not record.path:
                            chunk['digest'] = record.digest
                    else:
                        chunk = record
                    chunk = ostream.write_record(chunk,
                            None if finalizing else limit - blob_len)
                    if chunk is None:
                        _logger.debug('Reach the encoding limit')
                        on_complete = None
                        if not isinstance(content, GeneratorType):
                            raise StopIteration()
                        finalizing = True
                        record = content.throw(StopIteration())
                        continue
                    if chunk:
                        yield chunk
                    if blob_len and (record.path or download_blobs):
                        if record.path:
                            blob_content = record.iter_content()
                        else:
                            url = record.meta['location']
                            enforce(url, http.NotFound, 'No location')
                            blob_content = connection.download(url)
                        for chunk in blob_content:
                            blob_len -= len(chunk)
                            if not blob_len:
                                chunk += '\n'
                            chunk = ostream.write(chunk)
                            if chunk:
                                yield chunk
                        enforce(blob_len == 0, EOFError, 'Blob size mismatch')
                    record = next(content)
            except StopIteration:
                pass

            if multisegments:
                record = next(items)
                continue
            break
    finally:
        if on_complete is not None:
            on_complete()
        chunk = ostream.flush()
        if chunk:
            yield chunk
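
# The stream produced by encode() (and consumed by decode()) is a sequence
# of newline-terminated JSON records: first the packet header, then one
# record per item.  Records that describe blobs carry a `content-length`
# key and are followed by the raw blob bytes plus a trailing newline, while
# blobs passed by reference keep `location` and `digest` keys instead of
# inline content.  Unless compresslevel is 0, the whole stream is wrapped
# into a gzip container.  A schematic dump of one packet (field names are
# illustrative only):
#
#   {"from": "node", "session": "..."}       <- header record
#   {"segment": "diff", ...}                 <- optional segment record
#   {"guid": "...", "content-length": "5"}   <- blob meta record
#   <5 raw bytes>\n                          <- blob content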


def decode_dir(root, recipient=None, session=None):
    for root, __, files in os.walk(root):
        for filename in files:
            if not filename.endswith(_FILENAME_SUFFIX):
                continue
            with file(join(root, filename), 'rb') as packets:
                packet = decode(packets)
                if recipient is not None and packet['from'] == recipient:
                    if session and packet['session'] == session:
                        _logger.debug('Skip the same session %r packet',
                                packets.name)
                    else:
                        _logger.debug('Remove outdated %r packet',
                                packets.name)
                        os.unlink(packets.name)
                    continue
                for i in packet:
                    yield i


def encode_dir(packets, root=None, limit=None, path=None, sender=None,
        header=None):
    if path is None:
        if not exists(root):
            os.makedirs(root)
        path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX)
    if limit <= 0:
        stat = os.statvfs(dirname(path))
        limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE
    if header is None:
        header = {}
    if sender is not None:
        header['from'] = sender

    _logger.debug('Creating %r packet limit=%s header=%r', path, limit, header)

    with toolkit.NamedTemporaryFile(dir=dirname(path)) as f:
        for chunk in encode(packets, limit, header):
            f.write(chunk)
            coroutine.dispatch()
        f.flush()
        os.fsync(f.fileno())
        os.rename(f.name, path)
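
# The classes below implement both ends of the stream: _DecodeIterator and
# _SegmentIterator expose decoded records (and per-segment sub-iterators),
# while _Encoder/_ZippedEncoder and _Decoder/_ZippedDecoder handle the plain
# and gzip-compressed representations of the byte stream itself.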


class _DecodeIterator(object):

    def __init__(self, stream, header):
        self._stream = stream
        self.header = header

    def __repr__(self):
        return '<packet %r>' % self.header  # label is a best-effort reconstruction

    def __getitem__(self, key):
        return self.header.get(key)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def __iter__(self):
        while True:
            record = self._stream.read_record()
            if record is None:
                break
            if 'segment' in record:
                while record is not None:
                    record.update(self.header)
                    segment = _SegmentIterator(self._stream, record)
                    yield segment
                    record = segment.next_segment
                    if record is not None:
                        continue
                    # The segment was not read till the end, skip the rest
                    # of its records to find the next segment
                    while True:
                        record = self._stream.read_record()
                        if record is None or 'segment' in record:
                            break
                break
            for i in self._process_record(record):
                yield i

    def _process_record(self, record):
        blob_len = record.get('content-length')
        if blob_len is None:
            yield record
            return
        if 'location' in record:
            yield File(None, digest=record.pop('digest'), meta=record)
            return
        blob_len = int(blob_len)
        with toolkit.NamedTemporaryFile() as blob:
            digest = hashlib.sha1()
            while blob_len:
                chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
                enforce(chunk, 'Blob size mismatch')
                blob.write(chunk)
                blob_len -= len(chunk)
                digest.update(chunk)
            blob.flush()
            yield File(blob.name, digest=digest.hexdigest(), meta=record)


class _SegmentIterator(_DecodeIterator):

    next_segment = None

    @property
    def name(self):
        return self.header['segment']

    def __iter__(self):
        while True:
            record = self._stream.read_record()
            if record is None:
                break
            if 'segment' in record:
                self.next_segment = record
                break
            for i in self._process_record(record):
                yield i


class _Encoder(object):

    def __init__(self):
        self._offset = 0

    def write_record(self, record, limit=None):
        chunk = json.dumps(record) + '\n'
        if limit is not None and self._offset + len(chunk) > limit:
            return None
        return self.write(chunk)

    def write(self, chunk):
        chunk = self._encode(chunk)
        if chunk:
            self._offset += len(chunk)
        return chunk

    def flush(self):
        chunk = self._flush()
        self._offset += len(chunk)
        return chunk

    def _encode(self, chunk):
        return chunk

    def _flush(self):
        return ''


class _ZippedEncoder(_Encoder):

    def __init__(self, compresslevel=None):
        _Encoder.__init__(self)
        if compresslevel is None:
            compresslevel = DEFAULT_COMPRESSLEVEL
        self._zipper = zlib.compressobj(compresslevel, zlib.DEFLATED,
                -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
        self._size = 0
        self._crc = zlib.crc32('') & 0xffffffffL

    def _encode(self, chunk):
        self._size += len(chunk)
        self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
        chunk = self._zipper.compress(chunk)
        if self._offset == 0:
            # Prepend a gzip header: magic, deflate method, empty flags,
            # mtime, extra flags and the "unknown" OS byte
            chunk = '\037\213' + '\010' + chr(0) + \
                    struct.pack('<L', long(time.time())) + \
                    '\002' + '\377' + chunk
        return chunk

    def _flush(self):
        # Emit the remaining deflate data followed by the gzip trailer,
        # i.e. CRC32 and the uncompressed size, both little-endian
        return self._zipper.flush() + \
                struct.pack('<L', self._crc) + \
                struct.pack('<L', self._size & 0xffffffffL)
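

# NOTE: _Decoder and _ZippedDecoder below are a minimal sketch rather than
# verbatim code: they assume only the read_record()/read() interface that
# decode() and _DecodeIterator rely on, the gzip framing produced by
# _ZippedEncoder, and the trailer length check ('Malformed gzipped file');
# every further detail (error strings, buffering strategy) is an assumption.

class _Decoder(object):

    def __init__(self, prefix, stream, limit):
        self._buffer = prefix
        self._stream = stream
        self._limit = limit
        self._eof = False

    def read_record(self):
        # Records are newline-terminated JSON objects
        while True:
            parts = self._buffer.split('\n', 1)
            if len(parts) == 1:
                if self._eof:
                    return None
                self._read(BUFFER_SIZE)
                continue
            result, self._buffer = parts
            if not result:
                continue
            return json.loads(result)

    def read(self, size):
        while len(self._buffer) < size and not self._eof:
            self._read(size - len(self._buffer))
        result = self._buffer[:size]
        self._buffer = self._buffer[size:]
        return result

    def _read(self, size):
        if self._limit is not None:
            size = min(size, self._limit)
        chunk = self._stream.read(size)
        if not chunk:
            self._eof = True
        elif self._limit is not None:
            self._limit -= len(chunk)
        self._decode(chunk)

    def _decode(self, chunk):
        self._buffer += chunk


class _ZippedDecoder(_Decoder):

    def __init__(self, stream, limit):
        _Decoder.__init__(self, '', stream, limit)
        if self._limit is not None:
            self._limit -= 8
        self._unzipper = zlib.decompressobj(-_ZLIB_WBITS)
        self._crc = zlib.crc32('') & 0xffffffffL
        self._size = 0
        # The two magic bytes were already consumed by decode(), skip the
        # rest of the gzip header written by _ZippedEncoder
        enforce(stream.read(1) == '\010', http.BadRequest,
                'Unknown compression method')
        enforce(stream.read(1) == chr(0), http.BadRequest,
                'Gzip flags should be empty')
        stream.read(6)  # mtime, extra flags and OS byte

    def _decode(self, chunk):
        if chunk:
            chunk = self._unzipper.decompress(chunk)
            self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
            self._size += len(chunk)
            self._buffer += chunk
        elif self._eof:
            # The gzip trailer (CRC32 and uncompressed size) ends up in the
            # decompressor's unused data once the deflate stream is over
            enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest,
                    'Malformed gzipped file')
            crc = struct.unpack('<L', self._unzipper.unused_data[:4])[0]
            enforce(crc == self._crc, http.BadRequest, 'CRC mismatch')
            size = struct.unpack('<L', self._unzipper.unused_data[4:8])[0]
            enforce(size == self._size, http.BadRequest, 'Size mismatch')


# A minimal round-trip sketch, assuming the sugar_network package is
# importable; it uses only plain dict records (no File blobs), so the whole
# exchange happens through an in-memory stream.
if __name__ == '__main__':
    from cStringIO import StringIO

    payload = ''.join(encode([{'resource': 'context'}, {'guid': '1'}],
            header={'from': 'node-a', 'session': '1'}))
    packet = decode(StringIO(payload))
    print packet['from']        # header values are available via __getitem__
    for record in packet:
        print record            # plain records come back as dicts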