Diffstat (limited to 'sugar_network/toolkit/parcel.py')
-rw-r--r--  sugar_network/toolkit/parcel.py | 336
1 file changed, 336 insertions(+), 0 deletions(-)
diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py
new file mode 100644
index 0000000..457ea07
--- /dev/null
+++ b/sugar_network/toolkit/parcel.py
@@ -0,0 +1,336 @@
+# Copyright (C) 2012-2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
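+"""Parcels: gzip streams of newline-separated JSON records.
+
+A packet starts with a record carrying a 'packet' key; a record with a
+'content-length' key is followed by that many bytes of raw blob data.
+A minimal round-trip sketch (the packet name and properties below are
+illustrative only, not part of any fixed schema):
+
+    from StringIO import StringIO
+
+    data = ''.join(encode([('push', {'seq': 1}, [{'op': 'noop'}])]))
+    for packet in decode(StringIO(data)):
+        for record in packet:
+            print packet.name, record
+"""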
+import os
+import sys
+import zlib
+import time
+import json
+import struct
+import logging
+from types import GeneratorType
+from os.path import dirname, exists, join
+
+from sugar_network import toolkit
+from sugar_network.toolkit.router import File
+from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce
+
+
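+# Filename suffix for parcels handled by decode_dir() and encode_dir().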
+_FILENAME_SUFFIX = '.parcel'
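+# Free disk space to leave untouched when sizing a parcel in encode_dir().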
+_RESERVED_DISK_SPACE = 1024 * 1024
+
+_ZLIB_WBITS = 15
+_ZLIB_WBITS_SIZE = 32768 # 2 ** 15
+
+_logger = logging.getLogger('parcel')
+
+
+def decode(stream, limit=None):
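+    """Decode a parcel stream, yielding one iterator per packet.
+
+    The parcel header is merged into every packet's properties; decoding
+    stops at the terminating 'last' packet.
+    """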
+    _logger.debug('Decode %r stream limit=%r', stream, limit)
+
+    stream = _UnzipStream(stream, limit)
+    header = stream.read_record()
+
+    packet = _DecodeIterator(stream)
+    while True:
+        packet.next()
+        if packet.name == 'last':
+            break
+        packet.props.update(header)
+        yield packet
+
+
+def encode(packets, limit=None, header=None, compresslevel=6):
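+    """Generate compressed chunks of a parcel for the given packets.
+
+    `packets` is an iterable of (name, props, content) tuples; `content`
+    may mix JSON-serializable records with File objects, whose payloads
+    are streamed right after their records.
+    """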
+    _logger.debug('Encode %r packets limit=%r header=%r',
+            packets, limit, header)
+
+    ostream = _ZipStream(compresslevel)
+
+    if limit is None:
+        limit = sys.maxint
+    if header is None:
+        header = {}
+    chunk = ostream.write_record(header)
+    if chunk:
+        yield chunk
+
+    for packet, props, content in packets:
+        if props is None:
+            props = {}
+        props['packet'] = packet
+        chunk = ostream.write_record(props)
+        if chunk:
+            yield chunk
+
+        if content is None:
+            continue
+
+        content = iter(content)
+        try:
+            finalizing = False
+            record = next(content)
+            while True:
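+                # A bare None in the content switches to finalizing mode:
+                # the records that follow bypass the size limit.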
+                if record is None:
+                    finalizing = True
+                    record = next(content)
+                    continue
+                blob_len = 0
+                if isinstance(record, File) and record.path:
+                    blob_len = record.size
+                chunk = ostream.write_record(record,
+                        None if finalizing else limit - blob_len)
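+                # The record (plus its blob) does not fit into the limit;
+                # a generator may still yield trailing records in response
+                # to the StopIteration thrown into it.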
+                if chunk is None:
+                    _logger.debug('Reached the encoding limit')
+                    if not isinstance(content, GeneratorType):
+                        raise StopIteration()
+                    finalizing = True
+                    record = content.throw(StopIteration())
+                    continue
+                if chunk:
+                    yield chunk
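+                # Stream the blob payload right after its record, adding a
+                # trailing newline so the next record starts a fresh line.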
+                if blob_len:
+                    with file(record.path, 'rb') as blob:
+                        while True:
+                            chunk = blob.read(BUFFER_SIZE)
+                            if not chunk:
+                                break
+                            blob_len -= len(chunk)
+                            if not blob_len:
+                                chunk += '\n'
+                            chunk = ostream.write(chunk)
+                            if chunk:
+                                yield chunk
+                    enforce(blob_len == 0, EOFError, 'Blob size mismatch')
+                record = next(content)
+        except StopIteration:
+            pass
+
+    chunk = ostream.write_record({'packet': 'last'})
+    if chunk:
+        yield chunk
+    chunk = ostream.flush()
+    if chunk:
+        yield chunk
+
+
+def decode_dir(root, recipient=None, session=None):
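+    """Decode every '*.parcel' file under `root`, yielding its packets.
+
+    Packets that originate from `recipient` are not yielded; parcels from
+    a session other than `session` are removed as outdated.
+    """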
+    for root, __, files in os.walk(root):
+        for filename in files:
+            if not filename.endswith(_FILENAME_SUFFIX):
+                continue
+            with file(join(root, filename), 'rb') as parcel:
+                for packet in decode(parcel):
+                    if recipient is not None and packet['from'] == recipient:
+                        if session and packet['session'] == session:
+                            _logger.debug('Skip the same session %r parcel',
+                                    parcel.name)
+                        else:
+                            _logger.debug('Remove outdated %r parcel',
+                                    parcel.name)
+                            os.unlink(parcel.name)
+                        break
+                    yield packet
+
+
+def encode_dir(packets, root=None, limit=None, path=None, sender=None,
+        header=None):
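+    """Encode packets into a parcel file under `root` (or at `path`).
+
+    The parcel is written to a temporary file and renamed into place
+    only after it is complete and fsynced.
+    """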
+    if path is None:
+        if not exists(root):
+            os.makedirs(root)
+        path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX)
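+    # With no explicit limit, fall back to the free space on the target
+    # filesystem, minus the reserved amount.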
+    if limit <= 0:
+        stat = os.statvfs(dirname(path))
+        limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE
+    if header is None:
+        header = {}
+    if sender is not None:
+        header['from'] = sender
+
+    _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header)
+
+    with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel:
+        for chunk in encode(packets, limit, header):
+            parcel.write(chunk)
+            coroutine.dispatch()
+        parcel.flush()
+        os.fsync(parcel.fileno())
+        os.rename(parcel.name, path)
+
+
+class _DecodeIterator(object):
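+    """Walk parcel packets; iterating yields the current packet's records."""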
+
+    def __init__(self, stream):
+        self._stream = stream
+        self.props = {}
+        self._name = None
+        self._shift = True
+
+    @property
+    def name(self):
+        return self._name
+
+    def next(self):
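+        """Advance to the next packet, draining unread records first."""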
+        if self._shift:
+            for __ in self:
+                pass
+        if self._name is None:
+            raise EOFError()
+        self._shift = True
+
+    def __repr__(self):
+        return '<Packet %r>' % self.props
+
+    def __getitem__(self, key):
+        return self.props.get(key)
+
+    def __iter__(self):
+        while True:
+            record = self._stream.read_record()
+            if record is None:
+                self._name = None
+                raise EOFError()
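+            # A record carrying a 'packet' key opens the next packet;
+            # any other record belongs to the packet being iterated.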
+            if 'packet' in record:
+                self._name = record['packet'] or ''
+                self.props = record
+                self._shift = False
+                break
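+            # A 'content-length' record is followed by that many bytes of
+            # raw blob data; spool them into a temporary file.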
+            blob_len = record.get('content-length')
+            if blob_len is None:
+                yield record
+                continue
+            blob_len = int(blob_len)
+            with toolkit.NamedTemporaryFile() as blob:
+                while blob_len:
+                    chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
+                    enforce(chunk, 'Blob size mismatch')
+                    blob.write(chunk)
+                    blob_len -= len(chunk)
+                blob.flush()
+                yield File(blob.name, meta=record)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        pass
+
+
+class _ZipStream(object):
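+    """gzip-framed compressor tracking CRC, size, and output offset."""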
+
+    def __init__(self, compresslevel=6):
+        self._zipper = zlib.compressobj(compresslevel,
+                zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
+        self._offset = 0
+        self._size = 0
+        self._crc = zlib.crc32('') & 0xffffffffL
+
+    def write_record(self, record, limit=None):
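+        """Write a JSON record; return None if `limit` would be exceeded."""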
+        chunk = json.dumps(record) + '\n'
+        if limit is not None and self._offset + len(chunk) > limit:
+            return None
+        return self.write(chunk)
+
+    def write(self, chunk):
+        self._size += len(chunk)
+        self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
+        chunk = self._zipper.compress(chunk)
+
+        if self._offset == 0:
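+            # First write: prepend a 10-byte gzip header (magic, deflate
+            # method, no flags, mtime, XFL=2, OS=255 for "unknown").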
+            chunk = '\037\213' + '\010' + chr(0) + \
+                    struct.pack('<L', long(time.time())) + \
+                    '\002' + '\377' + \
+                    chunk
+            self._offset = _ZLIB_WBITS_SIZE
+        if chunk:
+            self._offset += len(chunk)
+
+        return chunk
+
+    def flush(self):
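+        """Flush the compressor and append the gzip trailer (CRC, size)."""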
+        chunk = self._zipper.flush() + \
+                struct.pack('<L', self._crc) + \
+                struct.pack('<L', self._size & 0xffffffffL)
+        self._offset += len(chunk)
+        return chunk
+
+
+class _UnzipStream(object):
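+    """Streaming gzip reader that verifies the trailer's CRC and size."""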
+
+    def __init__(self, stream, limit):
+        self._stream = stream
+        self._limit = limit
+        self._unzipper = zlib.decompressobj(-_ZLIB_WBITS)
+        self._crc = zlib.crc32('') & 0xffffffffL
+        self._size = 0
+        self._buffer = ''
+
+        if self._limit is not None:
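+            # Account for the 10 gzip header bytes consumed right below.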
+            self._limit -= 10
+        magic = stream.read(2)
+        enforce(magic == '\037\213', http.BadRequest,
+                'Not a gzipped file')
+        enforce(ord(stream.read(1)) == 8, http.BadRequest,
+                'Unknown compression method')
+        enforce(ord(stream.read(1)) == 0, http.BadRequest,
+                'Gzip flags should be empty')
+        stream.read(6)  # Ignore the rest of the header
+
+    def read_record(self):
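+        """Return the next newline-terminated JSON record, or None at EOF."""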
+        while True:
+            parts = self._buffer.split('\n', 1)
+            if len(parts) == 1:
+                if self._read(BUFFER_SIZE):
+                    continue
+                return None
+            result, self._buffer = parts
+            if not result:
+                continue
+            return json.loads(result)
+
+    def read(self, size):
+        while len(self._buffer) == 0 and self._read(size):
+            pass
+        size = min(size, len(self._buffer))
+        result = self._buffer[:size]
+        self._buffer = self._buffer[size:]
+        return result
+
+    def _read(self, size):
+        if self._limit is not None:
+            size = min(size, self._limit)
+        chunk = self._stream.read(size)
+
+        if chunk:
+            if self._limit is not None:
+                self._limit -= len(chunk)
+            self._add_to_buffer(self._unzipper.decompress(chunk))
+            return True
+
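+        # End of stream: the decompressor's unused data holds the 8-byte
+        # gzip trailer; verify its CRC32 and uncompressed size.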
+        enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest,
+                'Malformed gzipped file')
+        crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0]
+        enforce(crc == self._crc, http.BadRequest, 'CRC check failed')
+        size = struct.unpack('<I', self._unzipper.unused_data[4:8])[0]
+        enforce(size == self._size, http.BadRequest, 'Incorrect length')
+
+        return self._add_to_buffer(self._unzipper.flush())
+
+    def _add_to_buffer(self, chunk):
+        if not chunk:
+            return False
+        self._buffer += chunk
+        self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
+        self._size += len(chunk)
+        return True