diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-29 05:03:36 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-29 05:03:36 (GMT) |
commit | 8ed5799f7d891b8d09ffdc298368f93382ddea5b (patch) | |
tree | 59bbe820568cded8efcb300649fed680030c368f | |
parent | 37df873b8854ed99bba37139b135f92e204b8bef (diff) |
Fix file-less blobs streaming
-rw-r--r-- | sugar_network/db/blobs.py | 13 | ||||
-rw-r--r-- | sugar_network/toolkit/packets.py | 24 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 17 | ||||
-rwxr-xr-x | tests/units/db/blobs.py | 12 | ||||
-rwxr-xr-x | tests/units/toolkit/packets.py | 49 | ||||
-rwxr-xr-x | tests/units/toolkit/router.py | 28 |
6 files changed, 72 insertions, 71 deletions
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index cc0944f..6426341 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -145,11 +145,13 @@ class Blobs(object): _logger.debug('Post %r file', path) + if not exists(path): + path = None return File(path, digest, meta) def update(self, path, meta): path = self.path(path) - enforce(exists(path), http.NotFound, 'No such blob') + enforce(exists(path + _META_SUFFIX), http.NotFound, 'No such blob') orig_meta = _read_meta(path) orig_meta.update(meta) _write_meta(path, orig_meta, None) @@ -157,7 +159,10 @@ class Blobs(object): def get(self, digest): path = self.path(digest) if exists(path + _META_SUFFIX): - return File(path, digest, _read_meta(path)) + meta = _read_meta(path) + if not exists(path): + path = None + return File(path, digest, meta) elif isdir(path): return _lsdir(path, digest) elif exists(path): @@ -226,6 +231,8 @@ class Blobs(object): digest = join(rel_root, filename) meta.append(('path', digest)) if yield_files: + if not exists(path): + path = None yield File(path, digest, meta) else: yield @@ -275,7 +282,7 @@ def _write_meta(path, meta, seqno): for key, value in meta.items() if isinstance(meta, dict) else meta: if seqno is None and key == 'x-seqno': seqno = int(value) - f.write('%s: %s\n' % (key, value)) + f.write('%s: %s\n' % (key, toolkit.ascii(value))) os.utime(path, (seqno, seqno)) diff --git a/sugar_network/toolkit/packets.py b/sugar_network/toolkit/packets.py index 46bc223..0960a1f 100644 --- a/sugar_network/toolkit/packets.py +++ b/sugar_network/toolkit/packets.py @@ -28,7 +28,6 @@ from os.path import dirname, exists, join from sugar_network import toolkit from sugar_network.toolkit.router import File -from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce @@ -60,17 +59,14 @@ def decode(stream, limit=None): def encode(items, limit=None, header=None, compresslevel=None, - on_complete=None, **kwargs): + on_complete=None, download_blobs=False, **kwargs): _logger.debug('Encode %r limit=%r header=%r', items, limit, header) if compresslevel is 0: ostream = _Encoder() else: ostream = _ZippedEncoder(compresslevel) - - # In case of downloading blobs - # (?) reuse current `this.http` - this.http = http.Connection() + connection = http.Connection() if limit is None: limit = sys.maxint @@ -116,6 +112,8 @@ def encode(items, limit=None, header=None, compresslevel=None, if isinstance(record, File): blob_len = record.size chunk = record.meta + if not record.path: + chunk['digest'] = record.digest else: chunk = record chunk = ostream.write_record(chunk, @@ -130,8 +128,14 @@ def encode(items, limit=None, header=None, compresslevel=None, continue if chunk: yield chunk - if blob_len: - for chunk in record.iter_content(): + if blob_len and (record.path or download_blobs): + if record.path: + blob_content = record.iter_content() + else: + url = record.meta['location'] + enforce(url, http.NotFound, 'No location') + blob_content = connection.download(url) + for chunk in blob_content: blob_len -= len(chunk) if not blob_len: chunk += '\n' @@ -245,6 +249,10 @@ class _DecodeIterator(object): yield record return + if 'location' in record: + yield File(None, digest=record.pop('digest'), meta=record) + return + blob_len = int(blob_len) with toolkit.NamedTemporaryFile() as blob: digest = hashlib.sha1() diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index bd5da32..ec89fec 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -413,21 +413,8 @@ class File(str): return basename(self.path) def iter_content(self): - if self.path: - return self._iter_content() - url = self.meta.get('location') - enforce(url, http.NotFound, 'No location') - blob = this.http.request('GET', url, allow_redirects=True, - # Request for uncompressed data - headers={'accept-encoding': ''}) - self.meta.clear() - for tag in ('content-length', 'content-type', 'content-disposition'): - value = blob.headers.get(tag) - if value: - self.meta[tag] = value - return blob.iter_content(toolkit.BUFFER_SIZE) - - def _iter_content(self): + if not self.path: + return with file(self.path, 'rb') as f: while True: chunk = f.read(toolkit.BUFFER_SIZE) diff --git a/tests/units/db/blobs.py b/tests/units/db/blobs.py index d5d2b1e..76c544b 100755 --- a/tests/units/db/blobs.py +++ b/tests/units/db/blobs.py @@ -105,9 +105,7 @@ class BlobsTest(tests.Test): self.assertEqual( '0000000000000000000000000000000000000000', blob.digest) - self.assertEqual( - abspath('blobs/%s/%s' % (blob.digest[:3], blob.digest)), - blob.path) + assert blob.path is None self.assertEqual({ 'status': '301 Moved Permanently', 'location': 'location', @@ -116,8 +114,6 @@ class BlobsTest(tests.Test): 'x-seqno': '1', }, blob.meta) - - assert not exists(blob.path) self.assertEqual( sorted([ 'status: 301 Moved Permanently', @@ -126,7 +122,7 @@ class BlobsTest(tests.Test): 'content-length: 101', 'x-seqno: 1', ]), - sorted(file(blob.path + '.meta').read().strip().split('\n'))) + sorted(file('blobs/%s/%s.meta' % (blob.digest[:3], blob.digest)).read().strip().split('\n'))) the_same_blob = blobs.get(blob.digest) assert the_same_blob is not blob @@ -388,7 +384,7 @@ class BlobsTest(tests.Test): blobs.patch(File('./fake', '0000000000000000000000000000000000000001', {'n': 3, 'content-length': '0'}), -3) blob = blobs.get('0000000000000000000000000000000000000001') - assert not exists(blob.path) + assert blob.path is None self.assertEqual({'x-seqno': '-3', 'n': '1', 'status': '410 Gone'}, blob.meta) def test_patch_File(self): @@ -406,7 +402,7 @@ class BlobsTest(tests.Test): blobs.patch(File('./fake', 'foo/bar', {'n': 3, 'content-length': '0', 'path': 'foo/bar'}), -3) blob = blobs.get('foo/bar') - assert not exists(blob.path) + assert blob.path is None self.assertEqual({'x-seqno': '-3', 'n': '1', 'status': '410 Gone'}, blob.meta) def test_walk_Blobs(self): diff --git a/tests/units/toolkit/packets.py b/tests/units/toolkit/packets.py index 6a181b6..3836b6e 100755 --- a/tests/units/toolkit/packets.py +++ b/tests/units/toolkit/packets.py @@ -340,6 +340,28 @@ class PacketsTest(tests.Test): self.assertRaises(StopIteration, next, packets_iter) self.assertEqual(len(stream.getvalue()), stream.tell()) + def test_decode_BlobUrls(self): + stream = zips( + json.dumps({}) + '\n' + + json.dumps({'segment': 1}) + '\n' + + json.dumps({'num': 1, 'content-length': 1}) + '\n' + + 'a' + + json.dumps({'num': 2, 'content-length': 100, 'location': 'http://foo/bar', 'digest': 'digest'}) + '\n' + + json.dumps({'num': 3, 'content-length': 3}) + '\n' + + 'ccc' + ) + packets_iter = iter(packets.decode(stream)) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual([ + (1, 'a'), + (2, None), + (3, 'ccc'), + ], + [(i.meta['num'], file(i.path).read() if i.path else None) for i in packet]) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + def test_encode_Zipped(self): stream = ''.join([i for i in packets.encode([])]) self.assertEqual( @@ -593,7 +615,7 @@ class PacketsTest(tests.Test): 'ccc' + '\n', unzips(stream)) - def test_encode_BlobWithUrls(self): + def test_encode_BlobUrls(self): class Routes(object): @@ -607,37 +629,46 @@ class PacketsTest(tests.Test): url = 'http://127.0.0.1:%s' % client.ipc_port.value stream = ''.join([i for i in packets.encode([ - (1, None, [File(None, meta={'location': 'fake'})]), + (1, None, [File(None, digest='digest', meta={'location': 'fake'})]), ])]) self.assertEqual( json.dumps({}) + '\n' + json.dumps({'segment': 1}) + '\n' + - json.dumps({'location': 'fake'}) + '\n', + json.dumps({'digest': 'digest', 'location': 'fake'}) + '\n', unzips(stream)) stream = ''.join([i for i in packets.encode([ - (1, None, [File(None, meta={'location': 'fake', 'content-length': '0'})]), + (1, None, [File(None, digest='digest', meta={'location': 'fake', 'content-length': '0'})]), ])]) self.assertEqual( json.dumps({}) + '\n' + json.dumps({'segment': 1}) + '\n' + - json.dumps({'location': 'fake', 'content-length': '0'}) + '\n', + json.dumps({'location': 'fake', 'content-length': '0', 'digest': 'digest'}) + '\n', unzips(stream)) stream = ''.join([i for i in packets.encode([ - (1, None, [File(None, meta={'location': url, 'content-length': str(len('probe'))})]), + (1, None, [File(None, digest='digest', meta={'location': url, 'content-length': str(len('probe'))})]), ])]) self.assertEqual( json.dumps({}) + '\n' + json.dumps({'segment': 1}) + '\n' + - json.dumps({'location': url, 'content-length': str(len('probe'))}) + '\n' + + json.dumps({'location': url, 'content-length': str(len('probe')), 'digest': 'digest'}) + '\n', + unzips(stream)) + + stream = ''.join([i for i in packets.encode([ + (1, None, [File(None, digest='digest', meta={'location': url, 'content-length': str(len('probe'))})]), + ], download_blobs=True)]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'segment': 1}) + '\n' + + json.dumps({'location': url, 'content-length': str(len('probe')), 'digest': 'digest'}) + '\n' + 'probe' + '\n', unzips(stream)) def encode(): stream = ''.join([i for i in packets.encode([ - (1, None, [File(None, meta={'location': 'http://127.0.0.1:108', 'content-length': str(len('probe'))})]), - ])]) + (1, None, [File(None, digest='digest', meta={'location': 'http://127.0.0.1:108', 'content-length': str(len('probe'))})]), + ], download_blobs=True)]) self.assertRaises(http.ConnectionError, encode) def test_limited_encode_Blobs(self): diff --git a/tests/units/toolkit/router.py b/tests/units/toolkit/router.py index 7222da1..2231107 100755 --- a/tests/units/toolkit/router.py +++ b/tests/units/toolkit/router.py @@ -1466,34 +1466,6 @@ class RouterTest(tests.Test): ['blob'], [i for i in File('blob').iter_content()]) - def test_File_IterContentByUrl(self): - this.http = http.Connection() - - class Routes(object): - - @route('GET') - def probe(self): - this.response['content-type'] = 'foo/bar' - this.response['content-length'] = str(len('probe')) - this.response['content-disposition'] = 'attachment; filename="foo"' - - return 'probe' - - server = coroutine.WSGIServer(('127.0.0.1', client.ipc_port.value), Router(Routes())) - coroutine.spawn(server.serve_forever) - coroutine.dispatch() - - blob = File(None, meta={'location': 'http://127.0.0.1:%s' % client.ipc_port.value}) - self.assertEqual( - ['probe'], - [i for i in blob.iter_content()]) - self.assertEqual({ - 'content-length': '5', - 'content-type': 'foo/bar', - 'content-disposition': 'attachment; filename="foo"', - }, - blob.meta) - def test_SetCookie(self): class Routes(object): |