author    Aleksey Lim <alsroot@sugarlabs.org>  2014-03-06 15:33:04 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>  2014-03-07 04:58:00 (GMT)
commit    90f74541ec4925bad47466e39517c22ff7eadfe4 (patch)
tree      f8fca9c302904981a46e275fcaa5a2305ea99f8d /sugar_network
parent    1028755053ef3d8c538138b37e61ece13b9c1a23 (diff)
Keep data synchronization in db module; use blobs storage to keep standalone files
Diffstat (limited to 'sugar_network')
-rw-r--r--  sugar_network/db/blobs.py          | 314
-rw-r--r--  sugar_network/db/directory.py      |  96
-rw-r--r--  sugar_network/db/index.py          |   2
-rw-r--r--  sugar_network/db/metadata.py       |  43
-rw-r--r--  sugar_network/db/resource.py       |  18
-rw-r--r--  sugar_network/db/routes.py         |  23
-rw-r--r--  sugar_network/db/storage.py        |  10
-rw-r--r--  sugar_network/db/volume.py         |  91
-rw-r--r--  sugar_network/model/__init__.py    |  16
-rw-r--r--  sugar_network/model/report.py      |   2
-rw-r--r--  sugar_network/model/routes.py      |   5
-rw-r--r--  sugar_network/toolkit/__init__.py  | 235
-rw-r--r--  sugar_network/toolkit/http.py      |   6
-rw-r--r--  sugar_network/toolkit/parcel.py    |  52
-rw-r--r--  sugar_network/toolkit/ranges.py    | 198
-rw-r--r--  sugar_network/toolkit/router.py    |  93
16 files changed, 701 insertions, 503 deletions
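
Read together, the path changes below amount to a reshaped volume layout. The following is an inferred sketch, reconstructed from the new path joins in Blobs, Directory._open and Volume.__init__, not an authoritative description:

    <root>/
        blobs/<digest[:3]>/<digest>    content-addressed blobs, each with a .meta sidecar
        files/<relative/path>          standalone files, synchronized by path
        db/<resource>/                 per-resource document storage
        index/<resource>/              per-resource Xapian index plus its layout stamp
        var/db.seqno                   persistent sequence counters (now JSON via Bin)
        var/releases.seqno
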
diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py
index a9d66e0..cd795c6 100644
--- a/sugar_network/db/blobs.py
+++ b/sugar_network/db/blobs.py
@@ -16,137 +16,215 @@
import os
import logging
import hashlib
+import mimetypes
from contextlib import contextmanager
-from os.path import exists, abspath, join, isdir, isfile
+from os.path import exists, abspath, join, dirname
from sugar_network import toolkit
from sugar_network.toolkit.router import File
-from sugar_network.toolkit import http, enforce
+from sugar_network.toolkit import http, ranges, enforce
_META_SUFFIX = '.meta'
_logger = logging.getLogger('db.blobs')
-_root = None
-
-
-def init(path):
- global _root
- _root = abspath(path)
- if not exists(_root):
- os.makedirs(_root)
-
-
-def post(content, mime_type=None, digest_to_assert=None, meta=None):
- if meta is None:
- meta = []
- meta.append(('content-type', mime_type or 'application/octet-stream'))
- else:
- meta = meta.items()
- if mime_type:
- meta.append(('content-type', mime_type))
-
- @contextmanager
- def write_blob():
- tmp_path = join(_path(), 'post')
- if hasattr(content, 'read'):
- with toolkit.new_file(tmp_path) as blob:
- digest = hashlib.sha1()
- while True:
- chunk = content.read(toolkit.BUFFER_SIZE)
- if not chunk:
- break
- blob.write(chunk)
- digest.update(chunk)
- yield blob, digest.hexdigest()
- elif isinstance(content, dict):
- enforce('location' in content, http.BadRequest, 'No location')
- enforce('digest' in content, http.BadRequest, 'No digest')
- meta.append(('status', '301 Moved Permanently'))
- meta.append(('location', content['location']))
- with toolkit.new_file(tmp_path) as blob:
- yield blob, content['digest']
- else:
- with toolkit.new_file(tmp_path) as blob:
- blob.write(content)
- yield blob, hashlib.sha1(content).hexdigest()
-
- with write_blob() as (blob, digest):
- if digest_to_assert and digest != digest_to_assert:
- blob.unlink()
- raise http.BadRequest('Digest mismatch')
- path = _path(digest)
- meta.append(('content-length', str(blob.tell())))
- with toolkit.new_file(path + _META_SUFFIX) as f:
- for key, value in meta:
- f.write('%s: %s\n' % (key, value))
- blob.name = path
-
- return File(path, digest, meta)
-
-
-def update(digest, meta):
- path = _path(digest) + _META_SUFFIX
- enforce(exists(path), http.NotFound, 'No such blob')
- meta_content = ''
- for key, value in meta.items():
- meta_content += '%s: %s\n' % (key, value)
- with toolkit.new_file(path) as f:
- f.write(meta_content)
-
-def get(digest):
- path = _path(digest)
- if not exists(path) or not exists(path + _META_SUFFIX):
- return None
- meta = []
- with file(path + _META_SUFFIX) as f:
- for line in f:
- key, value = line.split(':', 1)
- meta.append((key, value.strip()))
- return File(path, digest, meta)
+class Blobs(object):
-def delete(digest):
- path = _path(digest)
- if exists(path + _META_SUFFIX):
- os.unlink(path + _META_SUFFIX)
- if exists(path):
- os.unlink(path)
+ def __init__(self, root, seqno):
+ self._root = abspath(root)
+ self._seqno = seqno
+ def path(self, *args):
+ if len(args) == 1 and len(args[0]) == 40 and '.' not in args[0]:
+ return self._blob_path(args[0])
+ else:
+ return join(self._root, 'files', *args)
-def diff(in_seq, out_seq=None):
- if out_seq is None:
- out_seq = toolkit.Sequence([])
- is_the_only_seq = not out_seq
-
- try:
- root = _path()
- for name in os.listdir(root):
- dirpath = join(root, name)
- if not isdir(dirpath) or os.stat(dirpath).st_ctime not in in_seq:
+ def post(self, content, mime_type=None, digest_to_assert=None, meta=None):
+ if meta is None:
+ meta = []
+ meta.append(('content-type',
+ mime_type or 'application/octet-stream'))
+ else:
+ meta = meta.items()
+ if mime_type:
+ meta.append(('content-type', mime_type))
+
+ @contextmanager
+ def write_blob():
+ tmp_path = join(self._blob_path(), 'post')
+ if hasattr(content, 'read'):
+ with toolkit.new_file(tmp_path) as blob:
+ digest = hashlib.sha1()
+ while True:
+ chunk = content.read(toolkit.BUFFER_SIZE)
+ if not chunk:
+ break
+ blob.write(chunk)
+ digest.update(chunk)
+ yield blob, digest.hexdigest()
+ elif isinstance(content, dict):
+ enforce('location' in content, http.BadRequest, 'No location')
+ enforce('digest' in content, http.BadRequest, 'No digest')
+ meta.append(('status', '301 Moved Permanently'))
+ meta.append(('location', content['location']))
+ with toolkit.new_file(tmp_path) as blob:
+ yield blob, content['digest']
+ else:
+ with toolkit.new_file(tmp_path) as blob:
+ blob.write(content)
+ yield blob, hashlib.sha1(content).hexdigest()
+
+ with write_blob() as (blob, digest):
+ if digest_to_assert and digest != digest_to_assert:
+ blob.unlink()
+ raise http.BadRequest('Digest mismatch')
+ path = self._blob_path(digest)
+ seqno = self._seqno.next()
+ meta.append(('content-length', str(blob.tell())))
+ meta.append(('x-seqno', str(seqno)))
+ _write_meta(path, meta, seqno)
+ blob.name = path
+ os.utime(path, (seqno, seqno))
+
+ _logger.debug('Post %r file', path)
+
+ return File(path, digest, meta)
+
+ def update(self, path, meta):
+ path = self.path(path)
+ enforce(exists(path), http.NotFound, 'No such blob')
+ orig_meta = _read_meta(path)
+ orig_meta.update(meta)
+ _write_meta(path, orig_meta, None)
+
+ def get(self, digest):
+ path = self.path(digest)
+ if exists(path + _META_SUFFIX):
+ return File(path, digest, _read_meta(path))
+
+ def delete(self, path):
+ self._delete(path, None)
+
+ def diff(self, r, path=None, recursive=True):
+ if path is None:
+ is_files = False
+ root = self._blob_path()
+ else:
+ path = path.strip('/').split('/')
+ enforce(not [i for i in path if i == '..'],
+ http.BadRequest, 'Relative paths are not allowed')
+ is_files = True
+ root = self.path(*path)
+ checkin_seqno = None
+
+ for root, __, files in os.walk(root):
+ if not ranges.contains(r, int(os.stat(root).st_mtime)):
continue
- for digest in os.listdir(dirpath):
- if len(digest) != 40:
- continue
- path = join(dirpath, digest)
- if not isfile(path):
+ rel_root = root[len(self._root) + 7:] if is_files else None
+ for filename in files:
+ path = join(root, filename)
+ if filename.endswith(_META_SUFFIX):
+ seqno = int(os.stat(path).st_mtime)
+ path = path[:-len(_META_SUFFIX)]
+ meta = None
+ if exists(path):
+ stat = os.stat(path)
+ if seqno != int(stat.st_mtime):
+ _logger.debug('Found updated %r file', path)
+ seqno = self._seqno.next()
+ meta = _read_meta(path)
+ meta['x-seqno'] = str(seqno)
+ meta['content-length'] = str(stat.st_size)
+ _write_meta(path, meta, seqno)
+ os.utime(path, (seqno, seqno))
+ if not ranges.contains(r, seqno):
+ continue
+ if meta is None:
+ meta = _read_meta(path)
+ if is_files:
+ digest = join(rel_root, filename[:-len(_META_SUFFIX)])
+ meta['path'] = digest
+ else:
+ digest = filename[:-len(_META_SUFFIX)]
+ elif not is_files or exists(path + _META_SUFFIX):
continue
- ctime = int(os.stat(path).st_ctime)
- if ctime not in in_seq:
- continue
- blob = get(digest)
- if blob is None:
- continue
- yield blob
- out_seq.include(ctime, ctime)
- if is_the_only_seq:
- # There is only one diff, so, we can stretch it to remove all holes
- out_seq.stretch()
- except StopIteration:
- pass
-
-
-def _path(digest=None):
- enforce(_root is not None, 'Blobs storage is not initialized')
- return join(_root, digest[:3], digest) if digest else _root
+ else:
+ _logger.debug('Found new %r file', path)
+ mime_type = mimetypes.guess_type(filename)[0] or \
+ 'application/octet-stream'
+ if checkin_seqno is None:
+ checkin_seqno = self._seqno.next()
+ seqno = checkin_seqno
+ meta = [('content-type', mime_type),
+ ('content-length', str(os.stat(path).st_size)),
+ ('x-seqno', str(seqno)),
+ ]
+ _write_meta(path, meta, seqno)
+ os.utime(path, (seqno, seqno))
+ if not ranges.contains(r, seqno):
+ continue
+ digest = join(rel_root, filename)
+ meta.append(('path', digest))
+ yield File(path, digest, meta)
+ if not recursive:
+ break
+
+ def patch(self, patch, seqno):
+ if 'path' in patch:
+ path = self.path(patch.pop('path'))
+ else:
+ path = self._blob_path(patch.digest)
+ if not patch.size:
+ self._delete(path, seqno)
+ return
+ if not exists(dirname(path)):
+ os.makedirs(dirname(path))
+ os.rename(patch.path, path)
+ if exists(path + _META_SUFFIX):
+ meta = _read_meta(path)
+ meta.update(patch)
+ else:
+ meta = patch
+ meta['x-seqno'] = str(seqno)
+ _write_meta(path, meta, seqno)
+ os.utime(path, (seqno, seqno))
+
+ def _delete(self, path, seqno):
+ path = self.path(path)
+ if exists(path + _META_SUFFIX):
+ if seqno is None:
+ seqno = self._seqno.next()
+ meta = _read_meta(path)
+ meta['status'] = '410 Gone'
+ meta['x-seqno'] = str(seqno)
+ _write_meta(path, meta, seqno)
+ if exists(path):
+ _logger.debug('Delete %r file', path)
+ os.unlink(path)
+
+ def _blob_path(self, digest=None):
+ if not digest:
+ return join(self._root, 'blobs')
+ return join(self._root, 'blobs', digest[:3], digest)
+
+
+def _write_meta(path, meta, seqno):
+ path += _META_SUFFIX
+ with toolkit.new_file(path) as f:
+ for key, value in meta.items() if isinstance(meta, dict) else meta:
+ if seqno is None and key == 'x-seqno':
+ seqno = int(value)
+ f.write('%s: %s\n' % (key, value))
+ os.utime(path, (seqno, seqno))
+
+
+def _read_meta(path):
+ meta = {}
+ with file(path + _META_SUFFIX) as f:
+ for line in f:
+ key, value = line.split(':', 1)
+ meta[key] = value.strip()
+ return meta
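
The former module-level functions (init/post/get/update/delete/diff) become methods on a Blobs object bound to a volume root and a shared seqno counter. A minimal usage sketch with illustrative paths; in practice Volume constructs the instance:

    from sugar_network import toolkit
    from sugar_network.db.blobs import Blobs

    blobs = Blobs('/tmp/volume', toolkit.Seqno('/tmp/volume/var/db.seqno'))
    blob = blobs.post('hello', mime_type='text/plain')
    # next to blobs/<digest[:3]>/<digest> a flat sidecar is written, e.g.:
    #   content-type: text/plain
    #   content-length: 5
    #   x-seqno: 1
    # both files get st_mtime == x-seqno, which is what diff() walks by
    same = blobs.get(blob.digest)    # File, or None if the blob is unknown
    blobs.delete(blob.digest)        # unlinks content, keeps meta as '410 Gone'
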
diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py
index c6957d7..3ef4b91 100644
--- a/sugar_network/db/directory.py
+++ b/sugar_network/db/directory.py
@@ -13,7 +13,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import shutil
import logging
from os.path import exists, join
@@ -33,7 +32,7 @@ _logger = logging.getLogger('db.directory')
class Directory(object):
- def __init__(self, root, resource_class, index_class, seqno=None):
+ def __init__(self, root, resource, index_class, seqno):
"""
:param index_class:
what class to use to access to indexes, for regular cases
@@ -41,19 +40,16 @@ class Directory(object):
keep writer in separate process).
"""
- if not exists(root):
- os.makedirs(root)
-
- if resource_class.metadata is None:
+ if resource.metadata is None:
# Metadata cannot be recreated
- resource_class.metadata = Metadata(resource_class)
- resource_class.metadata['guid'] = Guid()
- self.metadata = resource_class.metadata
+ resource.metadata = Metadata(resource)
+ resource.metadata['guid'] = Guid()
+ self.metadata = resource.metadata
- self.resource_class = resource_class
+ self.resource = resource
self._index_class = index_class
self._root = root
- self._seqno = _SessionSeqno() if seqno is None else seqno
+ self._seqno = seqno
self._storage = None
self._index = None
@@ -62,7 +58,10 @@ class Directory(object):
def wipe(self):
self.close()
_logger.debug('Wipe %r directory', self.metadata.name)
- shutil.rmtree(self._root, ignore_errors=True)
+ shutil.rmtree(join(self._root, 'index', self.metadata.name),
+ ignore_errors=True)
+ shutil.rmtree(join(self._root, 'db', self.metadata.name),
+ ignore_errors=True)
self._open()
def close(self):
@@ -129,7 +128,7 @@ class Directory(object):
enforce(cached_props or record.exists, http.NotFound,
'Resource %r does not exist in %r',
guid, self.metadata.name)
- return self.resource_class(guid, record, cached_props)
+ return self.resource(guid, record, cached_props)
def __getitem__(self, guid):
return self.get(guid)
@@ -141,7 +140,7 @@ class Directory(object):
for hit in mset:
guid = hit.document.get_value(0)
record = self._storage.get(guid)
- yield self.resource_class(guid, record)
+ yield self.resource(guid, record)
return iterate(), mset.get_matches_estimated()
@@ -186,74 +185,52 @@ class Directory(object):
self._save_layout()
self.commit()
- def diff(self, seq, exclude_seq=None, **params):
- if exclude_seq is not None:
- for start, end in exclude_seq:
- seq.exclude(start, end)
- if 'group_by' in params:
- # Pickup only most recent change
- params['order_by'] = '-seqno'
- else:
- params['order_by'] = 'seqno'
- params['no_cache'] = True
-
- for start, end in seq:
- query = 'seqno:%s..' % start
- if end:
- query += str(end)
- documents, __ = self.find(query=query, **params)
- for doc in documents:
- yield doc.guid, doc.diff(seq)
-
- def merge(self, guid, diff):
+ def patch(self, guid, patch, seqno=None):
"""Apply changes for documents."""
- doc = self.resource_class(guid, self._storage.get(guid))
+ doc = self.resource(guid, self._storage.get(guid))
- for prop, meta in diff.items():
+ for prop, meta in patch.items():
orig_meta = doc.meta(prop)
if orig_meta and orig_meta['mtime'] >= meta['mtime']:
continue
if doc.post_seqno is None:
- doc.post_seqno = self._seqno.next()
+ if seqno is None:
+ seqno = self._seqno.next()
+ doc.post_seqno = seqno
doc.post(prop, **meta)
- if doc.post_seqno is None:
- return None, False
-
- if doc.exists:
+ if doc.post_seqno is not None and doc.exists:
# No need in after-merge event, further commit event
# is enough to avoid increasing events flow
self._index.store(guid, doc.props, self._preindex)
- return doc.post_seqno, True
+ return seqno
def _open(self):
- if not exists(self._root):
- os.makedirs(self._root)
- index_path = join(self._root, 'index')
+ index_path = join(self._root, 'index', self.metadata.name)
if self._is_layout_stale():
if exists(index_path):
_logger.warning('%r layout is stale, remove index',
self.metadata.name)
shutil.rmtree(index_path, ignore_errors=True)
self._save_layout()
- self._storage = Storage(self._root, self.metadata)
self._index = self._index_class(index_path, self.metadata,
self._postcommit)
- _logger.debug('Open %r resource', self.resource_class)
+ self._storage = Storage(join(self._root, 'db', self.metadata.name))
+ _logger.debug('Open %r resource', self.resource)
def _broadcast(self, event):
event['resource'] = self.metadata.name
this.broadcast(event)
def _preindex(self, guid, changes):
- doc = self.resource_class(guid, self._storage.get(guid), changes)
+ doc = self.resource(guid, self._storage.get(guid), changes)
for prop in self.metadata:
enforce(doc[prop] is not None, 'Empty %r property', prop)
return doc.props
def _prestore(self, guid, changes, event):
- doc = self.resource_class(guid, self._storage.get(guid))
+ doc = self.resource(guid, self._storage.get(guid))
doc.post_seqno = self._seqno.next()
for prop in self.metadata.keys():
value = changes.get(prop)
@@ -272,31 +249,14 @@ class Directory(object):
self._broadcast({'event': 'commit', 'mtime': self._index.mtime})
def _save_layout(self):
- path = join(self._root, 'layout')
+ path = join(self._root, 'index', self.metadata.name, 'layout')
with toolkit.new_file(path) as f:
f.write(str(_LAYOUT_VERSION))
def _is_layout_stale(self):
- path = join(self._root, 'layout')
+ path = join(self._root, 'index', self.metadata.name, 'layout')
if not exists(path):
return True
with file(path) as f:
version = f.read()
return not version.isdigit() or int(version) != _LAYOUT_VERSION
-
-
-class _SessionSeqno(object):
-
- def __init__(self):
- self._value = 0
-
- @property
- def value(self):
- return self._value
-
- def next(self):
- self._value += 1
- return self._value
-
- def commit(self):
- pass
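
Directory.merge() is renamed to patch() and can reuse a caller-supplied seqno, so one volume-level patch can stamp documents and blobs alike. Roughly, with an invented guid and payload:

    seqno = directory.patch('a' * 40, {
        'title': {'mtime': 1394120000, 'value': {'en': 'Title'}},
    })
    # properties older than the stored mtime are skipped; the return value
    # is None when nothing was merged, else the seqno used for stamping
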
diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py
index b44bdfb..eb8f0cb 100644
--- a/sugar_network/db/index.py
+++ b/sugar_network/db/index.py
@@ -123,7 +123,7 @@ class IndexReader(object):
raise NotImplementedError()
def find(self, offset=0, limit=None, query='', reply=('guid',),
- order_by=None, no_cache=False, group_by=None, **request):
+ order_by=None, group_by=None, **request):
"""Search resources within the index.
The result will be an array of dictionaries with found documents'
diff --git a/sugar_network/db/metadata.py b/sugar_network/db/metadata.py
index 9ba5998..ecefdab 100644
--- a/sugar_network/db/metadata.py
+++ b/sugar_network/db/metadata.py
@@ -16,7 +16,6 @@
import xapian
from sugar_network import toolkit
-from sugar_network.db import blobs
from sugar_network.toolkit.router import ACL, File
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import i18n, http, enforce
@@ -304,49 +303,27 @@ class Blob(Property):
return value
enforce(value is None or isinstance(value, basestring) or
- isinstance(value, dict) and value or hasattr(value, 'read'),
- 'Inappropriate blob value')
+ hasattr(value, 'read'),
+ http.BadRequest, 'Inappropriate blob value')
if not value:
return ''
- if not isinstance(value, dict):
- mime_type = None
- if this.request.prop == self.name:
- mime_type = this.request.content_type
- if not mime_type:
- mime_type = self.mime_type
- return blobs.post(value, mime_type).digest
-
- digest = this.resource[self.name] if self.name else None
- if digest:
- orig = blobs.get(digest)
- enforce('digest' not in value or value.pop('digest') == digest,
- "Inappropriate 'digest' value")
- enforce(orig.path or 'location' in orig or 'location' in value,
- 'Blob points to nothing')
- if 'location' in value and orig.path:
- blobs.delete(digest)
- orig.update(value)
- value = orig
- else:
- enforce('location' in value, 'Blob points to nothing')
- enforce('digest' in value, "Missed 'digest' value")
- if 'content-type' not in value:
- value['content-type'] = self.mime_type
- digest = value.pop('digest')
-
- blobs.update(digest, value)
- return digest
+ mime_type = None
+ if this.request.prop == self.name:
+ mime_type = this.request.content_type
+ if not mime_type:
+ mime_type = self.mime_type
+ return this.volume.blobs.post(value, mime_type).digest
def reprcast(self, value):
if not value:
return File.AWAY
- return blobs.get(value)
+ return this.volume.blobs.get(value)
def teardown(self, value):
if value:
- blobs.delete(value)
+ this.volume.blobs.delete(value)
def assert_access(self, mode, value=None):
if mode == ACL.WRITE and not value:
diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py
index 2636dca..71a3efd 100644
--- a/sugar_network/db/resource.py
+++ b/sugar_network/db/resource.py
@@ -18,6 +18,7 @@ from sugar_network.db.metadata import Numeric, List, Authors
from sugar_network.db.metadata import Composite, Aggregated
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit.router import ACL
+from sugar_network.toolkit import ranges
class Resource(object):
@@ -25,6 +26,8 @@ class Resource(object):
#: `Metadata` object that describes the document
metadata = None
+ #: Whether these resources should be migrated from slave-to-master only
+ one_way = False
def __init__(self, guid, record, cached_props=None):
self.props = cached_props or {}
@@ -118,7 +121,9 @@ class Resource(object):
if self.record is not None:
return self.record.get(prop)
- def diff(self, seq):
+ def diff(self, r):
+ patch = {}
+ last_seqno = None
for name, prop in self.metadata.items():
if name == 'seqno' or prop.acl & ACL.CALC:
continue
@@ -126,19 +131,20 @@ class Resource(object):
if meta is None:
continue
seqno = meta.get('seqno')
- if seqno not in seq:
+ if not ranges.contains(r, seqno):
continue
+ last_seqno = max(seqno, last_seqno)
value = meta.get('value')
if isinstance(prop, Aggregated):
value_ = {}
for key, agg in value.items():
- if agg.pop('seqno') in seq:
+ if ranges.contains(r, agg.pop('seqno')):
value_[key] = agg
value = value_
- meta = {'mtime': meta['mtime'], 'value': value}
- yield name, meta, seqno
+ patch[name] = {'mtime': meta['mtime'], 'value': value}
+ return last_seqno, patch
- def patch(self, props):
+ def format_patch(self, props):
if not props:
return {}
patch = {}
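
Resource.diff() no longer yields per-property triples; it accumulates the whole patch and reports the highest seqno it saw, so Volume.diff() can track commit ranges. The returned shape, roughly (property name invented):

    last_seqno, patch = doc.diff([[1, None]])
    # patch      == {'title': {'mtime': 1394120000, 'value': {...}}, ...}
    # last_seqno == largest matching per-property seqno, or None when
    #               nothing fell into the requested ranges (patch is {})
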
diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py
index d8d2fb4..153e0a7 100644
--- a/sugar_network/db/routes.py
+++ b/sugar_network/db/routes.py
@@ -19,10 +19,8 @@ import logging
from contextlib import contextmanager
from sugar_network import toolkit
-from sugar_network.db import blobs
from sugar_network.db.metadata import Aggregated
-from sugar_network.toolkit.router import ACL, File
-from sugar_network.toolkit.router import route, preroute, fallbackroute
+from sugar_network.toolkit.router import ACL, File, route, fallbackroute
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, enforce
@@ -39,11 +37,6 @@ class Routes(object):
self._find_limit = find_limit
this.volume = self.volume
- @preroute
- def __preroute__(self, op, request, response):
- this.request = request
- this.response = response
-
@route('POST', [None], acl=ACL.AUTH, mime_type='application/json')
def create(self, request):
with self._post(request, ACL.CREATE) as doc:
@@ -91,14 +84,6 @@ class Routes(object):
self.volume[request.resource].update(doc.guid, doc.props)
self.after_post(doc)
- @route('GET', [None, None], cmd='diff', mime_type='application/json')
- def diff(self, request):
- result = {}
- res = self.volume[request.resource][request.guid]
- for prop, meta, __ in res.diff(toolkit.Sequence([[0, None]])):
- result[prop] = meta
- return result
-
@route('PUT', [None, None, None], acl=ACL.AUTH | ACL.AUTHOR)
def update_prop(self, request):
if request.content is None:
@@ -187,8 +172,8 @@ class Routes(object):
directory.update(request.guid, {'author': authors})
@fallbackroute('GET', ['blobs'])
- def blobs(self, request):
- return blobs.get(request.guid)
+ def blobs(self):
+ return this.volume.blobs.get(this.request.guid)
def on_create(self, request, props):
ts = int(time.time())
@@ -215,7 +200,7 @@ class Routes(object):
directory = self.volume[request.resource]
if access == ACL.CREATE:
- doc = directory.resource_class(None, None)
+ doc = directory.resource(None, None)
if 'guid' in content:
# TODO Temporal security hole, see TODO
guid = content['guid']
diff --git a/sugar_network/db/storage.py b/sugar_network/db/storage.py
index 72cbcf7..bbb50db 100644
--- a/sugar_network/db/storage.py
+++ b/sugar_network/db/storage.py
@@ -25,9 +25,8 @@ from sugar_network import toolkit
class Storage(object):
"""Get access to documents' data storage."""
- def __init__(self, root, metadata):
+ def __init__(self, root):
self._root = root
- self.metadata = metadata
def get(self, guid):
"""Get access to particular document's properties.
@@ -50,12 +49,7 @@ class Storage(object):
path = self._path(guid)
if not exists(path):
return
- try:
- shutil.rmtree(path)
- except Exception, error:
- toolkit.exception()
- raise RuntimeError('Cannot delete %r document from %r: %s' %
- (guid, self.metadata.name, error))
+ shutil.rmtree(path)
def walk(self, mtime):
"""Generator function to enumerate all existing documents.
diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py
index 6457b93..5ec5683 100644
--- a/sugar_network/db/volume.py
+++ b/sugar_network/db/volume.py
@@ -15,12 +15,14 @@
import os
import logging
+from copy import deepcopy
from os.path import exists, join, abspath
from sugar_network import toolkit
from sugar_network.db.directory import Directory
from sugar_network.db.index import IndexWriter
-from sugar_network.toolkit import http, coroutine, enforce
+from sugar_network.db.blobs import Blobs
+from sugar_network.toolkit import http, coroutine, ranges, enforce
_logger = logging.getLogger('db.volume')
@@ -44,8 +46,10 @@ class Volume(dict):
if not exists(root):
os.makedirs(root)
self._index_class = index_class
- self.seqno = toolkit.Seqno(join(self._root, 'db.seqno'))
- self.releases_seqno = toolkit.Seqno(join(self._root, 'releases.seqno'))
+ self.seqno = toolkit.Seqno(join(self._root, 'var', 'db.seqno'))
+ self.releases_seqno = toolkit.Seqno(
+ join(self._root, 'var', 'releases.seqno'))
+ self.blobs = Blobs(root, self.seqno)
for document in documents:
if isinstance(document, basestring):
@@ -72,6 +76,74 @@ class Volume(dict):
for __ in cls.populate():
coroutine.dispatch()
+ def diff(self, r, files=None, one_way=False):
+ last_seqno = None
+ try:
+ for resource, directory in self.items():
+ if one_way and directory.resource.one_way:
+ continue
+ directory.commit()
+ yield {'resource': resource}
+ for start, end in r:
+ query = 'seqno:%s..' % start
+ if end:
+ query += str(end)
+ docs, __ = directory.find(query=query, order_by='seqno')
+ for doc in docs:
+ seqno, patch = doc.diff(r)
+ if not patch:
+ continue
+ yield {'guid': doc.guid, 'patch': patch}
+ last_seqno = max(last_seqno, seqno)
+ for blob in self.blobs.diff(r):
+ seqno = int(blob.pop('x-seqno'))
+ yield blob
+ last_seqno = max(last_seqno, seqno)
+ for dirpath in files or []:
+ for blob in self.blobs.diff(r, dirpath):
+ seqno = int(blob.pop('x-seqno'))
+ yield blob
+ last_seqno = max(last_seqno, seqno)
+ except StopIteration:
+ pass
+
+ if last_seqno:
+ commit_r = deepcopy(r)
+ ranges.exclude(commit_r, last_seqno + 1, None)
+ ranges.exclude(r, None, last_seqno)
+ yield {'commit': commit_r}
+
+ def patch(self, records):
+ directory = None
+ commit_r = []
+ merged_r = []
+ seqno = None
+
+ for record in records:
+ resource_ = record.get('resource')
+ if resource_:
+ directory = self[resource_]
+ continue
+
+ if 'guid' in record:
+ seqno = directory.patch(record['guid'], record['patch'], seqno)
+ continue
+
+ if 'content-length' in record:
+ if seqno is None:
+ seqno = self.seqno.next()
+ self.blobs.patch(record, seqno)
+ continue
+
+ commit = record.get('commit')
+ if commit is not None:
+ ranges.include(commit_r, commit)
+ continue
+
+ if seqno is not None:
+ ranges.include(merged_r, seqno, seqno)
+ return commit_r, merged_r
+
def __enter__(self):
return self
@@ -79,8 +151,8 @@ class Volume(dict):
self.close()
def __getitem__(self, name):
- directory = self.get(name)
- if directory is None:
+ dir_ = self.get(name)
+ if dir_ is None:
enforce(name in self.resources, http.BadRequest,
'Unknown %r resource', name)
resource = self.resources[name]
@@ -89,11 +161,10 @@ class Volume(dict):
cls = getattr(mod, name.capitalize())
else:
cls = resource
- directory = Directory(join(self._root, name), cls,
- self._index_class, self.seqno)
- self._populators.spawn(self._populate, directory)
- self[name] = directory
- return directory
+ dir_ = Directory(self._root, cls, self._index_class, self.seqno)
+ self._populators.spawn(self._populate, dir_)
+ self[name] = dir_
+ return dir_
def _populate(self, directory):
for __ in directory.populate():
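
With diff() and patch() moved onto Volume, a synchronization pass reduces to feeding one volume's generator into another volume's patch(). A minimal sketch; the two Volume instances and the initial range are assumptions:

    r = [[1, None]]                      # everything since seqno 1
    records = list(local.diff(r, one_way=True))
    # ... ship records to the other side, e.g. inside a parcel ...
    commit_r, merged_r = remote.patch(iter(records))
    # commit_r: ranges the sender declared fully covered ('commit' records)
    # merged_r: seqnos actually merged on the receiving side

Note that diff() mutates r on the way out: the committed range is excluded, so the same list can be persisted and reused for the next pass.
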
diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py
index 5b7a245..bd7405d 100644
--- a/sugar_network/model/__init__.py
+++ b/sugar_network/model/__init__.py
@@ -22,7 +22,6 @@ from os.path import join
import xapian
from sugar_network import toolkit, db
-from sugar_network.db import blobs
from sugar_network.model.routes import FrontRoutes
from sugar_network.toolkit.spec import parse_version, parse_requires
from sugar_network.toolkit.spec import EMPTY_LICENSE
@@ -81,7 +80,7 @@ class Release(object):
return release
if not isinstance(release, dict):
__, release = load_bundle(
- blobs.post(release, this.request.content_type),
+ this.volume.blobs.post(release, this.request.content_type),
context=this.request.guid)
return release['bundles']['*-*']['blob'], release
@@ -91,7 +90,7 @@ class Release(object):
'book' not in this.resource['type']:
return
for bundle in release['bundles'].values():
- blobs.delete(bundle['blob'])
+ this.volume.blobs.delete(bundle['blob'])
def encode(self, value):
return []
@@ -123,6 +122,7 @@ def populate_context_images(props, svg):
if 'guid' in props:
from sugar_network.toolkit.sugar import color_svg
svg = color_svg(svg, props['guid'])
+ blobs = this.volume.blobs
props['artifact_icon'] = blobs.post(svg, 'image/svg+xml').digest
props['icon'] = blobs.post(svg_to_png(svg, 55, 55), 'image/png').digest
props['logo'] = blobs.post(svg_to_png(svg, 140, 140), 'image/png').digest
@@ -212,10 +212,10 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
_logger.debug('Load %r release: %r', context, release)
if this.request.principal in context_doc['author']:
- diff = context_doc.patch(context_meta)
- if diff:
- this.call(method='PUT', path=['context', context], content=diff)
- context_doc.props.update(diff)
+ patch = context_doc.format_patch(context_meta)
+ if patch:
+ this.call(method='PUT', path=['context', context], content=patch)
+ context_doc.props.update(patch)
# TRANS: Release notes title
title = i18n._('%(name)s %(version)s release')
else:
@@ -237,7 +237,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None):
''.join(i18n.decode(context_doc['title']).split()),
version, mimetypes.guess_extension(blob.get('content-type')) or '',
)
- blobs.update(blob.digest, blob)
+ this.volume.blobs.update(blob.digest, blob)
return context, release
diff --git a/sugar_network/model/report.py b/sugar_network/model/report.py
index 980c3ff..be9fd9f 100644
--- a/sugar_network/model/report.py
+++ b/sugar_network/model/report.py
@@ -32,6 +32,8 @@ class _Solution(db.Property):
class Report(db.Resource):
+ one_way = True
+
@db.indexed_property(prefix='C', acl=ACL.CREATE | ACL.READ)
def context(self, value):
return value
diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py
index 35c56a9..af19023 100644
--- a/sugar_network/model/routes.py
+++ b/sugar_network/model/routes.py
@@ -15,7 +15,6 @@
import logging
-from sugar_network.db import blobs
from sugar_network.toolkit.router import route
from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import coroutine
@@ -60,8 +59,8 @@ class FrontRoutes(object):
return 'User-agent: *\nDisallow: /\n'
@route('GET', ['favicon.ico'])
- def favicon(self, request, response):
- return blobs.get('favicon.ico')
+ def favicon(self):
+ return this.volume.blobs.get('favicon.ico')
def _broadcast(self, event):
_logger.debug('Broadcast event: %r', event)
diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py
index 792267a..d3d9b88 100644
--- a/sugar_network/toolkit/__init__.py
+++ b/sugar_network/toolkit/__init__.py
@@ -21,6 +21,7 @@ import shutil
import logging
import tempfile
import collections
+from copy import deepcopy
from cStringIO import StringIO
from os.path import exists, join, islink, isdir, dirname, basename, abspath
from os.path import lexists, isfile
@@ -487,231 +488,71 @@ class NamedTemporaryFile(object):
return getattr(self._file, name)
-class Seqno(object):
- """Sequence number counter with persistent storing in a file."""
+class Bin(object):
+ """Store variable in a file."""
- def __init__(self, path):
- """
- :param path:
- path to file to [re]store seqno value
+ def __init__(self, path, default_value=None):
+ self._path = abspath(path)
+ self.value = default_value
+ self._orig_value = None
- """
- self._path = path
- self._value = 0
- if exists(path):
- with file(path) as f:
- self._value = int(f.read().strip())
- self._orig_value = self._value
+ if exists(self._path):
+ with file(self._path) as f:
+ self.value = json.load(f)
+ else:
+ self.commit()
+ self._orig_value = deepcopy(self.value)
@property
- def value(self):
- """Current seqno value."""
- return self._value
-
- def next(self):
- """Incerement seqno.
-
- :returns:
- new seqno value
-
- """
- self._value += 1
- return self._value
+ def mtime(self):
+ if exists(self._path):
+ return os.stat(self._path).st_mtime
+ else:
+ return 0
def commit(self):
- """Store current seqno value in a file.
+ """Store current value in a file.
:returns:
`True` if commit was happened
"""
- if self._value == self._orig_value:
+ if self.value == self._orig_value:
return False
with new_file(self._path) as f:
- f.write(str(self._value))
+ json.dump(self.value, f)
f.flush()
os.fsync(f.fileno())
- self._orig_value = self._value
+ self._orig_value = self.value
return True
+ def __enter__(self):
+ return self
-class Sequence(list):
- """List of sorted and non-overlapping ranges.
-
- List items are ranges, [`start`, `stop']. If `start` or `stop`
- is `None`, it means the beginning or ending of the entire sequence.
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.commit()
- """
- def __init__(self, value=None, empty_value=None):
- """
- :param value:
- default value to initialize range
- :param empty_value:
- if not `None`, the initial value for empty range
+class Seqno(Bin):
+ """Sequence number counter with persistent storing in a file."""
+ def __init__(self, path):
"""
- if empty_value is None:
- self._empty_value = []
- else:
- self._empty_value = [empty_value]
-
- if value:
- self.extend(value)
- else:
- self.clear()
-
- def __contains__(self, value):
- for start, end in self:
- if value >= start and (end is None or value <= end):
- return True
- else:
- return False
-
- @property
- def empty(self):
- """Is timeline in the initial state."""
- return self == self._empty_value
-
- def clear(self):
- """Reset range to the initial value."""
- self[:] = self._empty_value
-
- def stretch(self):
- """Remove all holes between the first and the last items."""
- if self:
- self[:] = [[self[0][0], self[-1][-1]]]
-
- def include(self, start, end=None):
- """Include specified range.
-
- :param start:
- either including range start or a list of
- (`start`, `end`) pairs
- :param end:
- including range end
+ :param path:
+ path to file to [re]store seqno value
"""
- if issubclass(type(start), collections.Iterable):
- for range_start, range_end in start:
- self._include(range_start, range_end)
- elif start is not None:
- self._include(start, end)
+ Bin.__init__(self, path, 0)
- def exclude(self, start, end=None):
- """Exclude specified range.
+ def next(self):
+ """Incerement seqno.
- :param start:
- either excluding range start or a list of
- (`start`, `end`) pairs
- :param end:
- excluding range end
+ :returns:
+ new seqno value
"""
- if issubclass(type(start), collections.Iterable):
- for range_start, range_end in start:
- self._exclude(range_start, range_end)
- else:
- enforce(end is not None)
- self._exclude(start, end)
-
- def _include(self, range_start, range_end):
- if range_start is None:
- range_start = 1
-
- range_start_new = None
- range_start_i = 0
-
- for range_start_i, (start, end) in enumerate(self):
- if range_end is not None and start - 1 > range_end:
- break
- if (range_end is None or start - 1 <= range_end) and \
- (end is None or end + 1 >= range_start):
- range_start_new = min(start, range_start)
- break
- else:
- range_start_i += 1
-
- if range_start_new is None:
- self.insert(range_start_i, [range_start, range_end])
- return
-
- range_end_new = range_end
- range_end_i = range_start_i
- for i, (start, end) in enumerate(self[range_start_i:]):
- if range_end is not None and start - 1 > range_end:
- break
- if range_end is None or end is None:
- range_end_new = None
- else:
- range_end_new = max(end, range_end)
- range_end_i = range_start_i + i
-
- del self[range_start_i:range_end_i]
- self[range_start_i] = [range_start_new, range_end_new]
-
- def _exclude(self, range_start, range_end):
- if range_start is None:
- range_start = 1
- enforce(range_end is not None)
- enforce(range_start <= range_end and range_start > 0,
- 'Start value %r is less than 0 or not less than %r',
- range_start, range_end)
-
- for i, interval in enumerate(self):
- start, end = interval
-
- if end is not None and end < range_start:
- # Current `interval` is below new one
- continue
-
- if range_end is not None and range_end < start:
- # Current `interval` is above new one
- continue
-
- if end is None or end > range_end:
- # Current `interval` will exist after changing
- self[i] = [range_end + 1, end]
- if start < range_start:
- self.insert(i, [start, range_start - 1])
- else:
- if start < range_start:
- self[i] = [start, range_start - 1]
- else:
- del self[i]
-
- if end is not None:
- range_start = end + 1
- if range_start < range_end:
- self.exclude(range_start, range_end)
- break
-
-
-class PersistentSequence(Sequence):
-
- def __init__(self, path, empty_value=None):
- Sequence.__init__(self, empty_value=empty_value)
- self._path = path
-
- if exists(self._path):
- with file(self._path) as f:
- self[:] = json.load(f)
-
- @property
- def mtime(self):
- if exists(self._path):
- return os.stat(self._path).st_mtime
- else:
- return 0
-
- def commit(self):
- dir_path = dirname(self._path)
- if dir_path and not exists(dir_path):
- os.makedirs(dir_path)
- with new_file(self._path) as f:
- json.dump(self, f)
- f.flush()
- os.fsync(f.fileno())
+ self.value += 1
+ return self.value
class Pool(object):
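
Bin generalizes the old file-backed counter to any JSON-serializable value, and Seqno shrinks to a Bin with next(). A small usage sketch with illustrative paths:

    state = Bin('/tmp/state', default_value={'synced': False})
    with state:
        state.value['synced'] = True    # committed on __exit__ since it changed

    seqno = Seqno('/tmp/db.seqno')      # 0, or whatever was stored before
    seqno.next()                        # 1
    seqno.commit()                      # writes only when the value changed
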
diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py
index 47f13bc..d280035 100644
--- a/sugar_network/toolkit/http.py
+++ b/sugar_network/toolkit/http.py
@@ -378,14 +378,16 @@ class SugarAuth(object):
from M2Crypto import RSA
from base64 import b64encode
+ key_dir = dirname(self._key_path)
if exists(self._key_path):
+ if os.stat(key_dir) & 077:
+ os.chmod(key_dir, 0700)
self._key = RSA.load_key(self._key_path)
return
- key_dir = dirname(self._key_path)
if not exists(key_dir):
os.makedirs(key_dir)
- os.chmod(key_dir, 0700)
+ os.chmod(key_dir, 0700)
_logger.info('Generate RSA private key at %r', self._key_path)
self._key = RSA.gen_key(1024, 65537, lambda *args: None)
diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py
index 457ea07..43e6960 100644
--- a/sugar_network/toolkit/parcel.py
+++ b/sugar_network/toolkit/parcel.py
@@ -19,15 +19,19 @@ import zlib
import time
import json
import struct
+import hashlib
import logging
from types import GeneratorType
from os.path import dirname, exists, join
from sugar_network import toolkit
from sugar_network.toolkit.router import File
+from sugar_network.toolkit.coroutine import this
from sugar_network.toolkit import http, coroutine, BUFFER_SIZE, enforce
+DEFAULT_COMPRESSLEVEL = 6
+
_FILENAME_SUFFIX = '.parcel'
_RESERVED_DISK_SPACE = 1024 * 1024
@@ -48,15 +52,19 @@ def decode(stream, limit=None):
packet.next()
if packet.name == 'last':
break
- packet.props.update(header)
+ packet.header.update(header)
yield packet
-def encode(packets, limit=None, header=None, compresslevel=6):
+def encode(packets, limit=None, header=None, compresslevel=None,
+ on_complete=None):
_logger.debug('Encode %r packets limit=%r header=%r',
packets, limit, header)
ostream = _ZipStream(compresslevel)
+ # In case of downloading blobs
+ # (?) reuse current `this.http`
+ this.http = http.Connection()
if limit is None:
limit = sys.maxint
@@ -87,12 +95,13 @@ def encode(packets, limit=None, header=None, compresslevel=6):
record = next(content)
continue
blob_len = 0
- if isinstance(record, File) and record.path:
+ if isinstance(record, File):
blob_len = record.size
chunk = ostream.write_record(record,
None if finalizing else limit - blob_len)
if chunk is None:
_logger.debug('Reach the encoding limit')
+ on_complete = None
if not isinstance(content, GeneratorType):
raise StopIteration()
finalizing = True
@@ -101,22 +110,21 @@ def encode(packets, limit=None, header=None, compresslevel=6):
if chunk:
yield chunk
if blob_len:
- with file(record.path, 'rb') as blob:
- while True:
- chunk = blob.read(BUFFER_SIZE)
- if not chunk:
- break
- blob_len -= len(chunk)
- if not blob_len:
- chunk += '\n'
- chunk = ostream.write(chunk)
- if chunk:
- yield chunk
+ for chunk in record.iter_content():
+ blob_len -= len(chunk)
+ if not blob_len:
+ chunk += '\n'
+ chunk = ostream.write(chunk)
+ if chunk:
+ yield chunk
enforce(blob_len == 0, EOFError, 'Blob size mismatch')
record = next(content)
except StopIteration:
pass
+ if on_complete is not None:
+ on_complete()
+
chunk = ostream.write_record({'packet': 'last'})
if chunk:
yield chunk
@@ -173,7 +181,7 @@ class _DecodeIterator(object):
def __init__(self, stream):
self._stream = stream
- self.props = {}
+ self.header = {}
self._name = None
self._shift = True
@@ -190,10 +198,10 @@ class _DecodeIterator(object):
self._shift = True
def __repr__(self):
- return '<Packet %r>' % self.props
+ return '<Packet %r>' % self.header
def __getitem__(self, key):
- return self.props.get(key)
+ return self.header.get(key)
def __iter__(self):
while True:
@@ -203,7 +211,7 @@ class _DecodeIterator(object):
raise EOFError()
if 'packet' in record:
self._name = record['packet'] or ''
- self.props = record
+ self.header = record
self._shift = False
break
blob_len = record.get('content-length')
@@ -212,13 +220,15 @@ class _DecodeIterator(object):
continue
blob_len = int(blob_len)
with toolkit.NamedTemporaryFile() as blob:
+ digest = hashlib.sha1()
while blob_len:
chunk = self._stream.read(min(blob_len, BUFFER_SIZE))
enforce(chunk, 'Blob size mismatch')
blob.write(chunk)
blob_len -= len(chunk)
+ digest.update(chunk)
blob.flush()
- yield File(blob.name, meta=record)
+ yield File(blob.name, digest=digest.hexdigest(), meta=record)
def __enter__(self):
return self
@@ -229,7 +239,9 @@ class _DecodeIterator(object):
class _ZipStream(object):
- def __init__(self, compresslevel=6):
+ def __init__(self, compresslevel=None):
+ if compresslevel is None:
+ compresslevel = DEFAULT_COMPRESSLEVEL
self._zipper = zlib.compressobj(compresslevel,
zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0)
self._offset = 0
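
On the decode side every embedded blob is now spooled through a temporary file with its SHA-1 computed on the fly, so records come back as File objects carrying a digest. A consumption sketch; stream is assumed to hold a parcel produced by encode():

    for packet in decode(stream):
        print packet.name, packet.header    # header merged across the parcel
        for record in packet:
            if isinstance(record, File):
                print record.digest         # sha1 of the spooled blob
            # plain records stay dicts
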
diff --git a/sugar_network/toolkit/ranges.py b/sugar_network/toolkit/ranges.py
new file mode 100644
index 0000000..247944e
--- /dev/null
+++ b/sugar_network/toolkit/ranges.py
@@ -0,0 +1,198 @@
+# Copyright (C) 2011-2014 Aleksey Lim
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Routines to treat lists of sorted and non-overlapping ranges.
+
+List items are [`start`, `stop`] ranges. If `start` or `stop` is `None`,
+it means the beginning or ending of the entire list.
+
+"""
+import sys
+import collections
+
+from sugar_network.toolkit import enforce
+
+
+def contains(r, value):
+ """Whether specified value included to one of ranges."""
+ for start, end in r:
+ if value >= start and (end is None or value <= end):
+ return True
+ else:
+ return False
+
+
+def stretch(r):
+ """Remove all holes between the first and the last ranges."""
+ if r:
+ r[:] = [[r[0][0], r[-1][-1]]]
+
+
+def include(r, start, end=None):
+ """Insert specified range.
+
+ :param start:
+ either including range start or a list of
+ (`start`, `end`) pairs
+ :param end:
+ including range end
+
+ """
+ if issubclass(type(start), collections.Iterable):
+ for range_start, range_end in start:
+ _include(r, range_start, range_end)
+ elif start is not None:
+ _include(r, start, end)
+
+
+def exclude(r, start, end=None):
+ """Remove specified range.
+
+ :param start:
+ either excluding range start or a list of
+ (`start`, `end`) pairs
+ :param end:
+ excluding range end
+
+ """
+ if issubclass(type(start), collections.Iterable):
+ for range_start, range_end in start:
+ _exclude(r, range_start, range_end)
+ else:
+ _exclude(r, start, end)
+
+
+def intersect(r1, r2):
+ """Return an intersection between two range sets."""
+ result = []
+ for start1, end1 in r1:
+ if end1 is None:
+ end1 = sys.maxint
+ for start2, end2 in r2:
+ if end2 is None:
+ end2 = sys.maxint
+ start = max(start1, start2)
+ end = min(end1, end2)
+ if start > end:
+ continue
+ if end == sys.maxint:
+ result.append([start, None])
+ break
+ result.append([start, end])
+ return result
+
+
+def _include(r, range_start, range_end):
+ if range_start is None:
+ range_start = 1
+
+ range_start_new = None
+ range_start_i = 0
+
+ for range_start_i, (start, end) in enumerate(r):
+ if range_end is not None and start - 1 > range_end:
+ break
+ if (range_end is None or start - 1 <= range_end) and \
+ (end is None or end + 1 >= range_start):
+ range_start_new = min(start, range_start)
+ break
+ else:
+ range_start_i += 1
+
+ if range_start_new is None:
+ r.insert(range_start_i, [range_start, range_end])
+ return
+
+ range_end_new = range_end
+ range_end_i = range_start_i
+ for i, (start, end) in enumerate(r[range_start_i:]):
+ if range_end is not None and start - 1 > range_end:
+ break
+ if range_end is None or end is None:
+ range_end_new = None
+ else:
+ range_end_new = max(end, range_end)
+ range_end_i = range_start_i + i
+
+ del r[range_start_i:range_end_i]
+ r[range_start_i] = [range_start_new, range_end_new]
+
+
+def _exclude(r, range_start, range_end):
+ enforce(range_start is not None or range_end is not None)
+
+ if range_start is None:
+ for i, interval in enumerate(r):
+ start, end = interval
+ if range_end < start:
+ del r[:i]
+ return
+ if end is not None:
+ if range_end == end:
+ del r[:i + 1]
+ return
+ if range_end < end:
+ interval[0] = min(range_end + 1, end)
+ del r[:i]
+ return
+ if r and r[-1][1] is None:
+ r[:] = [[range_end + 1, None]]
+ else:
+ del r[:]
+ return
+
+ if range_end is None:
+ for i, interval in enumerate(r):
+ start, end = interval
+ if end is None or range_start <= end:
+ if range_start <= start:
+ del r[i:]
+ else:
+ interval[1] = range_start - 1
+ del r[i + 1:]
+ return
+ return
+
+ enforce(range_start <= range_end and range_start > 0,
+ 'Start value %r is less than 0 or not less than %r',
+ range_start, range_end)
+
+ for i, interval in enumerate(r):
+ start, end = interval
+
+ if end is not None and end < range_start:
+ # Current `interval` is below new one
+ continue
+
+ if range_end is not None and range_end < start:
+ # Current `interval` is above new one
+ continue
+
+ if end is None or end > range_end:
+ # Current `interval` will exist after changing
+ r[i] = [range_end + 1, end]
+ if start < range_start:
+ r.insert(i, [start, range_start - 1])
+ else:
+ if start < range_start:
+ r[i] = [start, range_start - 1]
+ else:
+ del r[i]
+
+ if end is not None:
+ range_start = end + 1
+ if range_start < range_end:
+ exclude(r, range_start, range_end)
+ break
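
Since ranges replaces the old Sequence class with plain functions over plain lists, the semantics are easiest to see by example; None still marks an open end:

    from sugar_network.toolkit import ranges

    r = []
    ranges.include(r, 1, None)        # [[1, None]]
    ranges.exclude(r, 5, 10)          # [[1, 4], [11, None]]
    ranges.contains(r, 4)             # True
    ranges.contains(r, 7)             # False
    ranges.intersect(r, [[3, 20]])    # [[3, 4], [11, 20]]
    ranges.stretch(r)                 # [[1, None]] again, holes removed
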
diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py
index 4206121..48a04fe 100644
--- a/sugar_network/toolkit/router.py
+++ b/sugar_network/toolkit/router.py
@@ -20,12 +20,13 @@ import time
import types
import logging
import calendar
-from base64 import b64decode
+from base64 import b64decode, b64encode
from bisect import bisect_left
from urllib import urlencode
+from Cookie import SimpleCookie
from urlparse import parse_qsl, urlsplit
from email.utils import parsedate, formatdate
-from os.path import isfile
+from os.path import isfile, basename, exists
from sugar_network import toolkit
from sugar_network.toolkit.coroutine import this
@@ -357,11 +358,11 @@ class CaseInsensitiveDict(dict):
def __setitem__(self, key, value):
return self.set(key.lower(), value)
- def __delitem__(self, key, value):
+ def __delitem__(self, key):
self.remove(key.lower())
- def get(self, key):
- return dict.get(self, key)
+ def get(self, key, default=None):
+ return dict.get(self, key, default)
def set(self, key, value):
dict.__setitem__(self, key, value)
@@ -426,17 +427,21 @@ class File(CaseInsensitiveDict):
pass
def __init__(self, path, digest=None, meta=None):
- CaseInsensitiveDict.__init__(self)
+ CaseInsensitiveDict.__init__(self, meta or [])
self.path = path
self.digest = File.Digest(digest) if digest else None
- if meta is not None:
- for key, value in meta.items() if isinstance(meta, dict) else meta:
- self[key] = value
self._stat = None
@property
+ def exists(self):
+ return self.path and exists(self.path)
+
+ @property
def size(self):
if self._stat is None:
+ if not self.exists:
+ size = self.get('content-length', 0)
+ return int(size) if size else 0
self._stat = os.stat(self.path)
return self._stat.st_size
@@ -453,9 +458,37 @@ class File(CaseInsensitiveDict):
return self.get('location') or \
'%s/blobs/%s' % (this.request.static_prefix, self.digest)
+ @property
+ def name(self):
+ if self.path:
+ return basename(self.path)
+
def __repr__(self):
return '<File %r>' % self.url
+ def iter_content(self):
+ if self.path:
+ return self._iter_content()
+ url = self.get('location')
+ enforce(url, http.NotFound, 'No location')
+ blob = this.http.request('GET', url, allow_redirects=True,
+ # Request for uncompressed data
+ headers={'accept-encoding': ''})
+ self.clear()
+ for tag in ('content-length', 'content-type', 'content-disposition'):
+ value = blob.headers.get(tag)
+ if value:
+ self[tag] = value
+ return blob.iter_content(toolkit.BUFFER_SIZE)
+
+ def _iter_content(self):
+ with file(self.path, 'rb') as f:
+ while True:
+ chunk = f.read(toolkit.BUFFER_SIZE)
+ if not chunk:
+ break
+ yield chunk
+
class Router(object):
@@ -532,6 +565,8 @@ class Router(object):
if response is None:
response = Response()
+ this.request = request
+ this.response = response
route_ = self._resolve_route(request)
for arg, cast in route_.arguments.items():
@@ -592,6 +627,8 @@ class Router(object):
content = None
try:
+ this.cookie = _load_cookie(request, 'sugar_network_node')
+
if 'HTTP_ORIGIN' in request.environ:
enforce(self._assert_origin(request.environ), http.Forbidden,
'Cross-site is not allowed for %r origin',
@@ -655,10 +692,10 @@ class Router(object):
content = json.dumps(content)
if 'content-length' not in response:
response.content_length = len(content) if content else 0
-
if request.method == 'HEAD' and content is not None:
_logger.warning('Content from HEAD response is ignored')
content = None
+ _save_cookie(response, 'sugar_network_node', this.cookie)
_logger.trace('%s call: request=%s response=%r content=%r',
self, request.environ, response, repr(content)[:256])
@@ -845,6 +882,42 @@ def _parse_accept_language(value):
return langs
+def _load_cookie(request, name):
+ cookie_str = request.environ.get('HTTP_COOKIE')
+ if not cookie_str:
+ return _Cookie()
+ cookie = SimpleCookie()
+ cookie.load(cookie_str)
+ if name not in cookie:
+ return _Cookie()
+ raw_value = cookie.get(name).value
+ if raw_value == 'unset_%s' % name:
+ _logger.debug('Found unset %r cookie', name)
+ return _Cookie()
+ value = _Cookie(json.loads(b64decode(raw_value)))
+ value.loaded = True
+ _logger.debug('Found %r cookie value=%r', name, value)
+ return value
+
+
+def _save_cookie(response, name, value, age=3600):
+ if value:
+ _logger.debug('Set %r cookie value=%r age=%s', name, value, age)
+ raw_value = b64encode(json.dumps(value))
+ else:
+ if not value.loaded:
+ return
+ _logger.debug('Unset %r cookie', name)
+ raw_value = 'unset_%s' % name
+ cookie = '%s=%s; Max-Age=%s; HttpOnly' % (name, raw_value, age)
+ response.setdefault('set-cookie', []).append(cookie)
+
+
+class _Cookie(dict):
+
+ loaded = False
+
+
class _Routes(dict):
def __init__(self, parent=None):
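
The new cookie helpers give routed code a dict that survives the request/response round trip via this.cookie. In outline, outside the Router machinery (Response construction is assumed):

    cookie = _Cookie({'pull': [[1, None]]})    # any JSON-serializable payload
    response = Response()
    _save_cookie(response, 'sugar_network_node', cookie)
    # emits: sugar_network_node=<b64(json)>; Max-Age=3600; HttpOnly
    # an empty cookie that was previously loaded is unset with 'unset_<name>'
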