diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-06 15:33:04 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-03-07 04:58:00 (GMT) |
commit | 90f74541ec4925bad47466e39517c22ff7eadfe4 (patch) | |
tree | f8fca9c302904981a46e275fcaa5a2305ea99f8d /sugar_network | |
parent | 1028755053ef3d8c538138b37e61ece13b9c1a23 (diff) |
Keep data synchronization in db module; use blobs storage to keep standalone files
Diffstat (limited to 'sugar_network')
-rw-r--r-- | sugar_network/db/blobs.py | 314 | ||||
-rw-r--r-- | sugar_network/db/directory.py | 96 | ||||
-rw-r--r-- | sugar_network/db/index.py | 2 | ||||
-rw-r--r-- | sugar_network/db/metadata.py | 43 | ||||
-rw-r--r-- | sugar_network/db/resource.py | 18 | ||||
-rw-r--r-- | sugar_network/db/routes.py | 23 | ||||
-rw-r--r-- | sugar_network/db/storage.py | 10 | ||||
-rw-r--r-- | sugar_network/db/volume.py | 91 | ||||
-rw-r--r-- | sugar_network/model/__init__.py | 16 | ||||
-rw-r--r-- | sugar_network/model/report.py | 2 | ||||
-rw-r--r-- | sugar_network/model/routes.py | 5 | ||||
-rw-r--r-- | sugar_network/toolkit/__init__.py | 235 | ||||
-rw-r--r-- | sugar_network/toolkit/http.py | 6 | ||||
-rw-r--r-- | sugar_network/toolkit/parcel.py | 52 | ||||
-rw-r--r-- | sugar_network/toolkit/ranges.py | 198 | ||||
-rw-r--r-- | sugar_network/toolkit/router.py | 93 |
16 files changed, 701 insertions, 503 deletions
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index a9d66e0..cd795c6 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -16,137 +16,215 @@ import os import logging import hashlib +import mimetypes from contextlib import contextmanager -from os.path import exists, abspath, join, isdir, isfile +from os.path import exists, abspath, join, dirname from sugar_network import toolkit from sugar_network.toolkit.router import File -from sugar_network.toolkit import http, enforce +from sugar_network.toolkit import http, ranges, enforce _META_SUFFIX = '.meta' _logger = logging.getLogger('db.blobs') -_root = None - - -def init(path): - global _root - _root = abspath(path) - if not exists(_root): - os.makedirs(_root) - - -def post(content, mime_type=None, digest_to_assert=None, meta=None): - if meta is None: - meta = [] - meta.append(('content-type', mime_type or 'application/octet-stream')) - else: - meta = meta.items() - if mime_type: - meta.append(('content-type', mime_type)) - - @contextmanager - def write_blob(): - tmp_path = join(_path(), 'post') - if hasattr(content, 'read'): - with toolkit.new_file(tmp_path) as blob: - digest = hashlib.sha1() - while True: - chunk = content.read(toolkit.BUFFER_SIZE) - if not chunk: - break - blob.write(chunk) - digest.update(chunk) - yield blob, digest.hexdigest() - elif isinstance(content, dict): - enforce('location' in content, http.BadRequest, 'No location') - enforce('digest' in content, http.BadRequest, 'No digest') - meta.append(('status', '301 Moved Permanently')) - meta.append(('location', content['location'])) - with toolkit.new_file(tmp_path) as blob: - yield blob, content['digest'] - else: - with toolkit.new_file(tmp_path) as blob: - blob.write(content) - yield blob, hashlib.sha1(content).hexdigest() - - with write_blob() as (blob, digest): - if digest_to_assert and digest != digest_to_assert: - blob.unlink() - raise http.BadRequest('Digest mismatch') - path = _path(digest) - meta.append(('content-length', str(blob.tell()))) - with toolkit.new_file(path + _META_SUFFIX) as f: - for key, value in meta: - f.write('%s: %s\n' % (key, value)) - blob.name = path - - return File(path, digest, meta) - - -def update(digest, meta): - path = _path(digest) + _META_SUFFIX - enforce(exists(path), http.NotFound, 'No such blob') - meta_content = '' - for key, value in meta.items(): - meta_content += '%s: %s\n' % (key, value) - with toolkit.new_file(path) as f: - f.write(meta_content) - -def get(digest): - path = _path(digest) - if not exists(path) or not exists(path + _META_SUFFIX): - return None - meta = [] - with file(path + _META_SUFFIX) as f: - for line in f: - key, value = line.split(':', 1) - meta.append((key, value.strip())) - return File(path, digest, meta) +class Blobs(object): -def delete(digest): - path = _path(digest) - if exists(path + _META_SUFFIX): - os.unlink(path + _META_SUFFIX) - if exists(path): - os.unlink(path) + def __init__(self, root, seqno): + self._root = abspath(root) + self._seqno = seqno + def path(self, *args): + if len(args) == 1 and len(args[0]) == 40 and '.' not in args[0]: + return self._blob_path(args[0]) + else: + return join(self._root, 'files', *args) -def diff(in_seq, out_seq=None): - if out_seq is None: - out_seq = toolkit.Sequence([]) - is_the_only_seq = not out_seq - - try: - root = _path() - for name in os.listdir(root): - dirpath = join(root, name) - if not isdir(dirpath) or os.stat(dirpath).st_ctime not in in_seq: + def post(self, content, mime_type=None, digest_to_assert=None, meta=None): + if meta is None: + meta = [] + meta.append(('content-type', + mime_type or 'application/octet-stream')) + else: + meta = meta.items() + if mime_type: + meta.append(('content-type', mime_type)) + + @contextmanager + def write_blob(): + tmp_path = join(self._blob_path(), 'post') + if hasattr(content, 'read'): + with toolkit.new_file(tmp_path) as blob: + digest = hashlib.sha1() + while True: + chunk = content.read(toolkit.BUFFER_SIZE) + if not chunk: + break + blob.write(chunk) + digest.update(chunk) + yield blob, digest.hexdigest() + elif isinstance(content, dict): + enforce('location' in content, http.BadRequest, 'No location') + enforce('digest' in content, http.BadRequest, 'No digest') + meta.append(('status', '301 Moved Permanently')) + meta.append(('location', content['location'])) + with toolkit.new_file(tmp_path) as blob: + yield blob, content['digest'] + else: + with toolkit.new_file(tmp_path) as blob: + blob.write(content) + yield blob, hashlib.sha1(content).hexdigest() + + with write_blob() as (blob, digest): + if digest_to_assert and digest != digest_to_assert: + blob.unlink() + raise http.BadRequest('Digest mismatch') + path = self._blob_path(digest) + seqno = self._seqno.next() + meta.append(('content-length', str(blob.tell()))) + meta.append(('x-seqno', str(seqno))) + _write_meta(path, meta, seqno) + blob.name = path + os.utime(path, (seqno, seqno)) + + _logger.debug('Post %r file', path) + + return File(path, digest, meta) + + def update(self, path, meta): + path = self.path(path) + enforce(exists(path), http.NotFound, 'No such blob') + orig_meta = _read_meta(path) + orig_meta.update(meta) + _write_meta(path, orig_meta, None) + + def get(self, digest): + path = self.path(digest) + if exists(path + _META_SUFFIX): + return File(path, digest, _read_meta(path)) + + def delete(self, path): + self._delete(path, None) + + def diff(self, r, path=None, recursive=True): + if path is None: + is_files = False + root = self._blob_path() + else: + path = path.strip('/').split('/') + enforce(not [i for i in path if i == '..'], + http.BadRequest, 'Relative paths are not allowed') + is_files = True + root = self.path(*path) + checkin_seqno = None + + for root, __, files in os.walk(root): + if not ranges.contains(r, int(os.stat(root).st_mtime)): continue - for digest in os.listdir(dirpath): - if len(digest) != 40: - continue - path = join(dirpath, digest) - if not isfile(path): + rel_root = root[len(self._root) + 7:] if is_files else None + for filename in files: + path = join(root, filename) + if filename.endswith(_META_SUFFIX): + seqno = int(os.stat(path).st_mtime) + path = path[:-len(_META_SUFFIX)] + meta = None + if exists(path): + stat = os.stat(path) + if seqno != int(stat.st_mtime): + _logger.debug('Found updated %r file', path) + seqno = self._seqno.next() + meta = _read_meta(path) + meta['x-seqno'] = str(seqno) + meta['content-length'] = str(stat.st_size) + _write_meta(path, meta, seqno) + os.utime(path, (seqno, seqno)) + if not ranges.contains(r, seqno): + continue + if meta is None: + meta = _read_meta(path) + if is_files: + digest = join(rel_root, filename[:-len(_META_SUFFIX)]) + meta['path'] = digest + else: + digest = filename[:-len(_META_SUFFIX)] + elif not is_files or exists(path + _META_SUFFIX): continue - ctime = int(os.stat(path).st_ctime) - if ctime not in in_seq: - continue - blob = get(digest) - if blob is None: - continue - yield blob - out_seq.include(ctime, ctime) - if is_the_only_seq: - # There is only one diff, so, we can stretch it to remove all holes - out_seq.stretch() - except StopIteration: - pass - - -def _path(digest=None): - enforce(_root is not None, 'Blobs storage is not initialized') - return join(_root, digest[:3], digest) if digest else _root + else: + _logger.debug('Found new %r file', path) + mime_type = mimetypes.guess_type(filename)[0] or \ + 'application/octet-stream' + if checkin_seqno is None: + checkin_seqno = self._seqno.next() + seqno = checkin_seqno + meta = [('content-type', mime_type), + ('content-length', str(os.stat(path).st_size)), + ('x-seqno', str(seqno)), + ] + _write_meta(path, meta, seqno) + os.utime(path, (seqno, seqno)) + if not ranges.contains(r, seqno): + continue + digest = join(rel_root, filename) + meta.append(('path', digest)) + yield File(path, digest, meta) + if not recursive: + break + + def patch(self, patch, seqno): + if 'path' in patch: + path = self.path(patch.pop('path')) + else: + path = self._blob_path(patch.digest) + if not patch.size: + self._delete(path, seqno) + return + if not exists(dirname(path)): + os.makedirs(dirname(path)) + os.rename(patch.path, path) + if exists(path + _META_SUFFIX): + meta = _read_meta(path) + meta.update(patch) + else: + meta = patch + meta['x-seqno'] = str(seqno) + _write_meta(path, meta, seqno) + os.utime(path, (seqno, seqno)) + + def _delete(self, path, seqno): + path = self.path(path) + if exists(path + _META_SUFFIX): + if seqno is None: + seqno = self._seqno.next() + meta = _read_meta(path) + meta['status'] = '410 Gone' + meta['x-seqno'] = str(seqno) + _write_meta(path, meta, seqno) + if exists(path): + _logger.debug('Delete %r file', path) + os.unlink(path) + + def _blob_path(self, digest=None): + if not digest: + return join(self._root, 'blobs') + return join(self._root, 'blobs', digest[:3], digest) + + +def _write_meta(path, meta, seqno): + path += _META_SUFFIX + with toolkit.new_file(path) as f: + for key, value in meta.items() if isinstance(meta, dict) else meta: + if seqno is None and key == 'x-seqno': + seqno = int(value) + f.write('%s: %s\n' % (key, value)) + os.utime(path, (seqno, seqno)) + + +def _read_meta(path): + meta = {} + with file(path + _META_SUFFIX) as f: + for line in f: + key, value = line.split(':', 1) + meta[key] = value.strip() + return meta diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index c6957d7..3ef4b91 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import shutil import logging from os.path import exists, join @@ -33,7 +32,7 @@ _logger = logging.getLogger('db.directory') class Directory(object): - def __init__(self, root, resource_class, index_class, seqno=None): + def __init__(self, root, resource, index_class, seqno): """ :param index_class: what class to use to access to indexes, for regular casses @@ -41,19 +40,16 @@ class Directory(object): keep writer in separate process). """ - if not exists(root): - os.makedirs(root) - - if resource_class.metadata is None: + if resource.metadata is None: # Metadata cannot be recreated - resource_class.metadata = Metadata(resource_class) - resource_class.metadata['guid'] = Guid() - self.metadata = resource_class.metadata + resource.metadata = Metadata(resource) + resource.metadata['guid'] = Guid() + self.metadata = resource.metadata - self.resource_class = resource_class + self.resource = resource self._index_class = index_class self._root = root - self._seqno = _SessionSeqno() if seqno is None else seqno + self._seqno = seqno self._storage = None self._index = None @@ -62,7 +58,10 @@ class Directory(object): def wipe(self): self.close() _logger.debug('Wipe %r directory', self.metadata.name) - shutil.rmtree(self._root, ignore_errors=True) + shutil.rmtree(join(self._root, 'index', self.metadata.name), + ignore_errors=True) + shutil.rmtree(join(self._root, 'db', self.metadata.name), + ignore_errors=True) self._open() def close(self): @@ -129,7 +128,7 @@ class Directory(object): enforce(cached_props or record.exists, http.NotFound, 'Resource %r does not exist in %r', guid, self.metadata.name) - return self.resource_class(guid, record, cached_props) + return self.resource(guid, record, cached_props) def __getitem__(self, guid): return self.get(guid) @@ -141,7 +140,7 @@ class Directory(object): for hit in mset: guid = hit.document.get_value(0) record = self._storage.get(guid) - yield self.resource_class(guid, record) + yield self.resource(guid, record) return iterate(), mset.get_matches_estimated() @@ -186,74 +185,52 @@ class Directory(object): self._save_layout() self.commit() - def diff(self, seq, exclude_seq=None, **params): - if exclude_seq is not None: - for start, end in exclude_seq: - seq.exclude(start, end) - if 'group_by' in params: - # Pickup only most recent change - params['order_by'] = '-seqno' - else: - params['order_by'] = 'seqno' - params['no_cache'] = True - - for start, end in seq: - query = 'seqno:%s..' % start - if end: - query += str(end) - documents, __ = self.find(query=query, **params) - for doc in documents: - yield doc.guid, doc.diff(seq) - - def merge(self, guid, diff): + def patch(self, guid, patch, seqno=None): """Apply changes for documents.""" - doc = self.resource_class(guid, self._storage.get(guid)) + doc = self.resource(guid, self._storage.get(guid)) - for prop, meta in diff.items(): + for prop, meta in patch.items(): orig_meta = doc.meta(prop) if orig_meta and orig_meta['mtime'] >= meta['mtime']: continue if doc.post_seqno is None: - doc.post_seqno = self._seqno.next() + if seqno is None: + seqno = self._seqno.next() + doc.post_seqno = seqno doc.post(prop, **meta) - if doc.post_seqno is None: - return None, False - - if doc.exists: + if doc.post_seqno is not None and doc.exists: # No need in after-merge event, further commit event # is enough to avoid increasing events flow self._index.store(guid, doc.props, self._preindex) - return doc.post_seqno, True + return seqno def _open(self): - if not exists(self._root): - os.makedirs(self._root) - index_path = join(self._root, 'index') + index_path = join(self._root, 'index', self.metadata.name) if self._is_layout_stale(): if exists(index_path): _logger.warning('%r layout is stale, remove index', self.metadata.name) shutil.rmtree(index_path, ignore_errors=True) self._save_layout() - self._storage = Storage(self._root, self.metadata) self._index = self._index_class(index_path, self.metadata, self._postcommit) - _logger.debug('Open %r resource', self.resource_class) + self._storage = Storage(join(self._root, 'db', self.metadata.name)) + _logger.debug('Open %r resource', self.resource) def _broadcast(self, event): event['resource'] = self.metadata.name this.broadcast(event) def _preindex(self, guid, changes): - doc = self.resource_class(guid, self._storage.get(guid), changes) + doc = self.resource(guid, self._storage.get(guid), changes) for prop in self.metadata: enforce(doc[prop] is not None, 'Empty %r property', prop) return doc.props def _prestore(self, guid, changes, event): - doc = self.resource_class(guid, self._storage.get(guid)) + doc = self.resource(guid, self._storage.get(guid)) doc.post_seqno = self._seqno.next() for prop in self.metadata.keys(): value = changes.get(prop) @@ -272,31 +249,14 @@ class Directory(object): self._broadcast({'event': 'commit', 'mtime': self._index.mtime}) def _save_layout(self): - path = join(self._root, 'layout') + path = join(self._root, 'index', self.metadata.name, 'layout') with toolkit.new_file(path) as f: f.write(str(_LAYOUT_VERSION)) def _is_layout_stale(self): - path = join(self._root, 'layout') + path = join(self._root, 'index', self.metadata.name, 'layout') if not exists(path): return True with file(path) as f: version = f.read() return not version.isdigit() or int(version) != _LAYOUT_VERSION - - -class _SessionSeqno(object): - - def __init__(self): - self._value = 0 - - @property - def value(self): - return self._value - - def next(self): - self._value += 1 - return self._value - - def commit(self): - pass diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py index b44bdfb..eb8f0cb 100644 --- a/sugar_network/db/index.py +++ b/sugar_network/db/index.py @@ -123,7 +123,7 @@ class IndexReader(object): raise NotImplementedError() def find(self, offset=0, limit=None, query='', reply=('guid',), - order_by=None, no_cache=False, group_by=None, **request): + order_by=None, group_by=None, **request): """Search resources within the index. The result will be an array of dictionaries with found documents' diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py index 9ba5998..ecefdab 100644 --- a/sugar_network/db/metadata.py +++ b/sugar_network/db/metadata.py @@ -16,7 +16,6 @@ import xapian from sugar_network import toolkit -from sugar_network.db import blobs from sugar_network.toolkit.router import ACL, File from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import i18n, http, enforce @@ -304,49 +303,27 @@ class Blob(Property): return value enforce(value is None or isinstance(value, basestring) or - isinstance(value, dict) and value or hasattr(value, 'read'), - 'Inappropriate blob value') + hasattr(value, 'read'), + http.BadRequest, 'Inappropriate blob value') if not value: return '' - if not isinstance(value, dict): - mime_type = None - if this.request.prop == self.name: - mime_type = this.request.content_type - if not mime_type: - mime_type = self.mime_type - return blobs.post(value, mime_type).digest - - digest = this.resource[self.name] if self.name else None - if digest: - orig = blobs.get(digest) - enforce('digest' not in value or value.pop('digest') == digest, - "Inappropriate 'digest' value") - enforce(orig.path or 'location' in orig or 'location' in value, - 'Blob points to nothing') - if 'location' in value and orig.path: - blobs.delete(digest) - orig.update(value) - value = orig - else: - enforce('location' in value, 'Blob points to nothing') - enforce('digest' in value, "Missed 'digest' value") - if 'content-type' not in value: - value['content-type'] = self.mime_type - digest = value.pop('digest') - - blobs.update(digest, value) - return digest + mime_type = None + if this.request.prop == self.name: + mime_type = this.request.content_type + if not mime_type: + mime_type = self.mime_type + return this.volume.blobs.post(value, mime_type).digest def reprcast(self, value): if not value: return File.AWAY - return blobs.get(value) + return this.volume.blobs.get(value) def teardown(self, value): if value: - blobs.delete(value) + this.volume.blobs.delete(value) def assert_access(self, mode, value=None): if mode == ACL.WRITE and not value: diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index 2636dca..71a3efd 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -18,6 +18,7 @@ from sugar_network.db.metadata import Numeric, List, Authors from sugar_network.db.metadata import Composite, Aggregated from sugar_network.toolkit.coroutine import this from sugar_network.toolkit.router import ACL +from sugar_network.toolkit import ranges class Resource(object): @@ -25,6 +26,8 @@ class Resource(object): #: `Metadata` object that describes the document metadata = None + #: Whether these resources should be migrated from slave-to-master only + one_way = False def __init__(self, guid, record, cached_props=None): self.props = cached_props or {} @@ -118,7 +121,9 @@ class Resource(object): if self.record is not None: return self.record.get(prop) - def diff(self, seq): + def diff(self, r): + patch = {} + last_seqno = None for name, prop in self.metadata.items(): if name == 'seqno' or prop.acl & ACL.CALC: continue @@ -126,19 +131,20 @@ class Resource(object): if meta is None: continue seqno = meta.get('seqno') - if seqno not in seq: + if not ranges.contains(r, seqno): continue + last_seqno = max(seqno, last_seqno) value = meta.get('value') if isinstance(prop, Aggregated): value_ = {} for key, agg in value.items(): - if agg.pop('seqno') in seq: + if ranges.contains(r, agg.pop('seqno')): value_[key] = agg value = value_ - meta = {'mtime': meta['mtime'], 'value': value} - yield name, meta, seqno + patch[name] = {'mtime': meta['mtime'], 'value': value} + return last_seqno, patch - def patch(self, props): + def format_patch(self, props): if not props: return {} patch = {} diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index d8d2fb4..153e0a7 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -19,10 +19,8 @@ import logging from contextlib import contextmanager from sugar_network import toolkit -from sugar_network.db import blobs from sugar_network.db.metadata import Aggregated -from sugar_network.toolkit.router import ACL, File -from sugar_network.toolkit.router import route, preroute, fallbackroute +from sugar_network.toolkit.router import ACL, File, route, fallbackroute from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, enforce @@ -39,11 +37,6 @@ class Routes(object): self._find_limit = find_limit this.volume = self.volume - @preroute - def __preroute__(self, op, request, response): - this.request = request - this.response = response - @route('POST', [None], acl=ACL.AUTH, mime_type='application/json') def create(self, request): with self._post(request, ACL.CREATE) as doc: @@ -91,14 +84,6 @@ class Routes(object): self.volume[request.resource].update(doc.guid, doc.props) self.after_post(doc) - @route('GET', [None, None], cmd='diff', mime_type='application/json') - def diff(self, request): - result = {} - res = self.volume[request.resource][request.guid] - for prop, meta, __ in res.diff(toolkit.Sequence([[0, None]])): - result[prop] = meta - return result - @route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR) def update_prop(self, request): if request.content is None: @@ -187,8 +172,8 @@ class Routes(object): directory.update(request.guid, {'author': authors}) @fallbackroute('GET', ['blobs']) - def blobs(self, request): - return blobs.get(request.guid) + def blobs(self): + return this.volume.blobs.get(this.request.guid) def on_create(self, request, props): ts = int(time.time()) @@ -215,7 +200,7 @@ class Routes(object): directory = self.volume[request.resource] if access == ACL.CREATE: - doc = directory.resource_class(None, None) + doc = directory.resource(None, None) if 'guid' in content: # TODO Temporal security hole, see TODO guid = content['guid'] diff --git a/sugar_network/db/storage.py b/sugar_network/db/storage.py index 72cbcf7..bbb50db 100644 --- a/sugar_network/db/storage.py +++ b/sugar_network/db/storage.py @@ -25,9 +25,8 @@ from sugar_network import toolkit class Storage(object): """Get access to documents' data storage.""" - def __init__(self, root, metadata): + def __init__(self, root): self._root = root - self.metadata = metadata def get(self, guid): """Get access to particular document's properties. @@ -50,12 +49,7 @@ class Storage(object): path = self._path(guid) if not exists(path): return - try: - shutil.rmtree(path) - except Exception, error: - toolkit.exception() - raise RuntimeError('Cannot delete %r document from %r: %s' % - (guid, self.metadata.name, error)) + shutil.rmtree(path) def walk(self, mtime): """Generator function to enumerate all existing documents. diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 6457b93..5ec5683 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -15,12 +15,14 @@ import os import logging +from copy import deepcopy from os.path import exists, join, abspath from sugar_network import toolkit from sugar_network.db.directory import Directory from sugar_network.db.index import IndexWriter -from sugar_network.toolkit import http, coroutine, enforce +from sugar_network.db.blobs import Blobs +from sugar_network.toolkit import http, coroutine, ranges, enforce _logger = logging.getLogger('db.volume') @@ -44,8 +46,10 @@ class Volume(dict): if not exists(root): os.makedirs(root) self._index_class = index_class - self.seqno = toolkit.Seqno(join(self._root, 'db.seqno')) - self.releases_seqno = toolkit.Seqno(join(self._root, 'releases.seqno')) + self.seqno = toolkit.Seqno(join(self._root, 'var', 'db.seqno')) + self.releases_seqno = toolkit.Seqno( + join(self._root, 'var', 'releases.seqno')) + self.blobs = Blobs(root, self.seqno) for document in documents: if isinstance(document, basestring): @@ -72,6 +76,74 @@ class Volume(dict): for __ in cls.populate(): coroutine.dispatch() + def diff(self, r, files=None, one_way=False): + last_seqno = None + try: + for resource, directory in self.items(): + if one_way and directory.resource.one_way: + continue + directory.commit() + yield {'resource': resource} + for start, end in r: + query = 'seqno:%s..' % start + if end: + query += str(end) + docs, __ = directory.find(query=query, order_by='seqno') + for doc in docs: + seqno, patch = doc.diff(r) + if not patch: + continue + yield {'guid': doc.guid, 'patch': patch} + last_seqno = max(last_seqno, seqno) + for blob in self.blobs.diff(r): + seqno = int(blob.pop('x-seqno')) + yield blob + last_seqno = max(last_seqno, seqno) + for dirpath in files or []: + for blob in self.blobs.diff(r, dirpath): + seqno = int(blob.pop('x-seqno')) + yield blob + last_seqno = max(last_seqno, seqno) + except StopIteration: + pass + + if last_seqno: + commit_r = deepcopy(r) + ranges.exclude(commit_r, last_seqno + 1, None) + ranges.exclude(r, None, last_seqno) + yield {'commit': commit_r} + + def patch(self, records): + directory = None + commit_r = [] + merged_r = [] + seqno = None + + for record in records: + resource_ = record.get('resource') + if resource_: + directory = self[resource_] + continue + + if 'guid' in record: + seqno = directory.patch(record['guid'], record['patch'], seqno) + continue + + if 'content-length' in record: + if seqno is None: + seqno = self.seqno.next() + self.blobs.patch(record, seqno) + continue + + commit = record.get('commit') + if commit is not None: + ranges.include(commit_r, commit) + continue + + if seqno is not None: + ranges.include(merged_r, seqno, seqno) + return commit_r, merged_r + def __enter__(self): return self @@ -79,8 +151,8 @@ class Volume(dict): self.close() def __getitem__(self, name): - directory = self.get(name) - if directory is None: + dir_ = self.get(name) + if dir_ is None: enforce(name in self.resources, http.BadRequest, 'Unknown %r resource', name) resource = self.resources[name] @@ -89,11 +161,10 @@ class Volume(dict): cls = getattr(mod, name.capitalize()) else: cls = resource - directory = Directory(join(self._root, name), cls, - self._index_class, self.seqno) - self._populators.spawn(self._populate, directory) - self[name] = directory - return directory + dir_ = Directory(self._root, cls, self._index_class, self.seqno) + self._populators.spawn(self._populate, dir_) + self[name] = dir_ + return dir_ def _populate(self, directory): for __ in directory.populate(): diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py index 5b7a245..bd7405d 100644 --- a/sugar_network/model/__init__.py +++ b/sugar_network/model/__init__.py @@ -22,7 +22,6 @@ from os.path import join import xapian from sugar_network import toolkit, db -from sugar_network.db import blobs from sugar_network.model.routes import FrontRoutes from sugar_network.toolkit.spec import parse_version, parse_requires from sugar_network.toolkit.spec import EMPTY_LICENSE @@ -81,7 +80,7 @@ class Release(object): return release if not isinstance(release, dict): __, release = load_bundle( - blobs.post(release, this.request.content_type), + this.volume.blobs.post(release, this.request.content_type), context=this.request.guid) return release['bundles']['*-*']['blob'], release @@ -91,7 +90,7 @@ class Release(object): 'book' not in this.resource['type']: return for bundle in release['bundles'].values(): - blobs.delete(bundle['blob']) + this.volume.blobs.delete(bundle['blob']) def encode(self, value): return [] @@ -123,6 +122,7 @@ def populate_context_images(props, svg): if 'guid' in props: from sugar_network.toolkit.sugar import color_svg svg = color_svg(svg, props['guid']) + blobs = this.volume.blobs props['artifact_icon'] = blobs.post(svg, 'image/svg+xml').digest props['icon'] = blobs.post(svg_to_png(svg, 55, 55), 'image/png').digest props['logo'] = blobs.post(svg_to_png(svg, 140, 140), 'image/png').digest @@ -212,10 +212,10 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): _logger.debug('Load %r release: %r', context, release) if this.request.principal in context_doc['author']: - diff = context_doc.patch(context_meta) - if diff: - this.call(method='PUT', path=['context', context], content=diff) - context_doc.props.update(diff) + patch = context_doc.format_patch(context_meta) + if patch: + this.call(method='PUT', path=['context', context], content=patch) + context_doc.props.update(patch) # TRANS: Release notes title title = i18n._('%(name)s %(version)s release') else: @@ -237,7 +237,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): ''.join(i18n.decode(context_doc['title']).split()), version, mimetypes.guess_extension(blob.get('content-type')) or '', ) - blobs.update(blob.digest, blob) + this.volume.blobs.update(blob.digest, blob) return context, release diff --git a/sugar_network/model/report.py b/sugar_network/model/report.py index 980c3ff..be9fd9f 100644 --- a/sugar_network/model/report.py +++ b/sugar_network/model/report.py @@ -32,6 +32,8 @@ class _Solution(db.Property): class Report(db.Resource): + one_way = True + @db.indexed_property(prefix='C', acl=ACL.CREATE | ACL.READ) def context(self, value): return value diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index 35c56a9..af19023 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -15,7 +15,6 @@ import logging -from sugar_network.db import blobs from sugar_network.toolkit.router import route from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import coroutine @@ -60,8 +59,8 @@ class FrontRoutes(object): return 'User-agent: *\nDisallow: /\n' @route('GET', ['favicon.ico']) - def favicon(self, request, response): - return blobs.get('favicon.ico') + def favicon(self): + return this.volume.blobs.get('favicon.ico') def _broadcast(self, event): _logger.debug('Broadcast event: %r', event) diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 792267a..d3d9b88 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -21,6 +21,7 @@ import shutil import logging import tempfile import collections +from copy import deepcopy from cStringIO import StringIO from os.path import exists, join, islink, isdir, dirname, basename, abspath from os.path import lexists, isfile @@ -487,231 +488,71 @@ class NamedTemporaryFile(object): return getattr(self._file, name) -class Seqno(object): - """Sequence number counter with persistent storing in a file.""" +class Bin(object): + """Store variable in a file.""" - def __init__(self, path): - """ - :param path: - path to file to [re]store seqno value + def __init__(self, path, default_value=None): + self._path = abspath(path) + self.value = default_value + self._orig_value = None - """ - self._path = path - self._value = 0 - if exists(path): - with file(path) as f: - self._value = int(f.read().strip()) - self._orig_value = self._value + if exists(self._path): + with file(self._path) as f: + self.value = json.load(f) + else: + self.commit() + self._orig_value = deepcopy(self.value) @property - def value(self): - """Current seqno value.""" - return self._value - - def next(self): - """Incerement seqno. - - :returns: - new seqno value - - """ - self._value += 1 - return self._value + def mtime(self): + if exists(self._path): + return os.stat(self._path).st_mtime + else: + return 0 def commit(self): - """Store current seqno value in a file. + """Store current value in a file. :returns: `True` if commit was happened """ - if self._value == self._orig_value: + if self.value == self._orig_value: return False with new_file(self._path) as f: - f.write(str(self._value)) + json.dump(self.value, f) f.flush() os.fsync(f.fileno()) - self._orig_value = self._value + self._orig_value = self.value return True + def __enter__(self): + return self -class Sequence(list): - """List of sorted and non-overlapping ranges. - - List items are ranges, [`start`, `stop']. If `start` or `stop` - is `None`, it means the beginning or ending of the entire sequence. + def __exit__(self, exc_type, exc_value, traceback): + self.commit() - """ - def __init__(self, value=None, empty_value=None): - """ - :param value: - default value to initialize range - :param empty_value: - if not `None`, the initial value for empty range +class Seqno(Bin): + """Sequence number counter with persistent storing in a file.""" + def __init__(self, path): """ - if empty_value is None: - self._empty_value = [] - else: - self._empty_value = [empty_value] - - if value: - self.extend(value) - else: - self.clear() - - def __contains__(self, value): - for start, end in self: - if value >= start and (end is None or value <= end): - return True - else: - return False - - @property - def empty(self): - """Is timeline in the initial state.""" - return self == self._empty_value - - def clear(self): - """Reset range to the initial value.""" - self[:] = self._empty_value - - def stretch(self): - """Remove all holes between the first and the last items.""" - if self: - self[:] = [[self[0][0], self[-1][-1]]] - - def include(self, start, end=None): - """Include specified range. - - :param start: - either including range start or a list of - (`start`, `end`) pairs - :param end: - including range end + :param path: + path to file to [re]store seqno value """ - if issubclass(type(start), collections.Iterable): - for range_start, range_end in start: - self._include(range_start, range_end) - elif start is not None: - self._include(start, end) + Bin.__init__(self, path, 0) - def exclude(self, start, end=None): - """Exclude specified range. + def next(self): + """Incerement seqno. - :param start: - either excluding range start or a list of - (`start`, `end`) pairs - :param end: - excluding range end + :returns: + new seqno value """ - if issubclass(type(start), collections.Iterable): - for range_start, range_end in start: - self._exclude(range_start, range_end) - else: - enforce(end is not None) - self._exclude(start, end) - - def _include(self, range_start, range_end): - if range_start is None: - range_start = 1 - - range_start_new = None - range_start_i = 0 - - for range_start_i, (start, end) in enumerate(self): - if range_end is not None and start - 1 > range_end: - break - if (range_end is None or start - 1 <= range_end) and \ - (end is None or end + 1 >= range_start): - range_start_new = min(start, range_start) - break - else: - range_start_i += 1 - - if range_start_new is None: - self.insert(range_start_i, [range_start, range_end]) - return - - range_end_new = range_end - range_end_i = range_start_i - for i, (start, end) in enumerate(self[range_start_i:]): - if range_end is not None and start - 1 > range_end: - break - if range_end is None or end is None: - range_end_new = None - else: - range_end_new = max(end, range_end) - range_end_i = range_start_i + i - - del self[range_start_i:range_end_i] - self[range_start_i] = [range_start_new, range_end_new] - - def _exclude(self, range_start, range_end): - if range_start is None: - range_start = 1 - enforce(range_end is not None) - enforce(range_start <= range_end and range_start > 0, - 'Start value %r is less than 0 or not less than %r', - range_start, range_end) - - for i, interval in enumerate(self): - start, end = interval - - if end is not None and end < range_start: - # Current `interval` is below new one - continue - - if range_end is not None and range_end < start: - # Current `interval` is above new one - continue - - if end is None or end > range_end: - # Current `interval` will exist after changing - self[i] = [range_end + 1, end] - if start < range_start: - self.insert(i, [start, range_start - 1]) - else: - if start < range_start: - self[i] = [start, range_start - 1] - else: - del self[i] - - if end is not None: - range_start = end + 1 - if range_start < range_end: - self.exclude(range_start, range_end) - break - - -class PersistentSequence(Sequence): - - def __init__(self, path, empty_value=None): - Sequence.__init__(self, empty_value=empty_value) - self._path = path - - if exists(self._path): - with file(self._path) as f: - self[:] = json.load(f) - - @property - def mtime(self): - if exists(self._path): - return os.stat(self._path).st_mtime - else: - return 0 - - def commit(self): - dir_path = dirname(self._path) - if dir_path and not exists(dir_path): - os.makedirs(dir_path) - with new_file(self._path) as f: - json.dump(self, f) - f.flush() - os.fsync(f.fileno()) + self.value += 1 + return self.value class Pool(object): diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 47f13bc..d280035 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -378,14 +378,16 @@ class SugarAuth(object): from M2Crypto import RSA from base64 import b64encode + key_dir = dirname(self._key_path) if exists(self._key_path): + if os.stat(key_dir) & 077: + os.chmod(key_dir, 0700) self._key = RSA.load_key(self._key_path) return - key_dir = dirname(self._key_path) if not exists(key_dir): os.makedirs(key_dir) - os.chmod(key_dir, 0700) + os.chmod(key_dir, 0700) _logger.info('Generate RSA private key at %r', self._key_path) self._key = RSA.gen_key(1024, 65537, lambda *args: None) diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py index 457ea07..43e6960 100644 --- a/sugar_network/toolkit/parcel.py +++ b/sugar_network/toolkit/parcel.py @@ -19,15 +19,19 @@ import zlib import time import json import struct +import hashlib import logging from types import GeneratorType from os.path import dirname, exists, join from sugar_network import toolkit from sugar_network.toolkit.router import File +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce +DEFAULT_COMPRESSLEVEL = 6 + _FILENAME_SUFFIX = '.parcel' _RESERVED_DISK_SPACE = 1024 * 1024 @@ -48,15 +52,19 @@ def decode(stream, limit=None): packet.next() if packet.name == 'last': break - packet.props.update(header) + packet.header.update(header) yield packet -def encode(packets, limit=None, header=None, compresslevel=6): +def encode(packets, limit=None, header=None, compresslevel=None, + on_complete=None): _logger.debug('Encode %r packets limit=%r header=%r', packets, limit, header) ostream = _ZipStream(compresslevel) + # In case of downloading blobs + # (?) reuse current `this.http` + this.http = http.Connection() if limit is None: limit = sys.maxint @@ -87,12 +95,13 @@ def encode(packets, limit=None, header=None, compresslevel=6): record = next(content) continue blob_len = 0 - if isinstance(record, File) and record.path: + if isinstance(record, File): blob_len = record.size chunk = ostream.write_record(record, None if finalizing else limit - blob_len) if chunk is None: _logger.debug('Reach the encoding limit') + on_complete = None if not isinstance(content, GeneratorType): raise StopIteration() finalizing = True @@ -101,22 +110,21 @@ def encode(packets, limit=None, header=None, compresslevel=6): if chunk: yield chunk if blob_len: - with file(record.path, 'rb') as blob: - while True: - chunk = blob.read(BUFFER_SIZE) - if not chunk: - break - blob_len -= len(chunk) - if not blob_len: - chunk += '\n' - chunk = ostream.write(chunk) - if chunk: - yield chunk + for chunk in record.iter_content(): + blob_len -= len(chunk) + if not blob_len: + chunk += '\n' + chunk = ostream.write(chunk) + if chunk: + yield chunk enforce(blob_len == 0, EOFError, 'Blob size mismatch') record = next(content) except StopIteration: pass + if on_complete is not None: + on_complete() + chunk = ostream.write_record({'packet': 'last'}) if chunk: yield chunk @@ -173,7 +181,7 @@ class _DecodeIterator(object): def __init__(self, stream): self._stream = stream - self.props = {} + self.header = {} self._name = None self._shift = True @@ -190,10 +198,10 @@ class _DecodeIterator(object): self._shift = True def __repr__(self): - return '<Packet %r>' % self.props + return '<Packet %r>' % self.header def __getitem__(self, key): - return self.props.get(key) + return self.header.get(key) def __iter__(self): while True: @@ -203,7 +211,7 @@ class _DecodeIterator(object): raise EOFError() if 'packet' in record: self._name = record['packet'] or '' - self.props = record + self.header = record self._shift = False break blob_len = record.get('content-length') @@ -212,13 +220,15 @@ class _DecodeIterator(object): continue blob_len = int(blob_len) with toolkit.NamedTemporaryFile() as blob: + digest = hashlib.sha1() while blob_len: chunk = self._stream.read(min(blob_len, BUFFER_SIZE)) enforce(chunk, 'Blob size mismatch') blob.write(chunk) blob_len -= len(chunk) + digest.update(chunk) blob.flush() - yield File(blob.name, meta=record) + yield File(blob.name, digest=digest.hexdigest(), meta=record) def __enter__(self): return self @@ -229,7 +239,9 @@ class _DecodeIterator(object): class _ZipStream(object): - def __init__(self, compresslevel=6): + def __init__(self, compresslevel=None): + if compresslevel is None: + compresslevel = DEFAULT_COMPRESSLEVEL self._zipper = zlib.compressobj(compresslevel, zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0) self._offset = 0 diff --git a/sugar_network/toolkit/ranges.py b/sugar_network/toolkit/ranges.py new file mode 100644 index 0000000..247944e --- /dev/null +++ b/sugar_network/toolkit/ranges.py @@ -0,0 +1,198 @@ +# Copyright (C) 2011-2014 Aleksey Lim +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Routines to treat lists of sorted and non-overlapping ranges. + +List items are [`start`, `stop'] ranges. If `start` or `stop` is `None`, +it means the beginning or ending of the entire list. + +""" +import sys +import collections + +from sugar_network.toolkit import enforce + + +def contains(r, value): + """Whether specified value included to one of ranges.""" + for start, end in r: + if value >= start and (end is None or value <= end): + return True + else: + return False + + +def stretch(r): + """Remove all holes between the first and the last ranges.""" + if r: + r[:] = [[r[0][0], r[-1][-1]]] + + +def include(r, start, end=None): + """Insert specified range. + + :param start: + either including range start or a list of + (`start`, `end`) pairs + :param end: + including range end + + """ + if issubclass(type(start), collections.Iterable): + for range_start, range_end in start: + _include(r, range_start, range_end) + elif start is not None: + _include(r, start, end) + + +def exclude(r, start, end=None): + """Remove specified range. + + :param start: + either excluding range start or a list of + (`start`, `end`) pairs + :param end: + excluding range end + + """ + if issubclass(type(start), collections.Iterable): + for range_start, range_end in start: + _exclude(r, range_start, range_end) + else: + _exclude(r, start, end) + + +def intersect(r1, r2): + """Return an intersection between two range sets.""" + result = [] + for start1, end1 in r1: + if end1 is None: + end1 = sys.maxint + for start2, end2 in r2: + if end2 is None: + end2 = sys.maxint + start = max(start1, start2) + end = min(end1, end2) + if start > end: + continue + if end == sys.maxint: + result.append([start, None]) + break + result.append([start, end]) + return result + + +def _include(r, range_start, range_end): + if range_start is None: + range_start = 1 + + range_start_new = None + range_start_i = 0 + + for range_start_i, (start, end) in enumerate(r): + if range_end is not None and start - 1 > range_end: + break + if (range_end is None or start - 1 <= range_end) and \ + (end is None or end + 1 >= range_start): + range_start_new = min(start, range_start) + break + else: + range_start_i += 1 + + if range_start_new is None: + r.insert(range_start_i, [range_start, range_end]) + return + + range_end_new = range_end + range_end_i = range_start_i + for i, (start, end) in enumerate(r[range_start_i:]): + if range_end is not None and start - 1 > range_end: + break + if range_end is None or end is None: + range_end_new = None + else: + range_end_new = max(end, range_end) + range_end_i = range_start_i + i + + del r[range_start_i:range_end_i] + r[range_start_i] = [range_start_new, range_end_new] + + +def _exclude(r, range_start, range_end): + enforce(range_start is not None or range_end is not None) + + if range_start is None: + for i, interval in enumerate(r): + start, end = interval + if range_end < start: + del r[:i] + return + if end is not None: + if range_end == end: + del r[:i + 1] + return + if range_end < end: + interval[0] = min(range_end + 1, end) + del r[:i] + return + if r and r[-1][1] is None: + r[:] = [[range_end + 1, None]] + else: + del r[:] + return + + if range_end is None: + for i, interval in enumerate(r): + start, end = interval + if end is None or range_start <= end: + if range_start <= start: + del r[i:] + else: + interval[1] = range_start - 1 + del r[i + 1:] + return + return + + enforce(range_start <= range_end and range_start > 0, + 'Start value %r is less than 0 or not less than %r', + range_start, range_end) + + for i, interval in enumerate(r): + start, end = interval + + if end is not None and end < range_start: + # Current `interval` is below new one + continue + + if range_end is not None and range_end < start: + # Current `interval` is above new one + continue + + if end is None or end > range_end: + # Current `interval` will exist after changing + r[i] = [range_end + 1, end] + if start < range_start: + r.insert(i, [start, range_start - 1]) + else: + if start < range_start: + r[i] = [start, range_start - 1] + else: + del r[i] + + if end is not None: + range_start = end + 1 + if range_start < range_end: + exclude(r, range_start, range_end) + break diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index 4206121..48a04fe 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -20,12 +20,13 @@ import time import types import logging import calendar -from base64 import b64decode +from base64 import b64decode, b64encode from bisect import bisect_left from urllib import urlencode +from Cookie import SimpleCookie from urlparse import parse_qsl, urlsplit from email.utils import parsedate, formatdate -from os.path import isfile +from os.path import isfile, basename, exists from sugar_network import toolkit from sugar_network.toolkit.coroutine import this @@ -357,11 +358,11 @@ class CaseInsensitiveDict(dict): def __setitem__(self, key, value): return self.set(key.lower(), value) - def __delitem__(self, key, value): + def __delitem__(self, key): self.remove(key.lower()) - def get(self, key): - return dict.get(self, key) + def get(self, key, default=None): + return dict.get(self, key, default) def set(self, key, value): dict.__setitem__(self, key, value) @@ -426,17 +427,21 @@ class File(CaseInsensitiveDict): pass def __init__(self, path, digest=None, meta=None): - CaseInsensitiveDict.__init__(self) + CaseInsensitiveDict.__init__(self, meta or []) self.path = path self.digest = File.Digest(digest) if digest else None - if meta is not None: - for key, value in meta.items() if isinstance(meta, dict) else meta: - self[key] = value self._stat = None @property + def exists(self): + return self.path and exists(self.path) + + @property def size(self): if self._stat is None: + if not self.exists: + size = self.get('content-length', 0) + return int(size) if size else 0 self._stat = os.stat(self.path) return self._stat.st_size @@ -453,9 +458,37 @@ class File(CaseInsensitiveDict): return self.get('location') or \ '%s/blobs/%s' % (this.request.static_prefix, self.digest) + @property + def name(self): + if self.path: + return basename(self.path) + def __repr__(self): return '<File %r>' % self.url + def iter_content(self): + if self.path: + return self._iter_content() + url = self.get('location') + enforce(url, http.NotFound, 'No location') + blob = this.http.request('GET', url, allow_redirects=True, + # Request for uncompressed data + headers={'accept-encoding': ''}) + self.clear() + for tag in ('content-length', 'content-type', 'content-disposition'): + value = blob.headers.get(tag) + if value: + self[tag] = value + return blob.iter_content(toolkit.BUFFER_SIZE) + + def _iter_content(self): + with file(self.path, 'rb') as f: + while True: + chunk = f.read(toolkit.BUFFER_SIZE) + if not chunk: + break + yield chunk + class Router(object): @@ -532,6 +565,8 @@ class Router(object): if response is None: response = Response() + this.request = request + this.response = response route_ = self._resolve_route(request) for arg, cast in route_.arguments.items(): @@ -592,6 +627,8 @@ class Router(object): content = None try: + this.cookie = _load_cookie(request, 'sugar_network_node') + if 'HTTP_ORIGIN' in request.environ: enforce(self._assert_origin(request.environ), http.Forbidden, 'Cross-site is not allowed for %r origin', @@ -655,10 +692,10 @@ class Router(object): content = json.dumps(content) if 'content-length' not in response: response.content_length = len(content) if content else 0 - if request.method == 'HEAD' and content is not None: _logger.warning('Content from HEAD response is ignored') content = None + _save_cookie(response, 'sugar_network_node', this.cookie) _logger.trace('%s call: request=%s response=%r content=%r', self, request.environ, response, repr(content)[:256]) @@ -845,6 +882,42 @@ def _parse_accept_language(value): return langs +def _load_cookie(request, name): + cookie_str = request.environ.get('HTTP_COOKIE') + if not cookie_str: + return _Cookie() + cookie = SimpleCookie() + cookie.load(cookie_str) + if name not in cookie: + return _Cookie() + raw_value = cookie.get(name).value + if raw_value == 'unset_%s' % name: + _logger.debug('Found unset %r cookie', name) + return _Cookie() + value = _Cookie(json.loads(b64decode(raw_value))) + value.loaded = True + _logger.debug('Found %r cookie value=%r', name, value) + return value + + +def _save_cookie(response, name, value, age=3600): + if value: + _logger.debug('Set %r cookie value=%r age=%s', name, value, age) + raw_value = b64encode(json.dumps(value)) + else: + if not value.loaded: + return + _logger.debug('Unset %r cookie') + raw_value = 'unset_%s' % name + cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, raw_value, age) + response.setdefault('set-cookie', []).append(cookie) + + +class _Cookie(dict): + + loaded = False + + class _Routes(dict): def __init__(self, parent=None): |