author     Aleksey Lim <alsroot@sugarlabs.org>    2014-02-27 12:21:23 (GMT)
committer  Aleksey Lim <alsroot@sugarlabs.org>    2014-02-27 12:21:23 (GMT)
commit     1028755053ef3d8c538138b37e61ece13b9c1a23 (patch)
tree       5d4dc4166e88018433166836677c06e3fcebfe79 /sugar_network
parent     2aed09c3b60188063623eecee4a0f79592a4719e (diff)
Zip sync stream all time  (stats_user.removed)
Diffstat (limited to 'sugar_network')
-rw-r--r--   sugar_network/db/blobs.py            12
-rw-r--r--   sugar_network/db/metadata.py          6
-rw-r--r--   sugar_network/node/sync.py          307
-rw-r--r--   sugar_network/toolkit/__init__.py     3
-rw-r--r--   sugar_network/toolkit/parcel.py     336
-rw-r--r--   sugar_network/toolkit/router.py       4
6 files changed, 355 insertions, 313 deletions
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index da06483..a9d66e0 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -37,8 +37,14 @@ def init(path):
os.makedirs(_root)
-def post(content, mime_type=None, digest_to_assert=None):
- meta = []
+def post(content, mime_type=None, digest_to_assert=None, meta=None):
+ if meta is None:
+ meta = []
+ meta.append(('content-type', mime_type or 'application/octet-stream'))
+ else:
+ meta = meta.items()
+ if mime_type:
+ meta.append(('content-type', mime_type))
@contextmanager
def write_blob():
@@ -70,7 +76,7 @@ def post(content, mime_type=None, digest_to_assert=None):
blob.unlink()
raise http.BadRequest('Digest mismatch')
path = _path(digest)
- meta.append(('content-type', mime_type or 'application/octet-stream'))
+ meta.append(('content-length', str(blob.tell())))
with toolkit.new_file(path + _META_SUFFIX) as f:
for key, value in meta:
f.write('%s: %s\n' % (key, value))
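
A minimal usage sketch of the extended post() signature; the storage root and metadata values below are illustrative, not part of the commit:

    # Hedged sketch: metadata is now supplied by the caller as a dict, and
    # post() appends 'content-length' itself from the written blob.
    from sugar_network.db import blobs

    blobs.init('/tmp/blobs-root')            # hypothetical storage root
    blob = blobs.post('{"answer": 42}',
            meta={'content-type': 'application/json'})
    print blob.digest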
diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py
index 88d644b..9ba5998 100644
--- a/sugar_network/db/metadata.py
+++ b/sugar_network/db/metadata.py
@@ -311,7 +311,11 @@ class Blob(Property):
return ''
if not isinstance(value, dict):
- mime_type = this.request.content_type or self.mime_type
+ mime_type = None
+ if this.request.prop == self.name:
+ mime_type = this.request.content_type
+ if not mime_type:
+ mime_type = self.mime_type
return blobs.post(value, mime_type).digest
digest = this.resource[self.name] if self.name else None
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
deleted file mode 100644
index f5b946c..0000000
--- a/sugar_network/node/sync.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import gzip
-import zlib
-import json
-import logging
-from cStringIO import StringIO
-from types import GeneratorType
-from os.path import exists, join, dirname, basename, splitext
-
-from sugar_network import toolkit
-from sugar_network.toolkit import coroutine, enforce
-
-
-# Filename suffix to use for sneakernet synchronization files
-_SNEAKERNET_SUFFIX = '.sneakernet'
-
-# Leave at least n bytes free in the filesystem while calling `encode_to_file()`
-_SNEAKERNET_RESERVED_SIZE = 1024 * 1024
-
-_logger = logging.getLogger('node.sync')
-
-
-def decode(stream):
- packet = _PacketsIterator(stream)
- while True:
- packet.next()
- if packet.name == 'last':
- break
- yield packet
-
-
-def encode(packets, **header):
- return _encode(None, packets, False, header, _EncodingStatus())
-
-
-def limited_encode(limit, packets, **header):
- return _encode(limit, packets, False, header, _EncodingStatus())
-
-
-def package_decode(stream):
- stream = _GzipStream(stream)
- package_props = json.loads(stream.readline())
-
- for packet in decode(stream):
- packet.props.update(package_props)
- yield packet
-
-
-def package_encode(packets, **header):
- # XXX Only for small amount of data
- # TODO Support real streaming
- buf = StringIO()
- zipfile = gzip.GzipFile(mode='wb', fileobj=buf)
-
- header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX
- json.dump(header, zipfile)
- zipfile.write('\n')
-
- for chunk in _encode(None, packets, False, None, _EncodingStatus()):
- zipfile.write(chunk)
- zipfile.close()
-
- yield buf.getvalue()
-
-
-def sneakernet_decode(root, node=None, session=None):
- for root, __, files in os.walk(root):
- for filename in files:
- if not filename.endswith(_SNEAKERNET_SUFFIX):
- continue
- zipfile = gzip.open(join(root, filename), 'rb')
- try:
- package_props = json.loads(zipfile.readline())
-
- if node is not None and package_props.get('src') == node:
- if package_props.get('session') == session:
- _logger.debug('Skip session %r sneakernet package',
- zipfile.name)
- else:
- _logger.debug('Remove outdated %r sneakernet package',
- zipfile.name)
- os.unlink(zipfile.name)
- continue
-
- for packet in decode(zipfile):
- packet.props.update(package_props)
- yield packet
- finally:
- zipfile.close()
-
-
-def sneakernet_encode(packets, root=None, limit=None, path=None, **header):
- if path is None:
- if not exists(root):
- os.makedirs(root)
- filename = toolkit.uuid() + _SNEAKERNET_SUFFIX
- path = toolkit.unique_filename(root, filename)
- else:
- filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX
- if 'filename' not in header:
- header['filename'] = filename
-
- if limit <= 0:
- stat = os.statvfs(dirname(path))
- limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE
-
- _logger.debug('Creating %r sneakernet package, limit=%s header=%r',
- path, limit, header)
-
- status = _EncodingStatus()
- with file(path, 'wb') as package:
- zipfile = gzip.GzipFile(fileobj=package)
- try:
- json.dump(header, zipfile)
- zipfile.write('\n')
-
- pos = None
- encoder = _encode(limit, packets, True, None, status)
- while True:
- try:
- chunk = encoder.send(pos)
- zipfile.write(chunk)
- pos = zipfile.fileobj.tell()
- coroutine.dispatch()
- except StopIteration:
- break
-
- except Exception:
- _logger.debug('Emergency removing %r package', path)
- package.close()
- os.unlink(path)
- raise
- else:
- zipfile.close()
- package.flush()
- os.fsync(package.fileno())
-
- return not status.aborted
-
-
-class _EncodingStatus(object):
-
- aborted = False
-
-
-def _encode(limit, packets, download_blobs, header, status):
- for packet, props, content in packets:
- if status.aborted:
- break
-
- if props is None:
- props = {}
- if header:
- props.update(header)
- props['packet'] = packet
- pos = (yield json.dumps(props) + '\n') or 0
-
- if content is None:
- continue
-
- content = iter(content)
- try:
- record = next(content)
-
- while True:
- blob = None
- blob_size = 0
- if 'blob' in record:
- blob = record.pop('blob')
- blob_size = record['blob_size']
-
- dump = json.dumps(record) + '\n'
- if not status.aborted and limit is not None and \
- pos + len(dump) + blob_size > limit:
- status.aborted = True
- if not isinstance(content, GeneratorType):
- raise StopIteration()
- record = content.throw(StopIteration())
- continue
- pos = (yield dump) or 0
-
- if blob is not None:
- for chunk in blob:
- pos = (yield chunk) or 0
- blob_size -= len(chunk)
- enforce(blob_size == 0, EOFError,
- 'File size is not the same as declared')
-
- record = next(content)
- except StopIteration:
- pass
-
- yield json.dumps({'packet': 'last'}) + '\n'
-
-
-class _PacketsIterator(object):
-
- def __init__(self, stream):
- if not hasattr(stream, 'readline'):
- stream.readline = lambda: toolkit.readline(stream)
- if hasattr(stream, 'seek'):
- self._seek = stream.seek
- self._stream = stream
- self.props = {}
- self._name = None
- self._shift = True
-
- @property
- def name(self):
- return self._name
-
- def next(self):
- if self._shift:
- for __ in self:
- pass
- if self._name is None:
- raise EOFError()
- self._shift = True
-
- def __repr__(self):
- return '<SyncPacket %r>' % self.props
-
- def __getitem__(self, key):
- return self.props.get(key)
-
- def __iter__(self):
- blob = None
- while True:
- if blob is not None and blob.size_to_read:
- self._seek(blob.size_to_read, 1)
- blob = None
- record = self._stream.readline()
- if not record:
- self._name = None
- raise EOFError()
- record = json.loads(record)
- if 'packet' in record:
- self._name = record['packet'] or ''
- self.props = record
- self._shift = False
- break
- blob_size = record.get('blob_size')
- if blob_size:
- blob = record['blob'] = _Blob(self._stream, blob_size)
- yield record
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- pass
-
- # pylint: disable-msg=E0202
- def _seek(self, distance, where):
- while distance:
- chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE))
- distance -= len(chunk)
-
-
-class _Blob(object):
-
- def __init__(self, stream, size):
- self._stream = stream
- self.size_to_read = size
-
- def read(self, size=toolkit.BUFFER_SIZE):
- chunk = self._stream.read(min(size, self.size_to_read))
- self.size_to_read -= len(chunk)
- return chunk
-
-
-class _GzipStream(object):
-
- def __init__(self, stream):
- self._stream = stream
- self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS)
- self._buffer = bytearray()
-
- def read(self, size):
- while True:
- if size <= len(self._buffer):
- result = self._buffer[:size]
- self._buffer = self._buffer[size:]
- return bytes(result)
- chunk = self._stream.read(size)
- if not chunk:
- result, self._buffer = self._buffer, bytearray()
- return result
- self._buffer += self._zip.decompress(chunk)
-
- def readline(self):
- return toolkit.readline(self)
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index 8acfe27..792267a 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -782,6 +782,9 @@ class _NewFile(object):
def name(self, value):
self.dst_path = value
+ def tell(self):
+ return self._file.file.tell()
+
def close(self):
self._file.close()
if exists(self.name):
diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py
new file mode 100644
index 0000000..457ea07
--- /dev/null
+++ b/sugar_network/toolkit/parcel.py
@@ -0,0 +1,336 @@
+# Copyright (C) 2012-2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import sys
+import zlib
+import time
+import json
+import struct
+import logging
+from types import GeneratorType
+from os.path import dirname, exists, join
+
+from sugar_network import toolkit
+from sugar_network.toolkit.router import File
+from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce
+
+
+_FILENAME_SUFFIX = '.parcel'
+_RESERVED_DISK_SPACE = 1024 * 1024
+
+_ZLIB_WBITS = 15
+_ZLIB_WBITS_SIZE = 32768 # 2 ** 15
+
+_logger = logging.getLogger('parcel')
+
+
+def decode(stream, limit=None):
+ _logger.debug('Decode %r stream limit=%r', stream, limit)
+
+ stream = _UnzipStream(stream, limit)
+ header = stream.read_record()
+
+ packet = _DecodeIterator(stream)
+ while True:
+ packet.next()
+ if packet.name == 'last':
+ break
+ packet.props.update(header)
+ yield packet
+
+
+def encode(packets, limit=None, header=None, compresslevel=6):
+ _logger.debug('Encode %r packets limit=%r header=%r',
+ packets, limit, header)
+
+ ostream = _ZipStream(compresslevel)
+
+ if limit is None:
+ limit = sys.maxint
+ if header is None:
+ header = {}
+ chunk = ostream.write_record(header)
+ if chunk:
+ yield chunk
+
+ for packet, props, content in packets:
+ if props is None:
+ props = {}
+ props['packet'] = packet
+ chunk = ostream.write_record(props)
+ if chunk:
+ yield chunk
+
+ if content is None:
+ continue
+
+ content = iter(content)
+ try:
+ finalizing = False
+ record = next(content)
+ while True:
+ if record is None:
+ finalizing = True
+ record = next(content)
+ continue
+ blob_len = 0
+ if isinstance(record, File) and record.path:
+ blob_len = record.size
+ chunk = ostream.write_record(record,
+ None if finalizing else limit - blob_len)
+ if chunk is None:
+ _logger.debug('Reached the encoding limit')
+ if not isinstance(content, GeneratorType):
+ raise StopIteration()
+ finalizing = True
+ record = content.throw(StopIteration())
+ continue
+ if chunk:
+ yield chunk
+ if blob_len:
+ with file(record.path, 'rb') as blob:
+ while True:
+ chunk = blob.read(BUFFER_SIZE)
+ if not chunk:
+ break
+ blob_len -= len(chunk)
+ if not blob_len:
+ chunk += '\n'
+ chunk = ostream.write(chunk)
+ if chunk:
+ yield chunk
+ enforce(blob_len == 0, EOFError, 'Blob size mismatch')
+ record = next(content)
+ except StopIteration:
+ pass
+
+ chunk = ostream.write_record({'packet': 'last'})
+ if chunk:
+ yield chunk
+ chunk = ostream.flush()
+ if chunk:
+ yield chunk
+
+
+def decode_dir(root, recipient=None, session=None):
+ for root, __, files in os.walk(root):
+ for filename in files:
+ if not filename.endswith(_FILENAME_SUFFIX):
+ continue
+ with file(join(root, filename), 'rb') as parcel:
+ for packet in decode(parcel):
+ if recipient is not None and packet['from'] == recipient:
+ if session and packet['session'] == session:
+ _logger.debug('Skip the same session %r parcel',
+ parcel.name)
+ else:
+ _logger.debug('Remove outdated %r parcel',
+ parcel.name)
+ os.unlink(parcel.name)
+ break
+ yield packet
+
+
+def encode_dir(packets, root=None, limit=None, path=None, sender=None,
+ header=None):
+ if path is None:
+ if not exists(root):
+ os.makedirs(root)
+ path = toolkit.unique_filename(root, toolkit.uuid() + _FILENAME_SUFFIX)
+ if limit <= 0:
+ stat = os.statvfs(dirname(path))
+ limit = stat.f_bfree * stat.f_frsize - _RESERVED_DISK_SPACE
+ if header is None:
+ header = {}
+ if sender is not None:
+ header['from'] = sender
+
+ _logger.debug('Creating %r parcel limit=%s header=%r', path, limit, header)
+
+ with toolkit.NamedTemporaryFile(dir=dirname(path)) as parcel:
+ for chunk in encode(packets, limit, header):
+ parcel.write(chunk)
+ coroutine.dispatch()
+ parcel.flush()
+ os.fsync(parcel.fileno())
+ os.rename(parcel.name, path)
+
+
+class _DecodeIterator(object):
+
+ def __init__(self, stream):
+ self._stream = stream
+ self.props = {}
+ self._name = None
+ self._shift = True
+
+ @property
+ def name(self):
+ return self._name
+
+ def next(self):
+ if self._shift:
+ for __ in self:
+ pass
+ if self._name is None:
+ raise EOFError()
+ self._shift = True
+
+ def __repr__(self):
+ return '<Packet %r>' % self.props
+
+ def __getitem__(self, key):
+ return self.props.get(key)
+
+ def __iter__(self):
+ while True:
+ record = self._stream.read_record()
+ if record is None:
+ self._name = None
+ raise EOFError()
+ if 'packet' in record:
+ self._name = record['packet'] or ''
+ self.props = record
+ self._shift = False
+ break
+ blob_len = record.get('content-length')
+ if blob_len is None:
+ yield record
+ continue
+ blob_len = int(blob_len)
+ with toolkit.NamedTemporaryFile() as blob:
+ while blob_len:
+ chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
+ enforce(chunk, 'Blob size mismatch')
+ blob.write(chunk)
+ blob_len -= len(chunk)
+ blob.flush()
+ yield File(blob.name, meta=record)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+
+class _ZipStream(object):
+
+ def __init__(self, compresslevel=6):
+ self._zipper = zlib.compressobj(compresslevel,
+ zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
+ self._offset = 0
+ self._size = 0
+ self._crc = zlib.crc32('') & 0xffffffffL
+
+ def write_record(self, record, limit=None):
+ chunk = json.dumps(record) + '\n'
+ if limit is not None and self._offset + len(chunk) > limit:
+ return None
+ return self.write(chunk)
+
+ def write(self, chunk):
+ self._size += len(chunk)
+ self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
+ chunk = self._zipper.compress(chunk)
+
+ if self._offset == 0:
+ chunk = '\037\213' + '\010' + chr(0) + \
+ struct.pack('<L', long(time.time())) + \
+ '\002' + '\377' + \
+ chunk
+ self._offset = _ZLIB_WBITS_SIZE
+ if chunk:
+ self._offset += len(chunk)
+
+ return chunk
+
+ def flush(self):
+ chunk = self._zipper.flush() + \
+ struct.pack('<L', self._crc) + \
+ struct.pack('<L', self._size & 0xffffffffL)
+ self._offset += len(chunk)
+ return chunk
+
+
+class _UnzipStream(object):
+
+ def __init__(self, stream, limit):
+ self._stream = stream
+ self._limit = limit
+ self._unzipper = zlib.decompressobj(-_ZLIB_WBITS)
+ self._crc = zlib.crc32('') & 0xffffffffL
+ self._size = 0
+ self._buffer = ''
+
+ if self._limit is not None:
+ self._limit -= 10
+ magic = stream.read(2)
+ enforce(magic == '\037\213', http.BadRequest,
+ 'Not a gzipped file')
+ enforce(ord(stream.read(1)) == 8, http.BadRequest,
+ 'Unknown compression method')
+ enforce(ord(stream.read(1)) == 0, http.BadRequest,
+ 'Gzip flags should be empty')
+ stream.read(6) # Ignore the rest of header
+
+ def read_record(self):
+ while True:
+ parts = self._buffer.split('\n', 1)
+ if len(parts) == 1:
+ if self._read(BUFFER_SIZE):
+ continue
+ return None
+ result, self._buffer = parts
+ if not result:
+ continue
+ return json.loads(result)
+
+ def read(self, size):
+ while len(self._buffer) == 0 and self._read(size):
+ pass
+ size = min(size, len(self._buffer))
+ result = self._buffer[:size]
+ self._buffer = self._buffer[size:]
+ return result
+
+ def _read(self, size):
+ if self._limit is not None:
+ size = min(size, self._limit)
+ chunk = self._stream.read(size)
+
+ if chunk:
+ if self._limit is not None:
+ self._limit -= len(chunk)
+ self._add_to_buffer(self._unzipper.decompress(chunk))
+ return True
+
+ enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest,
+ 'Malformed gzipped file')
+ crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0]
+ enforce(crc == self._crc, http.BadRequest, 'CRC check failed')
+ size = struct.unpack('<I', self._unzipper.unused_data[4:8])[0]
+ enforce(size == self._size, http.BadRequest, 'Incorrect length')
+
+ return self._add_to_buffer(self._unzipper.flush())
+
+ def _add_to_buffer(self, chunk):
+ if not chunk:
+ return False
+ self._buffer += chunk
+ self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL
+ self._size += len(chunk)
+ return True
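
A hedged round-trip sketch of the new parcel API; the packet name, header fields, and record payload below are illustrative only:

    # Sketch: encode one 'push' packet into a gzip stream, then decode it back.
    from cStringIO import StringIO
    from sugar_network.toolkit import parcel

    packets = [('push', {'to': 'principal'}, [{'guid': 'doc1', 'title': 'hello'}])]
    data = ''.join(parcel.encode(packets, header={'from': 'node1'}))

    for packet in parcel.decode(StringIO(data)):
        print packet.name, packet['from'], packet['to']    # push node1 principal
        for record in packet:
            print record                                   # the 'doc1' record

    # The framing written by _ZipStream is plain gzip (header, deflate body,
    # CRC32/size trailer), so the stream should also open with the stdlib:
    import gzip
    print gzip.GzipFile(fileobj=StringIO(data)).read()     # newline-separated JSON records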
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 00e5d8a..4206121 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -381,7 +381,7 @@ class Response(CaseInsensitiveDict):
@content_length.setter
def content_length(self, value):
- self.set('content-length', value)
+ self.set('content-length', str(value))
@property
def content_type(self):
@@ -430,7 +430,7 @@ class File(CaseInsensitiveDict):
self.path = path
self.digest = File.Digest(digest) if digest else None
if meta is not None:
- for key, value in meta:
+ for key, value in meta.items() if isinstance(meta, dict) else meta:
self[key] = value
self._stat = None
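
With the router.py change above, File metadata can be passed either as a dict or as an iterable of key/value pairs; a small hedged sketch, assuming the (path, digest, meta) constructor signature stays as shown:

    # Sketch: both meta forms populate the same case-insensitive mapping.
    from sugar_network.toolkit.router import File

    f1 = File('/tmp/blob', meta={'content-type': 'text/plain'})
    f2 = File('/tmp/blob', meta=[('content-type', 'text/plain')])
    assert f1['content-type'] == f2['content-type']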