Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2014-04-29 05:03:36 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2014-04-29 05:03:36 (GMT)
commit8ed5799f7d891b8d09ffdc298368f93382ddea5b (patch)
tree59bbe820568cded8efcb300649fed680030c368f
parent37df873b8854ed99bba37139b135f92e204b8bef (diff)
Fix file-less blobs streaming
-rw-r--r--sugar_network/db/blobs.py13
-rw-r--r--sugar_network/toolkit/packets.py24
-rw-r--r--sugar_network/toolkit/router.py17
-rwxr-xr-xtests/units/db/blobs.py12
-rwxr-xr-xtests/units/toolkit/packets.py49
-rwxr-xr-xtests/units/toolkit/router.py28
6 files changed, 72 insertions, 71 deletions
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index cc0944f..6426341 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -145,11 +145,13 @@ class Blobs(object):
_logger.debug('Post %r file', path)
+ if not exists(path):
+ path = None
return File(path, digest, meta)
def update(self, path, meta):
path = self.path(path)
- enforce(exists(path), http.NotFound, 'No such blob')
+ enforce(exists(path + _META_SUFFIX), http.NotFound, 'No such blob')
orig_meta = _read_meta(path)
orig_meta.update(meta)
_write_meta(path, orig_meta, None)
@@ -157,7 +159,10 @@ class Blobs(object):
def get(self, digest):
path = self.path(digest)
if exists(path + _META_SUFFIX):
- return File(path, digest, _read_meta(path))
+ meta = _read_meta(path)
+ if not exists(path):
+ path = None
+ return File(path, digest, meta)
elif isdir(path):
return _lsdir(path, digest)
elif exists(path):
@@ -226,6 +231,8 @@ class Blobs(object):
digest = join(rel_root, filename)
meta.append(('path', digest))
if yield_files:
+ if not exists(path):
+ path = None
yield File(path, digest, meta)
else:
yield
@@ -275,7 +282,7 @@ def _write_meta(path, meta, seqno):
for key, value in meta.items() if isinstance(meta, dict) else meta:
if seqno is None and key == 'x-seqno':
seqno = int(value)
- f.write('%s: %s\n' % (key, value))
+ f.write('%s: %s\n' % (key, toolkit.ascii(value)))
os.utime(path, (seqno, seqno))
diff --git a/sugar_network/toolkit/packets.py b/sugar_network/toolkit/packets.py
index 46bc223..0960a1f 100644
--- a/sugar_network/toolkit/packets.py
+++ b/sugar_network/toolkit/packets.py
@@ -28,7 +28,6 @@ from os.path import dirname, exists, join
from sugar_network import toolkit
from sugar_network.toolkit.router import File
-from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce
@@ -60,17 +59,14 @@ def decode(stream, limit=None):
def encode(items, limit=None, header=None, compresslevel=None,
- on_complete=None, **kwargs):
+ on_complete=None, download_blobs=False, **kwargs):
_logger.debug('Encode %r limit=%r header=%r', items, limit, header)
if compresslevel is 0:
ostream = _Encoder()
else:
ostream = _ZippedEncoder(compresslevel)
-
- # In case of downloading blobs
- # (?) reuse current `this.http`
- this.http = http.Connection()
+ connection = http.Connection()
if limit is None:
limit = sys.maxint
@@ -116,6 +112,8 @@ def encode(items, limit=None, header=None, compresslevel=None,
if isinstance(record, File):
blob_len = record.size
chunk = record.meta
+ if not record.path:
+ chunk['digest'] = record.digest
else:
chunk = record
chunk = ostream.write_record(chunk,
@@ -130,8 +128,14 @@ def encode(items, limit=None, header=None, compresslevel=None,
continue
if chunk:
yield chunk
- if blob_len:
- for chunk in record.iter_content():
+ if blob_len and (record.path or download_blobs):
+ if record.path:
+ blob_content = record.iter_content()
+ else:
+ url = record.meta['location']
+ enforce(url, http.NotFound, 'No location')
+ blob_content = connection.download(url)
+ for chunk in blob_content:
blob_len -= len(chunk)
if not blob_len:
chunk += '\n'
@@ -245,6 +249,10 @@ class _DecodeIterator(object):
yield record
return
+ if 'location' in record:
+ yield File(None, digest=record.pop('digest'), meta=record)
+ return
+
blob_len = int(blob_len)
with toolkit.NamedTemporaryFile() as blob:
digest = hashlib.sha1()
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index bd5da32..ec89fec 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -413,21 +413,8 @@ class File(str):
return basename(self.path)
def iter_content(self):
- if self.path:
- return self._iter_content()
- url = self.meta.get('location')
- enforce(url, http.NotFound, 'No location')
- blob = this.http.request('GET', url, allow_redirects=True,
- # Request for uncompressed data
- headers={'accept-encoding': ''})
- self.meta.clear()
- for tag in ('content-length', 'content-type', 'content-disposition'):
- value = blob.headers.get(tag)
- if value:
- self.meta[tag] = value
- return blob.iter_content(toolkit.BUFFER_SIZE)
-
- def _iter_content(self):
+ if not self.path:
+ return
with file(self.path, 'rb') as f:
while True:
chunk = f.read(toolkit.BUFFER_SIZE)
diff --git a/tests/units/db/blobs.py b/tests/units/db/blobs.py
index d5d2b1e..76c544b 100755
--- a/tests/units/db/blobs.py
+++ b/tests/units/db/blobs.py
@@ -105,9 +105,7 @@ class BlobsTest(tests.Test):
self.assertEqual(
'0000000000000000000000000000000000000000',
blob.digest)
- self.assertEqual(
- abspath('blobs/%s/%s' % (blob.digest[:3], blob.digest)),
- blob.path)
+ assert blob.path is None
self.assertEqual({
'status': '301 Moved Permanently',
'location': 'location',
@@ -116,8 +114,6 @@ class BlobsTest(tests.Test):
'x-seqno': '1',
},
blob.meta)
-
- assert not exists(blob.path)
self.assertEqual(
sorted([
'status: 301 Moved Permanently',
@@ -126,7 +122,7 @@ class BlobsTest(tests.Test):
'content-length: 101',
'x-seqno: 1',
]),
- sorted(file(blob.path + '.meta').read().strip().split('\n')))
+ sorted(file('blobs/%s/%s.meta' % (blob.digest[:3], blob.digest)).read().strip().split('\n')))
the_same_blob = blobs.get(blob.digest)
assert the_same_blob is not blob
@@ -388,7 +384,7 @@ class BlobsTest(tests.Test):
blobs.patch(File('./fake', '0000000000000000000000000000000000000001', {'n': 3, 'content-length': '0'}), -3)
blob = blobs.get('0000000000000000000000000000000000000001')
- assert not exists(blob.path)
+ assert blob.path is None
self.assertEqual({'x-seqno': '-3', 'n': '1', 'status': '410 Gone'}, blob.meta)
def test_patch_File(self):
@@ -406,7 +402,7 @@ class BlobsTest(tests.Test):
blobs.patch(File('./fake', 'foo/bar', {'n': 3, 'content-length': '0', 'path': 'foo/bar'}), -3)
blob = blobs.get('foo/bar')
- assert not exists(blob.path)
+ assert blob.path is None
self.assertEqual({'x-seqno': '-3', 'n': '1', 'status': '410 Gone'}, blob.meta)
def test_walk_Blobs(self):
diff --git a/tests/units/toolkit/packets.py b/tests/units/toolkit/packets.py
index 6a181b6..3836b6e 100755
--- a/tests/units/toolkit/packets.py
+++ b/tests/units/toolkit/packets.py
@@ -340,6 +340,28 @@ class PacketsTest(tests.Test):
self.assertRaises(StopIteration, next, packets_iter)
self.assertEqual(len(stream.getvalue()), stream.tell())
+ def test_decode_BlobUrls(self):
+ stream = zips(
+ json.dumps({}) + '\n' +
+ json.dumps({'segment': 1}) + '\n' +
+ json.dumps({'num': 1, 'content-length': 1}) + '\n' +
+ 'a' +
+ json.dumps({'num': 2, 'content-length': 100, 'location': 'http://foo/bar', 'digest': 'digest'}) + '\n' +
+ json.dumps({'num': 3, 'content-length': 3}) + '\n' +
+ 'ccc'
+ )
+ packets_iter = iter(packets.decode(stream))
+ with next(packets_iter) as packet:
+ self.assertEqual(1, packet.name)
+ self.assertEqual([
+ (1, 'a'),
+ (2, None),
+ (3, 'ccc'),
+ ],
+ [(i.meta['num'], file(i.path).read() if i.path else None) for i in packet])
+ self.assertRaises(StopIteration, packets_iter.next)
+ self.assertEqual(len(stream.getvalue()), stream.tell())
+
def test_encode_Zipped(self):
stream = ''.join([i for i in packets.encode([])])
self.assertEqual(
@@ -593,7 +615,7 @@ class PacketsTest(tests.Test):
'ccc' + '\n',
unzips(stream))
- def test_encode_BlobWithUrls(self):
+ def test_encode_BlobUrls(self):
class Routes(object):
@@ -607,37 +629,46 @@ class PacketsTest(tests.Test):
url = 'http://127.0.0.1:%s' % client.ipc_port.value
stream = ''.join([i for i in packets.encode([
- (1, None, [File(None, meta={'location': 'fake'})]),
+ (1, None, [File(None, digest='digest', meta={'location': 'fake'})]),
])])
self.assertEqual(
json.dumps({}) + '\n' +
json.dumps({'segment': 1}) + '\n' +
- json.dumps({'location': 'fake'}) + '\n',
+ json.dumps({'digest': 'digest', 'location': 'fake'}) + '\n',
unzips(stream))
stream = ''.join([i for i in packets.encode([
- (1, None, [File(None, meta={'location': 'fake', 'content-length': '0'})]),
+ (1, None, [File(None, digest='digest', meta={'location': 'fake', 'content-length': '0'})]),
])])
self.assertEqual(
json.dumps({}) + '\n' +
json.dumps({'segment': 1}) + '\n' +
- json.dumps({'location': 'fake', 'content-length': '0'}) + '\n',
+ json.dumps({'location': 'fake', 'content-length': '0', 'digest': 'digest'}) + '\n',
unzips(stream))
stream = ''.join([i for i in packets.encode([
- (1, None, [File(None, meta={'location': url, 'content-length': str(len('probe'))})]),
+ (1, None, [File(None, digest='digest', meta={'location': url, 'content-length': str(len('probe'))})]),
])])
self.assertEqual(
json.dumps({}) + '\n' +
json.dumps({'segment': 1}) + '\n' +
- json.dumps({'location': url, 'content-length': str(len('probe'))}) + '\n' +
+ json.dumps({'location': url, 'content-length': str(len('probe')), 'digest': 'digest'}) + '\n',
+ unzips(stream))
+
+ stream = ''.join([i for i in packets.encode([
+ (1, None, [File(None, digest='digest', meta={'location': url, 'content-length': str(len('probe'))})]),
+ ], download_blobs=True)])
+ self.assertEqual(
+ json.dumps({}) + '\n' +
+ json.dumps({'segment': 1}) + '\n' +
+ json.dumps({'location': url, 'content-length': str(len('probe')), 'digest': 'digest'}) + '\n' +
'probe' + '\n',
unzips(stream))
def encode():
stream = ''.join([i for i in packets.encode([
- (1, None, [File(None, meta={'location': 'http://127.0.0.1:108', 'content-length': str(len('probe'))})]),
- ])])
+ (1, None, [File(None, digest='digest', meta={'location': 'http://127.0.0.1:108', 'content-length': str(len('probe'))})]),
+ ], download_blobs=True)])
self.assertRaises(http.ConnectionError, encode)
def test_limited_encode_Blobs(self):
diff --git a/tests/units/toolkit/router.py b/tests/units/toolkit/router.py
index 7222da1..2231107 100755
--- a/tests/units/toolkit/router.py
+++ b/tests/units/toolkit/router.py
@@ -1466,34 +1466,6 @@ class RouterTest(tests.Test):
['blob'],
[i for i in File('blob').iter_content()])
- def test_File_IterContentByUrl(self):
- this.http = http.Connection()
-
- class Routes(object):
-
- @route('GET')
- def probe(self):
- this.response['content-type'] = 'foo/bar'
- this.response['content-length'] = str(len('probe'))
- this.response['content-disposition'] = 'attachment; filename="foo"'
-
- return 'probe'
-
- server = coroutine.WSGIServer(('127.0.0.1', client.ipc_port.value), Router(Routes()))
- coroutine.spawn(server.serve_forever)
- coroutine.dispatch()
-
- blob = File(None, meta={'location': 'http://127.0.0.1:%s' % client.ipc_port.value})
- self.assertEqual(
- ['probe'],
- [i for i in blob.iter_content()])
- self.assertEqual({
- 'content-length': '5',
- 'content-type': 'foo/bar',
- 'content-disposition': 'attachment; filename="foo"',
- },
- blob.meta)
-
def test_SetCookie(self):
class Routes(object):