diff options
Diffstat (limited to 'sugar_network/resources/volume.py')
-rw-r--r-- | sugar_network/resources/volume.py | 76 |
1 files changed, 54 insertions, 22 deletions
diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py index bf4fd70..52b8857 100644 --- a/sugar_network/resources/volume.py +++ b/sugar_network/resources/volume.py @@ -15,6 +15,7 @@ import json import logging +import tempfile from os.path import join import active_document as ad @@ -22,6 +23,8 @@ from active_document import directory as ad_directory from sugar_network import local from sugar_network.toolkit.sneakernet import DiskFull from sugar_network.toolkit.collection import Sequence +from sugar_network.toolkit import http +from active_toolkit.sockets import BUFFER_SIZE from active_toolkit import coroutine @@ -75,6 +78,7 @@ class Volume(ad.SingleVolume): if document_classes is None: document_classes = Volume.RESOURCES ad.SingleVolume.__init__(self, root, document_classes, lazy_open) + self._downloader = None def notify(self, event): if event['event'] == 'update' and 'props' in event and \ @@ -87,9 +91,11 @@ class Volume(ad.SingleVolume): def merge(self, record, increment_seqno=True): coroutine.dispatch() if record.get('content_type') == 'blob': - record['diff'] = record['blob'] - return self[record['document']].merge(increment_seqno=increment_seqno, - **record) + diff = record['blob'] + else: + diff = record['diff'] + return self[record['document']].merge(record['guid'], diff, + increment_seqno=increment_seqno) def diff(self, in_seq, out_packet): # Since `in_seq` will be changed in `patch()`, original sequence @@ -103,27 +109,30 @@ class Volume(ad.SingleVolume): directory.commit() def patch(): - for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK): + for guid, seqno, diff in \ + directory.diff(orig_seq, limit=_DIFF_CHUNK): coroutine.dispatch() - seqno = None - if 'seqno' in meta: - seqno = meta.pop('seqno') - - if hasattr(data, 'fileno'): - arcname = join(document, 'blobs', meta['guid'], - meta['prop']) - out_packet.push(data, arcname=arcname, - cmd='sn_push', document=document, **meta) - else: - meta['diff'] = data - yield meta - - # Process `seqno` only after processing yield'ed data - if seqno: - # Update `in_seq`, it might be reused by caller - in_seq.exclude(seqno, seqno) - push_seq.include(seqno, seqno) + for prop, value in diff.items(): + if 'path' in value: + data = file(value.pop('path'), 'rb') + elif 'url' in value: + data = self._download_blob(value.pop('url')) + else: + continue + del diff[prop] + arcname = join(document, 'blobs', guid, prop) + out_packet.push(data, arcname=arcname, cmd='sn_push', + document=document, guid=guid, **value) + + if not diff: + continue + + yield {'guid': guid, 'diff': diff} + + # Update `in_seq`, it might be reused by caller + in_seq.exclude(seqno, seqno) + push_seq.include(seqno, seqno) try: out_packet.push(patch(), arcname=join(document, 'diff'), @@ -141,6 +150,29 @@ class Volume(ad.SingleVolume): orig_seq.floor(push_seq.last) out_packet.push(force=True, cmd='sn_commit', sequence=orig_seq) + def _download_blob(self, url): + _logger.debug('Download %r blob', url) + + if self._downloader is None: + self._downloader = http.Client() + + response = self._downloader.request('GET', url, allow_redirects=True) + content_length = response.headers.get('Content-Length') + content_length = int(content_length) if content_length else 0 + + ostream = tempfile.NamedTemporaryFile() + try: + chunk_size = min(content_length, BUFFER_SIZE) + # pylint: disable-msg=E1103 + for chunk in response.iter_content(chunk_size=chunk_size): + ostream.write(chunk) + except Exception: + ostream.close() + raise + + ostream.seek(0) + return ostream + class Commands(object): |