Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/resources/volume.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/resources/volume.py')
-rw-r--r--sugar_network/resources/volume.py76
1 files changed, 54 insertions, 22 deletions
diff --git a/sugar_network/resources/volume.py b/sugar_network/resources/volume.py
index bf4fd70..52b8857 100644
--- a/sugar_network/resources/volume.py
+++ b/sugar_network/resources/volume.py
@@ -15,6 +15,7 @@
import json
import logging
+import tempfile
from os.path import join
import active_document as ad
@@ -22,6 +23,8 @@ from active_document import directory as ad_directory
from sugar_network import local
from sugar_network.toolkit.sneakernet import DiskFull
from sugar_network.toolkit.collection import Sequence
+from sugar_network.toolkit import http
+from active_toolkit.sockets import BUFFER_SIZE
from active_toolkit import coroutine
@@ -75,6 +78,7 @@ class Volume(ad.SingleVolume):
if document_classes is None:
document_classes = Volume.RESOURCES
ad.SingleVolume.__init__(self, root, document_classes, lazy_open)
+ self._downloader = None
def notify(self, event):
if event['event'] == 'update' and 'props' in event and \
@@ -87,9 +91,11 @@ class Volume(ad.SingleVolume):
def merge(self, record, increment_seqno=True):
coroutine.dispatch()
if record.get('content_type') == 'blob':
- record['diff'] = record['blob']
- return self[record['document']].merge(increment_seqno=increment_seqno,
- **record)
+ diff = record['blob']
+ else:
+ diff = record['diff']
+ return self[record['document']].merge(record['guid'], diff,
+ increment_seqno=increment_seqno)
def diff(self, in_seq, out_packet):
# Since `in_seq` will be changed in `patch()`, original sequence
@@ -103,27 +109,30 @@ class Volume(ad.SingleVolume):
directory.commit()
def patch():
- for meta, data in directory.diff(orig_seq, limit=_DIFF_CHUNK):
+ for guid, seqno, diff in \
+ directory.diff(orig_seq, limit=_DIFF_CHUNK):
coroutine.dispatch()
- seqno = None
- if 'seqno' in meta:
- seqno = meta.pop('seqno')
-
- if hasattr(data, 'fileno'):
- arcname = join(document, 'blobs', meta['guid'],
- meta['prop'])
- out_packet.push(data, arcname=arcname,
- cmd='sn_push', document=document, **meta)
- else:
- meta['diff'] = data
- yield meta
-
- # Process `seqno` only after processing yield'ed data
- if seqno:
- # Update `in_seq`, it might be reused by caller
- in_seq.exclude(seqno, seqno)
- push_seq.include(seqno, seqno)
+ for prop, value in diff.items():
+ if 'path' in value:
+ data = file(value.pop('path'), 'rb')
+ elif 'url' in value:
+ data = self._download_blob(value.pop('url'))
+ else:
+ continue
+ del diff[prop]
+ arcname = join(document, 'blobs', guid, prop)
+ out_packet.push(data, arcname=arcname, cmd='sn_push',
+ document=document, guid=guid, **value)
+
+ if not diff:
+ continue
+
+ yield {'guid': guid, 'diff': diff}
+
+ # Update `in_seq`, it might be reused by caller
+ in_seq.exclude(seqno, seqno)
+ push_seq.include(seqno, seqno)
try:
out_packet.push(patch(), arcname=join(document, 'diff'),
@@ -141,6 +150,29 @@ class Volume(ad.SingleVolume):
orig_seq.floor(push_seq.last)
out_packet.push(force=True, cmd='sn_commit', sequence=orig_seq)
+ def _download_blob(self, url):
+ _logger.debug('Download %r blob', url)
+
+ if self._downloader is None:
+ self._downloader = http.Client()
+
+ response = self._downloader.request('GET', url, allow_redirects=True)
+ content_length = response.headers.get('Content-Length')
+ content_length = int(content_length) if content_length else 0
+
+ ostream = tempfile.NamedTemporaryFile()
+ try:
+ chunk_size = min(content_length, BUFFER_SIZE)
+ # pylint: disable-msg=E1103
+ for chunk in response.iter_content(chunk_size=chunk_size):
+ ostream.write(chunk)
+ except Exception:
+ ostream.close()
+ raise
+
+ ostream.seek(0)
+ return ostream
+
class Commands(object):