From d14a63c2d462e98f18eaee7cb8f52a3baa30851d Mon Sep 17 00:00:00 2001 From: Aleksey Lim Date: Mon, 01 Oct 2012 04:25:33 +0000 Subject: Reflect on ad.diff() changes in treating blobs --- diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py index bf4fd70..52b8857 100644 --- a/sugar_network/resources/volume.py +++ b/sugar_network/resources/volume.py @@ -15,6 +15,7 @@ import json import logging +import tempfile from os.path import join import active_document as ad @@ -22,6 +23,8 @@ from active_document import directory as ad_directory from sugar_network import local from sugar_network.toolkit.sneakernet import DiskFull from sugar_network.toolkit.collection import Sequence +from sugar_network.toolkit import http +from active_toolkit.sockets import BUFFER_SIZE from active_toolkit import coroutine @@ -75,6 +78,7 @@ class Volume(ad.SingleVolume): if document_classes is None: document_classes = Volume.RESOURCES ad.SingleVolume.__init__(self, root, document_classes, lazy_open) + self._downloader = None def notify(self, event): if event['event'] == 'update' and 'props' in event and \ @@ -87,9 +91,11 @@ class Volume(ad.SingleVolume): def merge(self, record, increment_seqno=True): coroutine.dispatch() if record.get('content_type') == 'blob': - record['diff'] = record['blob'] - return self[record['document']].merge(increment_seqno=increment_seqno, - **record) + diff = record['blob'] + else: + diff = record['diff'] + return self[record['document']].merge(record['guid'], diff, + increment_seqno=increment_seqno) def diff(self, in_seq, out_packet): # Since `in_seq` will be changed in `patch()`, original sequence @@ -103,27 +109,30 @@ class Volume(ad.SingleVolume): directory.commit() def patch(): - for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK): + for guid, seqno, diff in \ + directory.diff(orig_seq, limit=_DIFF_CHUNK): coroutine.dispatch() - seqno = None - if 'seqno' in meta: - seqno = meta.pop('seqno') - - if hasattr(data, 'fileno'): - arcname = join(document, 'blobs', meta['guid'], - meta['prop']) - out_packet.push(data, arcname=arcname, - cmd='sn_push', document=document, **meta) - else: - meta['diff'] = data - yield meta - - # Process `seqno` only after processing yield'ed data - if seqno: - # Update `in_seq`, it might be reused by caller - in_seq.exclude(seqno, seqno) - push_seq.include(seqno, seqno) + for prop, value in diff.items(): + if 'path' in value: + data = file(value.pop('path'), 'rb') + elif 'url' in value: + data = self._download_blob(value.pop('url')) + else: + continue + del diff[prop] + arcname = join(document, 'blobs', guid, prop) + out_packet.push(data, arcname=arcname, cmd='sn_push', + document=document, guid=guid, **value) + + if not diff: + continue + + yield {'guid': guid, 'diff': diff} + + # Update `in_seq`, it might be reused by caller + in_seq.exclude(seqno, seqno) + push_seq.include(seqno, seqno) try: out_packet.push(patch(), arcname=join(document, 'diff'), @@ -141,6 +150,29 @@ class Volume(ad.SingleVolume): orig_seq.floor(push_seq.last) out_packet.push(force=True, cmd='sn_commit', sequence=orig_seq) + def _download_blob(self, url): + _logger.debug('Download %r blob', url) + + if self._downloader is None: + self._downloader = http.Client() + + response = self._downloader.request('GET', url, allow_redirects=True) + content_length = response.headers.get('Content-Length') + content_length = int(content_length) if content_length else 0 + + ostream = tempfile.NamedTemporaryFile() + try: + chunk_size = min(content_length, BUFFER_SIZE) + # pylint: disable-msg=E1103 + for chunk in response.iter_content(chunk_size=chunk_size): + ostream.write(chunk) + except Exception: + ostream.close() + raise + + ostream.seek(0) + return ostream + class Commands(object): diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 69a7acb..f801942 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -53,7 +53,7 @@ _logger = logging.getLogger('http') class Client(object): - def __init__(self, api_url, sugar_auth=False, **kwargs): + def __init__(self, api_url='', sugar_auth=False, **kwargs): self.api_url = api_url self.params = kwargs self._sugar_auth = sugar_auth @@ -82,6 +82,10 @@ class Client(object): def close(self): self._session.close() + def exists(self, path): + response = self.request('GET', path, allowed=[404], params=self.params) + return response.status_code != 404 + def get(self, path_=None, **kwargs): kwargs.update(self.params) response = self.request('GET', path_, params=kwargs) diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py index ab24292..d2e2448 100755 --- a/tests/units/sync_master.py +++ b/tests/units/sync_master.py @@ -529,7 +529,7 @@ class SyncMasterTest(tests.Test): def diff(*args, **kwargs): for i in range(1024): - yield {'guid': str(i), 'seqno': i}, {} + yield str(i), i, {} coroutine.sleep(.1) self.override(Directory, 'diff', diff) -- cgit v0.9.1