Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-10-01 04:25:33 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-10-01 04:26:54 (GMT)
commitd14a63c2d462e98f18eaee7cb8f52a3baa30851d (patch)
treee71fcae475fb238ead3b71e1cd98faaf4ec5889d
parented116a71e3b3ad78a2f36a9c81075e70eeb24ddd (diff)
Reflect on ad.diff() changes in treating blobs
-rw-r--r--sugar_network/resources/volume.py76
-rw-r--r--sugar_network/toolkit/http.py6
-rwxr-xr-xtests/units/sync_master.py2
3 files changed, 60 insertions, 24 deletions
diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py
index bf4fd70..52b8857 100644
--- a/sugar_network/resources/volume.py
+++ b/sugar_network/resources/volume.py
@@ -15,6 +15,7 @@
import json
import logging
+import tempfile
from os.path import join
import active_document as ad
@@ -22,6 +23,8 @@ from active_document import directory as ad_directory
from sugar_network import local
from sugar_network.toolkit.sneakernet import DiskFull
from sugar_network.toolkit.collection import Sequence
+from sugar_network.toolkit import http
+from active_toolkit.sockets import BUFFER_SIZE
from active_toolkit import coroutine
@@ -75,6 +78,7 @@ class Volume(ad.SingleVolume):
if document_classes is None:
document_classes = Volume.RESOURCES
ad.SingleVolume.__init__(self, root, document_classes, lazy_open)
+ self._downloader = None
def notify(self, event):
if event['event'] == 'update' and 'props' in event and \
@@ -87,9 +91,11 @@ class Volume(ad.SingleVolume):
def merge(self, record, increment_seqno=True):
coroutine.dispatch()
if record.get('content_type') == 'blob':
- record['diff'] = record['blob']
- return self[record['document']].merge(increment_seqno=increment_seqno,
- **record)
+ diff = record['blob']
+ else:
+ diff = record['diff']
+ return self[record['document']].merge(record['guid'], diff,
+ increment_seqno=increment_seqno)
def diff(self, in_seq, out_packet):
# Since `in_seq` will be changed in `patch()`, original sequence
@@ -103,27 +109,30 @@ class Volume(ad.SingleVolume):
directory.commit()
def patch():
- for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK):
+ for guid, seqno, diff in \
+ directory.diff(orig_seq, limit=_DIFF_CHUNK):
coroutine.dispatch()
- seqno = None
- if 'seqno' in meta:
- seqno = meta.pop('seqno')
-
- if hasattr(data, 'fileno'):
- arcname = join(document, 'blobs', meta['guid'],
- meta['prop'])
- out_packet.push(data, arcname=arcname,
- cmd='sn_push', document=document, **meta)
- else:
- meta['diff'] = data
- yield meta
-
- # Process `seqno` only after processing yield'ed data
- if seqno:
- # Update `in_seq`, it might be reused by caller
- in_seq.exclude(seqno, seqno)
- push_seq.include(seqno, seqno)
+ for prop, value in diff.items():
+ if 'path' in value:
+ data = file(value.pop('path'), 'rb')
+ elif 'url' in value:
+ data = self._download_blob(value.pop('url'))
+ else:
+ continue
+ del diff[prop]
+ arcname = join(document, 'blobs', guid, prop)
+ out_packet.push(data, arcname=arcname, cmd='sn_push',
+ document=document, guid=guid, **value)
+
+ if not diff:
+ continue
+
+ yield {'guid': guid, 'diff': diff}
+
+ # Update `in_seq`, it might be reused by caller
+ in_seq.exclude(seqno, seqno)
+ push_seq.include(seqno, seqno)
try:
out_packet.push(patch(), arcname=join(document, 'diff'),
@@ -141,6 +150,29 @@ class Volume(ad.SingleVolume):
orig_seq.floor(push_seq.last)
out_packet.push(force=True, cmd='sn_commit', sequence=orig_seq)
+ def _download_blob(self, url):
+ _logger.debug('Download %r blob', url)
+
+ if self._downloader is None:
+ self._downloader = http.Client()
+
+ response = self._downloader.request('GET', url, allow_redirects=True)
+ content_length = response.headers.get('Content-Length')
+ content_length = int(content_length) if content_length else 0
+
+ ostream = tempfile.NamedTemporaryFile()
+ try:
+ chunk_size = min(content_length, BUFFER_SIZE)
+ # pylint: disable-msg=E1103
+ for chunk in response.iter_content(chunk_size=chunk_size):
+ ostream.write(chunk)
+ except Exception:
+ ostream.close()
+ raise
+
+ ostream.seek(0)
+ return ostream
+
class Commands(object):
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 69a7acb..f801942 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -53,7 +53,7 @@ _logger = logging.getLogger('http')
class Client(object):
- def __init__(self, api_url, sugar_auth=False, **kwargs):
+ def __init__(self, api_url='', sugar_auth=False, **kwargs):
self.api_url = api_url
self.params = kwargs
self._sugar_auth = sugar_auth
@@ -82,6 +82,10 @@ class Client(object):
def close(self):
self._session.close()
+ def exists(self, path):
+ response = self.request('GET', path, allowed=[404], params=self.params)
+ return response.status_code != 404
+
def get(self, path_=None, **kwargs):
kwargs.update(self.params)
response = self.request('GET', path_, params=kwargs)
diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py
index ab24292..d2e2448 100755
--- a/tests/units/sync_master.py
+++ b/tests/units/sync_master.py
@@ -529,7 +529,7 @@ class SyncMasterTest(tests.Test):
def diff(*args, **kwargs):
for i in range(1024):
- yield {'guid': str(i), 'seqno': i}, {}
+ yield str(i), i, {}
coroutine.sleep(.1)
self.override(Directory, 'diff', diff)