Diffstat (limited to 'sugar_network/node/sync.py')
-rw-r--r--  sugar_network/node/sync.py  | 307 ----------
1 file changed, 0 insertions(+), 307 deletions(-)
diff --git a/sugar_network/node/sync.py b/sugar_network/node/sync.py
deleted file mode 100644
index f5b946c..0000000
--- a/sugar_network/node/sync.py
+++ /dev/null
@@ -1,307 +0,0 @@
-# Copyright (C) 2012-2013 Aleksey Lim
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-import os
-import gzip
-import zlib
-import json
-import logging
-from cStringIO import StringIO
-from types import GeneratorType
-from os.path import exists, join, dirname, basename, splitext
-
-from sugar_network import toolkit
-from sugar_network.toolkit import coroutine, enforce
-
-
-# Filename suffix to use for sneakernet synchronization files
-_SNEAKERNET_SUFFIX = '.sneakernet'
-
-# Leave at least this many bytes free in the filesystem while writing
-# sneakernet packages
-_SNEAKERNET_RESERVED_SIZE = 1024 * 1024
-
-_logger = logging.getLogger('node.sync')
-
-
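-# Decode a synchronization stream into packets; iteration stops once the
-# terminating 'last' packet is reached. A minimal usage sketch (`handle` is
-# a hypothetical consumer):
-#
-#     for packet in decode(stream):
-#         for record in packet:
-#             handle(record)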
-def decode(stream):
-    packet = _PacketsIterator(stream)
-    while True:
-        packet.next()
-        if packet.name == 'last':
-            break
-        yield packet
-
-
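-# Encode packets as a generator of JSON lines and raw blob chunks, with no
-# size limit; see _encode() for the wire format.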
-def encode(packets, **header):
-    return _encode(None, packets, False, header, _EncodingStatus())
-
-
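-# Like encode(), but abort once the output would exceed `limit` bytes.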
-def limited_encode(limit, packets, **header):
-    return _encode(limit, packets, False, header, _EncodingStatus())
-
-
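-# Decode a gzip-compressed package; the first line is a JSON header whose
-# properties are merged into every decoded packet.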
-def package_decode(stream):
-    stream = _GzipStream(stream)
-    package_props = json.loads(stream.readline())
-
-    for packet in decode(stream):
-        packet.props.update(package_props)
-        yield packet
-
-
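-# Encode packets into a single gzip-compressed package buffered in memory.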
-def package_encode(packets, **header):
-    # XXX Only for small amounts of data
-    # TODO Support real streaming
-    buf = StringIO()
-    zipfile = gzip.GzipFile(mode='wb', fileobj=buf)
-
-    header['filename'] = toolkit.uuid() + _SNEAKERNET_SUFFIX
-    json.dump(header, zipfile)
-    zipfile.write('\n')
-
-    for chunk in _encode(None, packets, False, None, _EncodingStatus()):
-        zipfile.write(chunk)
-    zipfile.close()
-
-    yield buf.getvalue()
-
-
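-# Walk `root` for *.sneakernet packages and decode them, skipping packages
-# from `node` itself for the current session and deleting its stale ones.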
-def sneakernet_decode(root, node=None, session=None):
-    for root, __, files in os.walk(root):
-        for filename in files:
-            if not filename.endswith(_SNEAKERNET_SUFFIX):
-                continue
-            zipfile = gzip.open(join(root, filename), 'rb')
-            try:
-                package_props = json.loads(zipfile.readline())
-
-                if node is not None and package_props.get('src') == node:
-                    if package_props.get('session') == session:
-                        _logger.debug('Skip session %r sneakernet package',
-                                zipfile.name)
-                    else:
-                        _logger.debug('Remove outdated %r sneakernet package',
-                                zipfile.name)
-                        os.unlink(zipfile.name)
-                    continue
-
-                for packet in decode(zipfile):
-                    packet.props.update(package_props)
-                    yield packet
-            finally:
-                zipfile.close()
-
-
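-# Write packets into a *.sneakernet package on disk. When no positive limit
-# is given, it is derived from the free filesystem space minus the reserved
-# size. Returns True if everything fit, False if encoding was aborted.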
-def sneakernet_encode(packets, root=None, limit=None, path=None, **header):
-    if path is None:
-        if not exists(root):
-            os.makedirs(root)
-        filename = toolkit.uuid() + _SNEAKERNET_SUFFIX
-        path = toolkit.unique_filename(root, filename)
-    else:
-        filename = splitext(basename(path))[0] + _SNEAKERNET_SUFFIX
-    if 'filename' not in header:
-        header['filename'] = filename
-
-    if limit <= 0:
-        stat = os.statvfs(dirname(path))
-        limit = stat.f_bfree * stat.f_frsize - _SNEAKERNET_RESERVED_SIZE
-
-    _logger.debug('Creating %r sneakernet package, limit=%s header=%r',
-            path, limit, header)
-
-    status = _EncodingStatus()
-    with file(path, 'wb') as package:
-        zipfile = gzip.GzipFile(fileobj=package)
-        try:
-            json.dump(header, zipfile)
-            zipfile.write('\n')
-
-            pos = None
-            encoder = _encode(limit, packets, True, None, status)
-            while True:
-                try:
-                    chunk = encoder.send(pos)
-                    zipfile.write(chunk)
-                    pos = zipfile.fileobj.tell()
-                    coroutine.dispatch()
-                except StopIteration:
-                    break
-
-        except Exception:
-            _logger.debug('Emergency removing %r package', path)
-            package.close()
-            os.unlink(path)
-            raise
-        else:
-            zipfile.close()
-            package.flush()
-            os.fsync(package.fileno())
-
-    return not status.aborted
-
-
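-# Mutable flag shared between the _encode() generator and its callers.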
-class _EncodingStatus(object):
-
-    aborted = False
-
-
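-# Generator producing the wire format: one JSON line per packet header, one
-# JSON line per record, then raw blob bytes when a record carries a blob.
-# Callers may send() back the current output position so `limit` can be
-# enforced.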
-def _encode(limit, packets, download_blobs, header, status):
-    for packet, props, content in packets:
-        if status.aborted:
-            break
-
-        if props is None:
-            props = {}
-        if header:
-            props.update(header)
-        props['packet'] = packet
-        pos = (yield json.dumps(props) + '\n') or 0
-
-        if content is None:
-            continue
-
-        content = iter(content)
-        try:
-            record = next(content)
-
-            while True:
-                blob = None
-                blob_size = 0
-                if 'blob' in record:
-                    blob = record.pop('blob')
-                    blob_size = record['blob_size']
-
-                dump = json.dumps(record) + '\n'
-                if not status.aborted and limit is not None and \
-                        pos + len(dump) + blob_size > limit:
-                    status.aborted = True
-                    if not isinstance(content, GeneratorType):
-                        raise StopIteration()
-                    record = content.throw(StopIteration())
-                    continue
-                pos = (yield dump) or 0
-
-                if blob is not None:
-                    for chunk in blob:
-                        pos = (yield chunk) or 0
-                        blob_size -= len(chunk)
-                    enforce(blob_size == 0, EOFError,
-                            'File size is not the same as declared')
-
-                record = next(content)
-        except StopIteration:
-            pass
-
-    yield json.dumps({'packet': 'last'}) + '\n'
-
-
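-# Iterator over packets in a decoded stream; iterating a packet yields its
-# records, skipping unread blob bytes before advancing.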
-class _PacketsIterator(object):
-
-    def __init__(self, stream):
-        if not hasattr(stream, 'readline'):
-            stream.readline = lambda: toolkit.readline(stream)
-        if hasattr(stream, 'seek'):
-            self._seek = stream.seek
-        self._stream = stream
-        self.props = {}
-        self._name = None
-        self._shift = True
-
-    @property
-    def name(self):
-        return self._name
-
-    def next(self):
-        if self._shift:
-            for __ in self:
-                pass
-        if self._name is None:
-            raise EOFError()
-        self._shift = True
-
-    def __repr__(self):
-        return '<SyncPacket %r>' % self.props
-
-    def __getitem__(self, key):
-        return self.props.get(key)
-
-    def __iter__(self):
-        blob = None
-        while True:
-            if blob is not None and blob.size_to_read:
-                self._seek(blob.size_to_read, 1)
-                blob = None
-            record = self._stream.readline()
-            if not record:
-                self._name = None
-                raise EOFError()
-            record = json.loads(record)
-            if 'packet' in record:
-                self._name = record['packet'] or ''
-                self.props = record
-                self._shift = False
-                break
-            blob_size = record.get('blob_size')
-            if blob_size:
-                blob = record['blob'] = _Blob(self._stream, blob_size)
-            yield record
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        pass
-
-    # pylint: disable-msg=E0202
-    def _seek(self, distance, where):
-        while distance:
-            chunk = self._stream.read(min(distance, toolkit.BUFFER_SIZE))
-            distance -= len(chunk)
-
-
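-# File-like wrapper exposing exactly `size` bytes of the underlying stream.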
-class _Blob(object):
-
-    def __init__(self, stream, size):
-        self._stream = stream
-        self.size_to_read = size
-
-    def read(self, size=toolkit.BUFFER_SIZE):
-        chunk = self._stream.read(min(size, self.size_to_read))
-        self.size_to_read -= len(chunk)
-        return chunk
-
-
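-# Minimal streaming gunzip wrapper: read() inflates on demand and buffers
-# leftover decompressed bytes between calls.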
-class _GzipStream(object):
-
-    def __init__(self, stream):
-        self._stream = stream
-        self._zip = zlib.decompressobj(16 + zlib.MAX_WBITS)
-        self._buffer = bytearray()
-
-    def read(self, size):
-        while True:
-            if size <= len(self._buffer):
-                result = self._buffer[:size]
-                self._buffer = self._buffer[size:]
-                return bytes(result)
-            chunk = self._stream.read(size)
-            if not chunk:
-                result, self._buffer = self._buffer, bytearray()
-                return result
-            self._buffer += self._zip.decompress(chunk)
-
-    def readline(self):
-        return toolkit.readline(self)