diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-20 11:24:21 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2014-04-20 11:24:21 (GMT) |
commit | 046073b04229021ec53833a353ffd069d0a5b561 (patch) | |
tree | 1930c720a4391daeaf3e8540b2b027f9cd1ab97f | |
parent | 71391e654f497234fac0a4602bba769820aa521c (diff) |
Pull node updates for checked-in resources
30 files changed, 1268 insertions, 434 deletions
@@ -1,6 +1,5 @@ - proxying as a tool to sort out downstream content - push local offline changes to the node on getting online -- diff/merge while checking in node context - deliver spawn events only to local subscribers - test/run presolve - if node relocates api calls, do it only once in toolkit.http @@ -9,6 +8,8 @@ - cache init sync pull - switch auth from WWW-AUTHENTICATE to mutual authentication over the HTTPS - restrict ACL.LOCAL routes only to localhost clients +- prevent calling diff api cmd from clients to avoid disclosuring private props +- pull node changes periodically for checked-in contexts v2.0 ==== diff --git a/sugar-network-client b/sugar-network-client index 386a3b8..7e51aa2 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -28,7 +28,7 @@ coroutine.inject() import sugar_network_webui as webui from sugar_network import db, toolkit, client, node -from sugar_network.client.routes import CachedClientRoutes +from sugar_network.client.routes import ClientRoutes from sugar_network.client.injector import Injector from sugar_network.client.model import Volume from sugar_network.client.auth import BasicCreds, SugarCreds @@ -112,7 +112,7 @@ class Application(application.Daemon): creds = SugarCreds(client.keyfile.value) else: raise RuntimeError('No credentials specified') - routes = CachedClientRoutes(volume, creds) + routes = ClientRoutes(volume, creds) router = Router(routes, allow_spawn=True) logging.info('Listening for IPC requests on %s port', diff --git a/sugar_network/client/model.py b/sugar_network/client/model.py index 70c8f46..fd85a4d 100644 --- a/sugar_network/client/model.py +++ b/sugar_network/client/model.py @@ -29,7 +29,7 @@ _logger = logging.getLogger('client.model') class Context(_Context): - @db.indexed_property(db.List, prefix='RP', default=[], + @db.indexed_property(db.List, prefix='P', default=[], acl=ACL.READ | ACL.LOCAL) def pins(self, value): return value + this.injector.pins(self.guid) @@ -37,7 +37,9 @@ class Context(_Context): class Volume(db.Volume): - def __init__(self, root): - db.Volume.__init__(self, root, [User, Context, Post, Report]) - for resource in ('user', 'context', 'post'): - self[resource].metadata['author'].acl |= ACL.LOCAL + def __init__(self, root, resources=None): + if resources is None: + resources = [User, Context, Post, Report] + db.Volume.__init__(self, root, resources) + for directory in self.values(): + directory.metadata['author'].acl |= ACL.LOCAL diff --git a/sugar_network/client/routes.py b/sugar_network/client/routes.py index f580789..f618df3 100644 --- a/sugar_network/client/routes.py +++ b/sugar_network/client/routes.py @@ -18,42 +18,46 @@ import logging from httplib import IncompleteRead from os.path import join -from sugar_network import db, client, node, toolkit, model -from sugar_network.client import journal -from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit.router import Request, Router, File +from sugar_network import db, client, node, toolkit +from sugar_network.model import FrontRoutes +from sugar_network.client.journal import Routes as JournalRoutes +from sugar_network.toolkit.router import Request, Router, Response from sugar_network.toolkit.router import route, fallbackroute +from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import netlink, zeroconf, coroutine, http, parcel from sugar_network.toolkit import ranges, lsb_release, enforce -# Flag file to recognize a directory as a synchronization directory +_SYNC_TIMEOUT = 30 _RECONNECT_TIMEOUT = 3 _RECONNECT_TIMEOUT_MAX = 60 * 15 _logger = logging.getLogger('client.routes') -class ClientRoutes(model.FrontRoutes, journal.Routes): +class ClientRoutes(FrontRoutes, JournalRoutes): def __init__(self, home_volume, creds, no_subscription=False): - model.FrontRoutes.__init__(self) - journal.Routes.__init__(self) + FrontRoutes.__init__(self) + JournalRoutes.__init__(self) this.localcast = this.broadcast self._local = _LocalRoutes(home_volume) + self._remote = None + self._remote_urls = [] self._creds = creds self._inline = coroutine.Event() self._inline_job = coroutine.Pool() - self._remote_urls = [] - self._node = None self._connect_jobs = coroutine.Pool() + self._sync_jobs = coroutine.Pool() self._no_subscription = no_subscription + self._pull_r = toolkit.Bin( + join(home_volume.root, 'var', 'pull'), [[1, None]]) self._push_r = toolkit.Bin( - join(home_volume.root, 'var', 'push'), - [[1, None]]) - self._push_job = coroutine.Pool() + join(home_volume.root, 'var', 'push'), [[1, None]]) + self._push_guids_map = toolkit.Bin( + join(home_volume.root, 'var', 'push-guids'), {}) def connect(self, api=None): if self._connect_jobs: @@ -64,11 +68,13 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): else: self._remote_urls.append(api) self._connect_jobs.spawn(self._wait_for_connectivity) + self._local.volume.populate() def close(self): self._connect_jobs.kill() self._got_offline() self._local.volume.close() + self._pull_r.commit() @fallbackroute('GET', ['hub']) def hub(self): @@ -99,7 +105,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): @fallbackroute('GET', ['packages']) def route_packages(self): - if self._inline.is_set(): + if self.inline(): return self.fallback() else: # Let caller know that we are in offline and @@ -113,7 +119,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): @route('GET', cmd='whoami', mime_type='application/json') def whoami(self): - if self._inline.is_set(): + if self.inline(): result = self.fallback() result['route'] = 'proxy' else: @@ -121,47 +127,6 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): result['guid'] = self._creds.login return result - @route('GET', [None], - arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, - mime_type='application/json') - def find(self): - request = this.request - if not self._inline.is_set() or 'pins' in request: - return self._local.call(request, this.response) - - reply = request.setdefault('reply', ['guid']) - if 'pins' not in reply: - return self.fallback() - - if 'guid' not in reply: - # Otherwise there is no way to mixin `pins` - reply.append('guid') - result = self.fallback() - - directory = self._local.volume[request.resource] - for item in result['result']: - doc = directory[item['guid']] - if doc.exists: - item['pins'] += doc.repr('pins') - - return result - - @route('GET', [None, None], mime_type='application/json') - def get(self): - request = this.request - if self._local.volume[request.resource][request.guid].exists: - return self._local.call(request, this.response) - else: - return self.fallback() - - @route('GET', [None, None, None], mime_type='application/json') - def get_prop(self): - request = this.request - if self._local.volume[request.resource][request.guid].exists: - return self._local.call(request, this.response) - else: - return self.fallback() - @route('POST', ['report'], cmd='submit', mime_type='text/event-stream') def submit_report(self): props = this.request.content @@ -208,6 +173,62 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): def recycle(self): return this.injector.recycle() + @route('GET', [None], + arguments={'offset': int, 'limit': int, 'reply': ['guid']}, + mime_type='application/json') + def find(self, reply): + request = this.request + if not self.inline() or 'pins' in request: + return self._local.call(request, this.response) + if 'guid' not in reply: + # Otherwise no way to mixin `pins` or sync checkins + reply.append('guid') + if 'mtime' not in reply: + # To track updates for checked-in resources + reply.append('mtime') + result = self.fallback() + directory = self._local.volume[request.resource] + for item in result['result']: + checkin = directory[item['guid']] + if not checkin.exists: + continue + pins = item['pins'] = checkin.repr('pins') + if pins and item['mtime'] > checkin['mtime']: + pull = Request(method='GET', + path=[checkin.metadata.name, checkin.guid], cmd='diff') + self._sync_jobs.spawn(self._pull_checkin, pull, None, 'range') + return result + + @route('GET', [None, None], mime_type='application/json') + def get(self): + request = this.request + if self._local.volume[request.resource][request.guid].exists: + return self._local.call(request, this.response) + else: + return self.fallback() + + @route('GET', [None, None, None], mime_type='application/json') + def get_prop(self): + return self.get() + + @route('PUT', [None, None]) + def update(self): + if not self.inline(): + return self.fallback() + request = this.request + local = self._local.volume[request.resource][request.guid] + if not local.exists or not local.repr('pins'): + return self.fallback() + self._pull_checkin(request, None, 'pull') + + @route('PUT', [None, None, None]) + def update_prop(self): + self.update() + + @route('DELETE', [None, None]) + def delete(self): + self.update() + @fallbackroute() def fallback(self, request=None, response=None, **kwargs): if request is None: @@ -215,18 +236,18 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): if response is None: response = this.response - if not self._inline.is_set(): + if not self.inline(): return self._local.call(request, response) try: - reply = self._node.call(request, response) - if hasattr(reply, 'read'): + result = self._remote.call(request, response) + if hasattr(result, 'read'): if response.relocations: - return reply + return result else: - return _ResponseStream(reply, self._restart_online) + return _ResponseStream(result, self._restart_online) else: - return reply + return result except (http.ConnectionError, IncompleteRead): if response.relocations: raise @@ -234,28 +255,30 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): return self._local.call(request, response) def _got_online(self, url): - enforce(not self._inline.is_set()) - _logger.debug('Got online on %r', self._node) + enforce(not self.inline()) + _logger.debug('Got online on %r', self._remote) self._inline.set() self._local.volume.mute = True this.injector.api = url this.localcast({'event': 'inline', 'state': 'online'}) - self._push_job.spawn(self._push) + if not self._local.volume.empty: + self._sync_jobs.spawn_later(_SYNC_TIMEOUT, self._sync) def _got_offline(self): - if self._node is not None: - self._node.close() - if self._inline.is_set(): - _logger.debug('Got offline on %r', self._node) + if self._remote is not None: + self._remote.close() + self._remote = None + if self.inline(): + _logger.debug('Got offline on %r', self._remote) self._inline.clear() self._local.volume.mute = False this.injector.api = None this.localcast({'event': 'inline', 'state': 'offline'}) - self._push_job.kill() + self._sync_jobs.kill() def _restart_online(self): _logger.debug('Lost %r connection, try to reconnect in %s seconds', - self._node, _RECONNECT_TIMEOUT) + self._remote, _RECONNECT_TIMEOUT) self._remote_connect(_RECONNECT_TIMEOUT) def _discover_node(self): @@ -275,19 +298,19 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): def _remote_connect(self, timeout=0): def pull_events(): - for event in self._node.subscribe(): + for event in self._remote.subscribe(): if event.get('event') == 'release': this.injector.seqno = event['seqno'] this.broadcast(event) def handshake(url): _logger.debug('Connecting to %r node', url) - self._node = client.Connection(url, creds=self._creds) - status = self._node.get(cmd='status') + self._remote = client.Connection(url, creds=self._creds) + status = self._remote.get(cmd='status') seqno = status.get('seqno') if seqno and 'releases' in seqno: this.injector.seqno = seqno['releases'] - if self._inline.is_set(): + if self.inline(): _logger.info('Reconnected to %r node', url) else: self._got_online(url) @@ -322,36 +345,63 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): self._inline_job.spawn_later(timeout, connect) def _checkin_context(self, pin=None): - context = this.volume['context'][this.request.guid] - if not context.exists: + contexts = self._local.volume['context'] + local_context = contexts[this.request.guid] + if not local_context.exists: enforce(self.inline(), http.ServiceUnavailable, 'Not available in offline') - _logger.debug('Checkin %r context', context.guid) - clone = self.fallback( - method='GET', path=['context', context.guid], cmd='clone') - seqno, __ = this.volume.patch(next(parcel.decode(clone))) - if seqno: - ranges.exclude(self._push_r.value, seqno, seqno) - pins = context['pins'] + _logger.debug('Checkin %r context', local_context.guid) + pull = Request(method='GET', + path=['context', local_context.guid], cmd='diff') + self._pull_checkin(pull, None, 'range') + pins = local_context['pins'] if pin and pin not in pins: - this.volume['context'].update(context.guid, {'pins': pins + [pin]}) + contexts.update(local_context.guid, {'pins': pins + [pin]}) def _checkout_context(self, pin=None): - directory = this.volume['context'] - context = directory[this.request.guid] - if not context.exists: + contexts = self._local.volume['context'] + local_context = contexts[this.request.guid] + if not local_context.exists: return - pins = set(context.repr('pins')) + pins = set(local_context.repr('pins')) if pin: pins -= set([pin]) - if not self._inline.is_set() or pins: + if not self.inline() or pins: if pin: - directory.update(context.guid, {'pins': list(pins)}) + contexts.update(local_context.guid, {'pins': list(pins)}) else: - directory.delete(context.guid) + contexts.delete(local_context.guid) + + def _pull_checkin(self, request, response, header_key): + request.headers[header_key] = self._pull_r.value + patch = self.fallback(request, response) + __, committed = self._local.volume.patch(next(parcel.decode(patch)), + shift_seqno=False) + ranges.exclude(self._pull_r.value, committed) + + def _sync(self): + _logger.info('Start pulling updates') + + for directory in self._local.volume.values(): + if directory.empty: + continue + request = Request(method='GET', + path=[directory.metadata.name], cmd='diff') + response = Response() + while True: + request.headers['range'] = self._pull_r.value + r, guids = self.fallback(request, response) + if not r: + break + for guid in guids: + checkin = Request(method='GET', + path=[request.resource, guid], cmd='diff') + self._pull_checkin(checkin, response, 'range') + ranges.exclude(self._pull_r.value, r) + self._pull_r.commit() + this.localcast({'event': 'sync', 'state': 'pull'}) - def _push(self): - return + """ resource = None metadata = None @@ -396,6 +446,7 @@ class ClientRoutes(model.FrontRoutes, journal.Routes): request.content_type = 'application/json' request.content = props self.fallback(request) + """ class _LocalRoutes(db.Routes, Router): diff --git a/sugar_network/db/blobs.py b/sugar_network/db/blobs.py index 54fd78a..ce5bb1b 100644 --- a/sugar_network/db/blobs.py +++ b/sugar_network/db/blobs.py @@ -150,7 +150,7 @@ class Blobs(object): if exists(path): stat = os.stat(path) if seqno != int(stat.st_mtime): - _logger.debug('Found updated %r file', path) + _logger.debug('Found updated %r blob', path) seqno = self._seqno.next() meta = _read_meta(path) meta['x-seqno'] = str(seqno) @@ -169,7 +169,7 @@ class Blobs(object): elif not is_files or exists(path + _META_SUFFIX): continue else: - _logger.debug('Found new %r file', path) + _logger.debug('Found new %r blob', path) mime_type = mimetypes.guess_type(filename)[0] or \ 'application/octet-stream' if checkin_seqno is None: diff --git a/sugar_network/db/directory.py b/sugar_network/db/directory.py index ecda920..17ff27d 100644 --- a/sugar_network/db/directory.py +++ b/sugar_network/db/directory.py @@ -56,6 +56,10 @@ class Directory(object): self._open() + @property + def empty(self): + return True if self._index is None else (self._index.mtime == 0) + def wipe(self): self.close() _logger.debug('Wipe %r directory', self.metadata.name) @@ -182,21 +186,32 @@ class Directory(object): self._save_layout() self.commit() + def diff(self, r): + for start, end in r: + query = 'seqno:%s..' % start + if end: + query += str(end) + docs, __ = self.find(query=query, order_by='seqno') + for doc in docs: + yield doc + def patch(self, guid, patch, seqno=None): """Apply changes for documents.""" doc = self.resource(guid, self._storage.get(guid)) + merged = False for prop, meta in patch.items(): orig_meta = doc.meta(prop) if orig_meta and orig_meta['mtime'] >= meta['mtime']: continue - if doc.post_seqno is None: - if seqno is None: + if doc.post_seqno is None and seqno is not False: + if not seqno: seqno = self._seqno.next() doc.post_seqno = seqno doc.post(prop, **meta) + merged = True - if doc.post_seqno is not None and doc.exists: + if merged and doc.exists: # No need in after-merge event, further commit event # is enough to avoid increasing events flow self._index.store(guid, doc.posts, self._preindex) @@ -234,6 +249,8 @@ class Directory(object): if not doc.post_seqno and not doc.metadata[prop].acl & ACL.LOCAL: doc.post_seqno = self._seqno.next() doc.post(prop, changes[prop]) + if not doc.exists: + return None for prop in self.metadata.keys(): enforce(doc[prop] is not None, 'Empty %r property', prop) return doc diff --git a/sugar_network/db/index.py b/sugar_network/db/index.py index 89ea6e8..0270dd4 100644 --- a/sugar_network/db/index.py +++ b/sugar_network/db/index.py @@ -70,14 +70,13 @@ class IndexReader(object): @property def mtime(self): """UNIX seconds of the last `commit()` call.""" - return int(os.stat(self._mtime_path).st_mtime) + if exists(self._mtime_path): + return int(os.stat(self._mtime_path).st_mtime) + else: + return 0 def ensure_open(self): - if not exists(self._mtime_path): - with file(self._mtime_path, 'w'): - pass - # Outter code should understand the initial state - os.utime(self._mtime_path, (0, 0)) + pass def get_cached(self, guid): """Return cached document. @@ -337,6 +336,8 @@ class IndexWriter(IndexReader): if pre_cb is not None: properties = pre_cb(guid, properties, *args) + if properties is None: + return _logger.debug('Index %r object: %r', self.metadata.name, properties) @@ -419,7 +420,11 @@ class IndexWriter(IndexReader): self._db.flush() checkpoint = time.time() - os.utime(self._mtime_path, (checkpoint, checkpoint)) + if exists(self._mtime_path): + os.utime(self._mtime_path, (checkpoint, checkpoint)) + else: + with file(self._mtime_path, 'w'): + pass self._pending_updates = 0 _logger.debug('Commit to %r took %s seconds', diff --git a/sugar_network/db/resource.py b/sugar_network/db/resource.py index 9af5086..2c2e46b 100644 --- a/sugar_network/db/resource.py +++ b/sugar_network/db/resource.py @@ -55,7 +55,7 @@ class Resource(object): self._post_seqno = value self.post('seqno', value) - @indexed_property(Numeric, slot=1000, prefix='RS', acl=0) + @indexed_property(Numeric, slot=1000, prefix='RS', acl=0, default=0) def seqno(self, value): return value @@ -85,7 +85,8 @@ class Resource(object): def status(self, value): return value - @indexed_property(List, prefix='RP', default=[], acl=ACL.READ) + @indexed_property(List, prefix='RP', default=[], + acl=ACL.READ | ACL.LOCAL) def pins(self, value): return value @@ -93,6 +94,10 @@ class Resource(object): def exists(self): return self.record is not None and self.record.consistent + @property + def available(self): + return self.exists and self['state'] != 'deleted' + def created(self): ts = int(time.time()) self.posts['ctime'] = ts @@ -160,7 +165,7 @@ class Resource(object): if self.record is not None: return self.record.get(prop) - def diff(self, r): + def diff(self, r, out_r=None): patch = {} for name, prop in self.metadata.items(): if name == 'seqno' or prop.acl & (ACL.CALC | ACL.LOCAL): @@ -171,6 +176,8 @@ class Resource(object): seqno = meta.get('seqno') if not ranges.contains(r, seqno): continue + if out_r is not None: + ranges.include(out_r, seqno, seqno) value = meta.get('value') if isinstance(prop, Aggregated): value_ = {} @@ -178,6 +185,8 @@ class Resource(object): agg_seqno = agg.pop('seqno') if ranges.contains(r, agg_seqno): value_[key] = agg + if out_r is not None: + ranges.include(out_r, agg_seqno, agg_seqno) value = value_ patch[name] = {'mtime': meta['mtime'], 'value': value} return patch @@ -204,7 +213,7 @@ class Resource(object): if prop.on_set is not None: value = prop.on_set(self, value) seqno = None - if not prop.acl & ACL.LOCAL: + if self.post_seqno and not prop.acl & ACL.LOCAL: seqno = meta['seqno'] = self.post_seqno if seqno and isinstance(prop, Aggregated): for agg in value.values(): diff --git a/sugar_network/db/routes.py b/sugar_network/db/routes.py index c74a93e..a1bb75e 100644 --- a/sugar_network/db/routes.py +++ b/sugar_network/db/routes.py @@ -13,18 +13,22 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +# pylint: disable-msg=W0611 + import re import logging from contextlib import contextmanager from sugar_network import toolkit from sugar_network.db.metadata import Aggregated -from sugar_network.toolkit.router import ACL, route, fallbackroute +from sugar_network.toolkit.router import ACL, File +from sugar_network.toolkit.router import route, postroute, fallbackroute from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import http, parcel, enforce +from sugar_network.toolkit import http, parcel, ranges, enforce _GUID_RE = re.compile('[a-zA-Z0-9_+-.]+$') +_GROUPED_DIFF_LIMIT = 1024 _logger = logging.getLogger('db.routes') @@ -35,6 +39,17 @@ class Routes(object): this.volume = self.volume = volume self._find_limit = find_limit + @postroute + def postroute(self, result, exception): + request = this.request + if not request.guid: + return result + pull = request.headers['pull'] + if pull is None: + return result + this.response.content_type = 'application/octet-stream' + return self._object_diff(pull) + @route('POST', [None], acl=ACL.AUTH, mime_type='application/json') def create(self): with self._post(ACL.CREATE) as doc: @@ -45,25 +60,6 @@ class Routes(object): self.volume[this.request.resource].create(doc.posts) return doc['guid'] - @route('GET', [None], - arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, - mime_type='application/json') - def find(self, reply, limit): - self._preget() - request = this.request - if self._find_limit and limit > self._find_limit: - _logger.warning('The find limit is restricted to %s', - self._find_limit) - request['limit'] = self._find_limit - documents, total = self.volume[request.resource].find( - not_state='deleted', **request) - result = [self._postget(i, reply) for i in documents] - return {'total': total, 'result': result} - - @route('GET', [None, None], cmd='exists', mime_type='application/json') - def exists(self): - return self.volume[this.request.resource][this.request.guid].exists - @route('PUT', [None, None], acl=ACL.AUTH | ACL.AUTHOR) def update(self): with self._post(ACL.WRITE) as doc: @@ -88,11 +84,30 @@ class Routes(object): # to make master-slave synchronization possible directory = self.volume[this.request.resource] doc = directory[this.request.guid] - enforce(doc.exists, http.NotFound, 'Resource not found') + enforce(doc.available, http.NotFound, 'Resource not found') doc.posts['state'] = 'deleted' doc.updated() directory.update(doc.guid, doc.posts, 'delete') + @route('GET', [None], + arguments={'offset': int, 'limit': int, 'reply': ('guid',)}, + mime_type='application/json') + def find(self, reply, limit): + self._preget() + request = this.request + if self._find_limit and limit > self._find_limit: + _logger.warning('The find limit is restricted to %s', + self._find_limit) + request['limit'] = self._find_limit + documents, total = self.volume[request.resource].find( + not_state='deleted', **request) + result = [self._postget(i, reply) for i in documents] + return {'total': total, 'result': result} + + @route('GET', [None, None], cmd='exists', mime_type='application/json') + def exists(self): + return self.volume[this.request.resource][this.request.guid].available + @route('GET', [None, None], arguments={'reply': list}, mime_type='application/json') def get(self, reply): @@ -103,8 +118,7 @@ class Routes(object): reply.append(prop.name) self._preget() doc = self.volume[this.request.resource].get(this.request.guid) - enforce(doc.exists and doc['state'] != 'deleted', http.NotFound, - 'Resource not found') + enforce(doc.available, http.NotFound, 'Resource not found') return self._postget(doc, reply) @route('GET', [None, None, None], mime_type='application/json') @@ -166,15 +180,66 @@ class Routes(object): del authors[user] directory.update(request.guid, {'author': authors}) - @route('GET', [None, None], cmd='clone') - def clone(self): - clone = self.volume.clone(this.request.resource, this.request.guid) - return parcel.encode([('push', None, clone)]) + @route('GET', [None], cmd='diff', mime_type='application/json') + def grouped_diff(self, key): + if not key: + key = 'guid' + in_r = this.request.headers['range'] or [[1, None]] + out_r = [] + diff = set() + + for doc in self.volume[this.request.resource].diff(in_r): + diff.add(doc.guid) + if len(diff) > _GROUPED_DIFF_LIMIT: + break + ranges.include(out_r, doc['seqno'], doc['seqno']) + doc.diff(in_r, out_r) + + return out_r, list(diff) + + @route('GET', [None, None], cmd='diff') + def object_diff(self): + return self._object_diff(this.request.headers['range']) @fallbackroute('GET', ['blobs']) def blobs(self): return self.volume.blobs.get(this.request.guid) + def _object_diff(self, in_r): + request = this.request + doc = self.volume[request.resource][request.guid] + enforce(doc.exists, http.NotFound, 'Resource not found') + + out_r = [] + if in_r is None: + in_r = [[1, None]] + patch = doc.diff(in_r, out_r) + if not patch: + return parcel.encode([(None, None, [])], compresslevel=0) + + diff = [{'resource': request.resource}, + {'guid': request.guid, 'patch': patch}, + ] + + def add_blob(blob): + if not isinstance(blob, File): + return + seqno = int(blob.meta['x-seqno']) + ranges.include(out_r, seqno, seqno) + diff.append(blob) + + for prop, meta in patch.items(): + prop = doc.metadata[prop] + value = prop.reprcast(meta['value']) + if isinstance(prop, Aggregated): + for __, aggvalue in value: + add_blob(aggvalue) + else: + add_blob(value) + diff.append({'commit': out_r}) + + return parcel.encode([(None, None, diff)], compresslevel=0) + @contextmanager def _post(self, access): content = this.request.content @@ -197,7 +262,7 @@ class Routes(object): doc.posts[name] = prop.default else: doc = self.volume[this.request.resource][this.request.guid] - enforce(doc.exists, 'Resource not found') + enforce(doc.available, 'Resource not found') this.resource = doc def teardown(new, old): @@ -244,7 +309,7 @@ class Routes(object): def _useradd(self, authors, user, role): props = {} user_doc = self.volume['user'][user] - if user_doc.exists: + if user_doc.available: props['name'] = user_doc['name'] role |= ACL.INSYSTEM else: diff --git a/sugar_network/db/volume.py b/sugar_network/db/volume.py index 295fc02..382176c 100644 --- a/sugar_network/db/volume.py +++ b/sugar_network/db/volume.py @@ -19,7 +19,6 @@ from copy import deepcopy from os.path import exists, join, abspath from sugar_network import toolkit -from sugar_network.db.metadata import Blob from sugar_network.db.directory import Directory from sugar_network.db.index import IndexWriter from sugar_network.db.blobs import Blobs @@ -64,6 +63,13 @@ class Volume(dict): def root(self): return self._root + @property + def empty(self): + for directory in self.values(): + if not directory.empty: + return False + return True + def close(self): """Close operations with the server.""" _logger.info('Closing documents in %r', self._root) @@ -90,19 +96,13 @@ class Volume(dict): for resource, directory in self.items(): if one_way and directory.resource.one_way: continue - directory.commit() yield {'resource': resource} - for start, end in r: - query = 'seqno:%s..' % start - if end: - query += str(end) - docs, __ = directory.find(query=query, order_by='seqno') - for doc in docs: - patch = doc.diff(include) - if patch: - yield {'guid': doc.guid, 'patch': patch} - found = True - last_seqno = max(last_seqno, doc['seqno']) + for doc in directory.diff(r): + patch = doc.diff(include) + if patch: + yield {'guid': doc.guid, 'patch': patch} + found = True + last_seqno = max(last_seqno, doc['seqno']) if blobs: for blob in self.blobs.diff(include): seqno = int(blob.meta.pop('x-seqno')) @@ -124,27 +124,16 @@ class Volume(dict): ranges.exclude(r, None, last_seqno) yield {'commit': commit_r} - def clone(self, resource, guid): - doc = self[resource][guid] - patch = doc.diff([[1, None]]) - if not patch: - return - for name, prop in self[resource].metadata.items(): - if isinstance(prop, Blob) and name in patch: - yield self.blobs.get(patch[name]['value']) - yield {'resource': resource} - yield {'guid': guid, 'patch': patch} - - def patch(self, records): + def patch(self, records, shift_seqno=True): directory = None committed = [] - seqno = None + seqno = None if shift_seqno else False for record in records: if isinstance(record, File): if seqno is None: seqno = self.seqno.next() - self.blobs.patch(record, seqno) + self.blobs.patch(record, seqno or 0) continue resource = record.get('resource') if resource: diff --git a/sugar_network/model/__init__.py b/sugar_network/model/__init__.py index c6b3321..3f6aef1 100644 --- a/sugar_network/model/__init__.py +++ b/sugar_network/model/__init__.py @@ -199,7 +199,7 @@ def load_bundle(blob, context=None, initial=False, extra_deps=None): this.call(method='POST', path=['context'], content=context_meta, principal=principal) else: - enforce(doc.exists, http.NotFound, 'No context') + enforce(doc.available, http.NotFound, 'No context') enforce(context_type in doc['type'], http.BadRequest, 'Inappropriate bundle type') diff --git a/sugar_network/model/context.py b/sugar_network/model/context.py index 78df790..9153552 100644 --- a/sugar_network/model/context.py +++ b/sugar_network/model/context.py @@ -21,10 +21,6 @@ from sugar_network.toolkit import svg_to_png class Context(db.Resource): - @db.indexed_property(db.List, prefix='P', default=[]) - def pins(self, value): - return value - @db.indexed_property(db.List, prefix='T', subtype=db.Enum(model.CONTEXT_TYPES)) def type(self, value): diff --git a/sugar_network/model/routes.py b/sugar_network/model/routes.py index eda26dc..8012853 100644 --- a/sugar_network/model/routes.py +++ b/sugar_network/model/routes.py @@ -17,7 +17,7 @@ import logging from sugar_network.toolkit.router import route from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import coroutine +from sugar_network.toolkit import coroutine, http _logger = logging.getLogger('model.routes') @@ -30,9 +30,9 @@ class FrontRoutes(object): this.broadcast = self._broadcast this.localcast = self._broadcast - @route('GET', mime_type='text/html') + @route('GET') def hello(self): - return _HELLO_HTML + raise http.Redirect('http://wiki.sugarlabs.org/go/Sugar_Network/API') @route('OPTIONS') def options(self): @@ -86,7 +86,7 @@ class FrontRoutes(object): @route('GET', ['favicon.ico']) def favicon(self): - return this.volume.blobs.get('favicon.ico') + return this.volume.blobs.get('assets/favicon.ico') def _broadcast(self, event): _logger.debug('Broadcast event: %r', event) @@ -97,10 +97,3 @@ class FrontRoutes(object): coroutine.select([rfile.fileno()], [], []) finally: self._spooler.notify_all(rfile) - - -_HELLO_HTML = """\ -<h2>Welcome to Sugar Network API!</h2> -Visit the <a href="http://wiki.sugarlabs.org/go/Sugar_Network/API"> -Sugar Labs Wiki</a> to learn how it can be used. -""" diff --git a/sugar_network/node/auth.py b/sugar_network/node/auth.py index 27b334c..00054f5 100644 --- a/sugar_network/node/auth.py +++ b/sugar_network/node/auth.py @@ -17,7 +17,7 @@ import time import hashlib import logging from ConfigParser import ConfigParser -from os.path import join, dirname, exists, expanduser, abspath +from os.path import join, exists from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import pylru, http, enforce @@ -89,7 +89,7 @@ class SugarAuth(object): signature = creds['signature'] nonce = int(creds['nonce']) user = this.volume['user'][login] - enforce(user.exists, Unauthorized, 'Principal does not exist') + enforce(user.available, Unauthorized, 'Principal does not exist') key = RSA.load_pub_key_bio(BIO.MemoryBuffer(str(user['pubkey']))) data = hashlib.sha1('%s:%s' % (login, nonce)).digest() enforce(key.verify(data, signature.decode('hex')), diff --git a/sugar_network/node/model.py b/sugar_network/node/model.py index b1cb401..144dab0 100644 --- a/sugar_network/node/model.py +++ b/sugar_network/node/model.py @@ -181,7 +181,7 @@ def solve(volume, top_context, command=None, lsb_id=None, lsb_release=None, if context in context_clauses: return context_clauses[context] context = volume['context'][context] - enforce(context.exists, http.NotFound, 'Context not found') + enforce(context.available, http.NotFound, 'Context not found') releases = context['releases'] clause = [] diff --git a/sugar_network/node/routes.py b/sugar_network/node/routes.py index 4457b2f..ac8a840 100644 --- a/sugar_network/node/routes.py +++ b/sugar_network/node/routes.py @@ -13,15 +13,16 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +# pylint: disable-msg=W0611 + import logging from os.path import join from sugar_network import db from sugar_network.model import FrontRoutes, load_bundle from sugar_network.node import model -# pylint: disable-msg=W0611 -from sugar_network.toolkit.router import route, postroute, ACL, File -from sugar_network.toolkit.router import Request, fallbackroute, preroute +from sugar_network.toolkit.router import ACL, File +from sugar_network.toolkit.router import route, fallbackroute, preroute from sugar_network.toolkit.spec import parse_requires, parse_version from sugar_network.toolkit.bundle import Bundle from sugar_network.toolkit.coroutine import this @@ -83,7 +84,7 @@ class NodeRoutes(db.Routes, FrontRoutes): return {'guid': self.guid, 'seqno': { 'db': self.volume.seqno.value, - 'releases': self.volume.releases_seqno.value, + 'releases': self.volume.release_seqno.value, }, } diff --git a/sugar_network/toolkit/__init__.py b/sugar_network/toolkit/__init__.py index 675c25f..7585e29 100644 --- a/sugar_network/toolkit/__init__.py +++ b/sugar_network/toolkit/__init__.py @@ -464,6 +464,35 @@ class NamedTemporaryFile(object): return getattr(self._file, name) +class Variable(list): + + def __init__(self, default=None): + list.__init__(self, [default]) + + @property + def value(self): + return self[0] + + @value.setter + def value(self, value): + self[0] = value + + def __contains__(self, key): + return key in self[0] + + def __getitem__(self, key): + return self[0].get(key) + + def __setitem__(self, key, value): + self[0][key] = value + + def __delitem__(self, key): + del self[0][key] + + def __getattr__(self, name): + return getattr(self[0], name) + + class Bin(object): """Store variable in a file.""" @@ -471,10 +500,7 @@ class Bin(object): self._path = abspath(path) self.value = default_value - if exists(self._path): - with file(self._path) as f: - self.value = json.load(f) - else: + if not self.reset(): self.commit() @property @@ -491,6 +517,13 @@ class Bin(object): f.flush() os.fsync(f.fileno()) + def reset(self): + if not exists(self._path): + return False + with file(self._path) as f: + self.value = json.load(f) + return True + def __enter__(self): return self.value @@ -535,6 +568,30 @@ class Seqno(Bin): return self.value +class CaseInsensitiveDict(dict): + + def __contains__(self, key): + return dict.__contains__(self, key.lower()) + + def __getitem__(self, key): + return self.get(key.lower()) + + def __setitem__(self, key, value): + return self.set(key.lower(), value) + + def __delitem__(self, key): + self.remove(key.lower()) + + def get(self, key, default=None): + return dict.get(self, key, default) + + def set(self, key, value): + dict.__setitem__(self, key, value) + + def remove(self, key): + dict.__delitem__(self, key) + + class Pool(object): """Stack that keeps its iterators correct after changing content.""" diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 0ebee86..0cbd535 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -267,6 +267,7 @@ class Connection(object): value = request.environ.get(env_key) if value is not None: headers[key] = value + headers.update(request.headers) path = request.path while True: diff --git a/sugar_network/toolkit/parcel.py b/sugar_network/toolkit/parcel.py index 9d583cd..edbbf02 100644 --- a/sugar_network/toolkit/parcel.py +++ b/sugar_network/toolkit/parcel.py @@ -46,7 +46,14 @@ _logger = logging.getLogger('parcel') def decode(stream, limit=None): _logger.debug('Decode %r stream limit=%r', stream, limit) - stream = _UnzipStream(stream, limit) + if limit is not None: + limit -= 2 + magic = stream.read(2) + enforce(len(magic) == 2, http.BadRequest, 'Malformed parcel') + if magic == '\037\213': + stream = _ZippedDecoder(stream, limit) + else: + stream = _Decoder(magic, stream, limit) header = stream.read_record() packet = _DecodeIterator(stream) @@ -63,7 +70,11 @@ def encode(packets, limit=None, header=None, compresslevel=None, _logger.debug('Encode %r packets limit=%r header=%r', packets, limit, header) - ostream = _ZipStream(compresslevel) + if compresslevel is 0: + ostream = _Encoder() + else: + ostream = _ZippedEncoder(compresslevel) + # In case of downloading blobs # (?) reuse current `this.http` this.http = http.Connection() @@ -242,16 +253,10 @@ class _DecodeIterator(object): pass -class _ZipStream(object): +class _Encoder(object): - def __init__(self, compresslevel=None): - if compresslevel is None: - compresslevel = DEFAULT_COMPRESSLEVEL - self._zipper = zlib.compressobj(compresslevel, - zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0) + def __init__(self): self._offset = 0 - self._size = 0 - self._crc = zlib.crc32('') & 0xffffffffL def write_record(self, record, limit=None): chunk = json.dumps(record) + '\n' @@ -260,49 +265,58 @@ class _ZipStream(object): return self.write(chunk) def write(self, chunk): + chunk = self._encode(chunk) + if chunk: + self._offset += len(chunk) + return chunk + + def flush(self): + chunk = self._flush() + self._offset += len(chunk) + return chunk + + def _encode(self, chunk): + return chunk + + def _flush(self): + return '' + + +class _ZippedEncoder(_Encoder): + + def __init__(self, compresslevel=None): + _Encoder.__init__(self) + if compresslevel is None: + compresslevel = DEFAULT_COMPRESSLEVEL + self._zipper = zlib.compressobj(compresslevel, + zlib.DEFLATED, -_ZLIB_WBITS, zlib.DEF_MEM_LEVEL, 0) + self._size = 0 + self._crc = zlib.crc32('') & 0xffffffffL + + def _encode(self, chunk): self._size += len(chunk) self._crc = zlib.crc32(chunk, self._crc) & 0xffffffffL chunk = self._zipper.compress(chunk) - if self._offset == 0: chunk = '\037\213' + '\010' + chr(0) + \ struct.pack('<L', long(time.time())) + \ '\002' + '\377' + \ chunk self._offset = _ZLIB_WBITS_SIZE - if chunk: - self._offset += len(chunk) - return chunk - def flush(self): - chunk = self._zipper.flush() + \ + def _flush(self): + return self._zipper.flush() + \ struct.pack('<L', self._crc) + \ struct.pack('<L', self._size & 0xffffffffL) - self._offset += len(chunk) - return chunk -class _UnzipStream(object): +class _Decoder(object): - def __init__(self, stream, limit): + def __init__(self, prefix, stream, limit): + self._buffer = prefix self._stream = stream self._limit = limit - self._unzipper = zlib.decompressobj(-_ZLIB_WBITS) - self._crc = zlib.crc32('') & 0xffffffffL - self._size = 0 - self._buffer = '' - - if self._limit is not None: - self._limit -= 10 - magic = stream.read(2) - enforce(magic == '\037\213', http.BadRequest, - 'Not a gzipped file') - enforce(ord(stream.read(1)) == 8, http.BadRequest, - 'Unknown compression method') - enforce(ord(stream.read(1)) == 0, http.BadRequest, - 'Gzip flags should be empty') - stream.read(6) # Ignore the rest of header def read_record(self): while True: @@ -328,20 +342,41 @@ class _UnzipStream(object): if self._limit is not None: size = min(size, self._limit) chunk = self._stream.read(size) + if chunk and self._limit is not None: + self._limit -= len(chunk) + return self._decode(chunk) + + def _decode(self, chunk): + self._buffer += chunk + return bool(self._buffer) + +class _ZippedDecoder(_Decoder): + + def __init__(self, stream, limit): + _Decoder.__init__(self, '', stream, limit) + self._unzipper = zlib.decompressobj(-_ZLIB_WBITS) + self._crc = zlib.crc32('') & 0xffffffffL + self._size = 0 + + if self._limit is not None: + self._limit -= 8 + enforce(ord(stream.read(1)) == 8, http.BadRequest, + 'Unknown compression method') + enforce(ord(stream.read(1)) == 0, http.BadRequest, + 'Gzip flags should be empty') + stream.read(6) # Ignore the rest of header + + def _decode(self, chunk): if chunk: - if self._limit is not None: - self._limit -= len(chunk) self._add_to_buffer(self._unzipper.decompress(chunk)) return True - enforce(len(self._unzipper.unused_data) >= 8, http.BadRequest, 'Malformed gzipped file') crc = struct.unpack('<I', self._unzipper.unused_data[:4])[0] enforce(crc == self._crc, http.BadRequest, 'CRC check failed') size = struct.unpack('<I', self._unzipper.unused_data[4:8])[0] enforce(size == self._size, http.BadRequest, 'Incorrect length') - return self._add_to_buffer(self._unzipper.flush()) def _add_to_buffer(self, chunk): diff --git a/sugar_network/toolkit/router.py b/sugar_network/toolkit/router.py index e9e91fd..f4b23ce 100644 --- a/sugar_network/toolkit/router.py +++ b/sugar_network/toolkit/router.py @@ -140,6 +140,7 @@ class Request(dict): else: dict.__setitem__(self, key, value) self.environ = environ + self.headers = _RequestHeaders(self.environ) if method: self.environ['REQUEST_METHOD'] = method @@ -312,35 +313,15 @@ class Request(dict): (self.method, self.path, self.cmd, dict(self)) -class CaseInsensitiveDict(dict): - - def __contains__(self, key): - return dict.__contains__(self, key.lower()) - - def __getitem__(self, key): - return self.get(key.lower()) - - def __setitem__(self, key, value): - return self.set(key.lower(), value) - - def __delitem__(self, key): - self.remove(key.lower()) - - def get(self, key, default=None): - return dict.get(self, key, default) - - def set(self, key, value): - dict.__setitem__(self, key, value) - - def remove(self, key): - dict.__delitem__(self, key) - - -class Response(CaseInsensitiveDict): +class Response(toolkit.CaseInsensitiveDict): status = '200 OK' relocations = 0 + def __init__(self): + toolkit.CaseInsensitiveDict.__init__(self) + self.headers = _ResponseHeaders(self) + @property def content_length(self): return int(self.get('content-length') or '0') @@ -392,7 +373,7 @@ class File(str): pass def __new__(cls, path=None, digest=None, meta=None): - meta = CaseInsensitiveDict(meta or []) + meta = toolkit.CaseInsensitiveDict(meta or []) url = '' if meta: @@ -568,7 +549,7 @@ class Router(object): raise finally: for i in self._postroutes: - i(result, exception) + result = i(result, exception) return result @@ -915,4 +896,39 @@ class _Route(object): return '%s /%s (%s)' % (self.method, path, self.callback.__name__) +class _RequestHeaders(dict): + + def __init__(self, environ): + dict.__init__(self) + self._environ = environ + + def __contains__(self, key): + return 'HTTP_X_%s' % key.upper() in self._environ + + def __getitem__(self, key): + value = self._environ.get('HTTP_X_%s' % key.upper()) + if value is not None: + return json.loads(value) + + def __setitem__(self, key, value): + dict.__setitem__(self, 'x-%s' % key, json.dumps(value)) + + +class _ResponseHeaders(object): + + def __init__(self, headers): + self._headers = headers + + def __contains__(self, key): + return 'x-%s' % key.lower() in self._headers + + def __getitem__(self, key): + value = self._headers.get('x-%s' % key.lower()) + if value is not None: + return json.loads(value) + + def __setitem__(self, key, value): + self._headers.set('x-%s' % key.lower(), json.dumps(value)) + + File.AWAY = File(None) diff --git a/tests/__init__.py b/tests/__init__.py index e1c3222..32bc3ea 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -24,10 +24,12 @@ from sugar_network.toolkit import http, mountpoints, Option, gbus, i18n, languag from sugar_network.toolkit.router import Router, Request, Response from sugar_network.toolkit.coroutine import this from sugar_network.client import IPCConnection, journal, routes as client_routes, model as client_model +from sugar_network.client.model import Volume as LocalVolume from sugar_network.client.injector import Injector from sugar_network.client.routes import ClientRoutes from sugar_network.client.auth import SugarCreds from sugar_network import db, client, node, toolkit, model +from sugar_network.db import routes as db_routes from sugar_network.model.user import User from sugar_network.model.context import Context from sugar_network.node.model import Context as MasterContext @@ -103,6 +105,8 @@ class Test(unittest.TestCase): client.cache_lifetime.value = 0 client.keyfile.value = join(root, 'data', UID) client_routes._RECONNECT_TIMEOUT = 0 + client_routes._SYNC_TIMEOUT = 30 + db_routes._GROUPED_DIFF_LIMIT = 1024 journal._ds_root = tmpdir + '/datastore' mountpoints._connects.clear() mountpoints._found.clear() @@ -141,6 +145,7 @@ class Test(unittest.TestCase): this.volume = None this.call = None this.broadcast = lambda x: x + this.localcast = lambda x: x this.injector = None this.principal = None @@ -287,12 +292,14 @@ class Test(unittest.TestCase): this.call = self.node_router.call return self.node_volume - def fork_master(self, classes=None, routes=MasterRoutes): + def fork_master(self, classes=None, routes=MasterRoutes, cb=None): if classes is None: classes = master.RESOURCES def node(): volume = NodeVolume('master', classes) + if cb is not None: + cb(volume) node = coroutine.WSGIServer(('127.0.0.1', 7777), Router(routes(volume=volume, auth=SugarAuth('master')))) node.serve_forever() @@ -314,10 +321,7 @@ class Test(unittest.TestCase): def start_online_client(self, classes=None): self.fork_master(classes) this.injector = Injector('client/cache') - if classes: - home_volume = db.Volume('client', classes) - else: - home_volume = client_model.Volume('client') + home_volume = LocalVolume('client', classes) self.client_routes = ClientRoutes(home_volume, SugarCreds(client.keyfile.value)) self.client_routes.connect(client.api.value) self.wait_for_events(self.client_routes, event='inline', state='online').wait() diff --git a/tests/units/client/injector.py b/tests/units/client/injector.py index 7170758..b266cda 100755 --- a/tests/units/client/injector.py +++ b/tests/units/client/injector.py @@ -14,6 +14,7 @@ from __init__ import tests from sugar_network import db, client from sugar_network.client import Connection, keyfile, api, packagekit, injector as injector_, model from sugar_network.client.injector import _PreemptivePool, Injector +from sugar_network.client.model import Volume as LocalVolume from sugar_network.client.auth import SugarCreds from sugar_network.toolkit.coroutine import this from sugar_network.toolkit import http, lsb_release @@ -349,7 +350,8 @@ class InjectorTest(tests.Test): assert not exists('releases/2') def test_solve(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -382,7 +384,8 @@ class InjectorTest(tests.Test): self.assertEqual([client.api.value, 'stable', 0, solution], json.load(file('client/solutions/context'))) def test_solve_FailInOffline(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = None @@ -403,7 +406,8 @@ class InjectorTest(tests.Test): self.assertRaises(http.ServiceUnavailable, injector._solve, 'context', 'stable') def test_solve_ReuseCachedSolution(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -420,13 +424,14 @@ class InjectorTest(tests.Test): ]))), cmd='submit', initial=True) assert 'context' in injector._solve('context', 'stable') - volume['context'].delete('context') + conn.delete(['context', 'context']) assert 'context' in injector._solve('context', 'stable') os.unlink('client/solutions/context') self.assertRaises(http.NotFound, injector._solve, 'context', 'stable') def test_solve_InvalidateCachedSolution(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = 'http://127.0.0.1:7777' @@ -492,7 +497,8 @@ class InjectorTest(tests.Test): self.assertEqual(['http://localhost:7777', 'stable', 2], json.load(file('client/solutions/context'))[:-1]) def test_solve_ForceUsingStaleCachedSolutionInOffline(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -519,7 +525,8 @@ class InjectorTest(tests.Test): self.assertRaises(http.ServiceUnavailable, injector._solve, 'context', 'stable') def test_download_SetExecPermissions(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -552,7 +559,8 @@ class InjectorTest(tests.Test): assert not os.access(path + 'test/file2', os.X_OK) def test_checkin(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -603,7 +611,8 @@ class InjectorTest(tests.Test): [i for i in injector.checkin('context')]) def test_checkin_PreemptivePool(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -660,7 +669,8 @@ class InjectorTest(tests.Test): self.assertEqual([], this.volume['context']['context']['pins']) def test_checkin_Refresh(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -694,7 +704,8 @@ class InjectorTest(tests.Test): assert exists('client/releases/%s' % release2) def test_launch(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -742,7 +753,8 @@ class InjectorTest(tests.Test): [i for i in injector.launch('context', activity_id='activity_id')]) def test_launch_PreemptivePool(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -783,7 +795,8 @@ class InjectorTest(tests.Test): self.assertEqual(len(activity_info), injector._pool._du) def test_launch_DonntAcquireCheckins(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector('client') injector.api = client.api.value @@ -810,7 +823,8 @@ class InjectorTest(tests.Test): assert injector._pool._du == 0 def test_launch_RefreshCheckins(self): - self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector(tests.tmpdir + '/client') injector.api = client.api.value @@ -862,7 +876,26 @@ class InjectorTest(tests.Test): self.assertEqual('2', file('client/releases/%s/output' % release2).read()) def test_launch_InstallDeps(self): - volume = self.start_master() + + def master_cb(volume): + distro = '%s-%s' % (lsb_release.distributor_id(), lsb_release.release()) + volume['context'].create({ + 'guid': 'package1', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': { + 'resolves': { + distro: {'version': [[1], 0], 'packages': ['pkg1', 'pkg2']}, + }, + }, + }) + volume['context'].create({ + 'guid': 'package2', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': { + 'resolves': { + distro: {'version': [[1], 0], 'packages': ['pkg3', 'pkg4']}, + }, + }, + }) + + self.fork_master(cb=master_cb) + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector(tests.tmpdir + '/client') injector.api = client.api.value @@ -878,21 +911,6 @@ class InjectorTest(tests.Test): 'license = Public Domain', 'requires = package1; package2', ]))), cmd='submit', initial=True) - distro = '%s-%s' % (lsb_release.distributor_id(), lsb_release.release()) - volume['context'].create({ - 'guid': 'package1', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': { - 'resolves': { - distro: {'version': [[1], 0], 'packages': ['pkg1', 'pkg2']}, - }, - }, - }) - volume['context'].create({ - 'guid': 'package2', 'type': ['package'], 'title': {}, 'summary': {}, 'description': {}, 'releases': { - 'resolves': { - distro: {'version': [[1], 0], 'packages': ['pkg3', 'pkg4']}, - }, - }, - }) packages = [] self.override(packagekit, 'install', lambda names: packages.extend(names)) @@ -902,14 +920,15 @@ class InjectorTest(tests.Test): self.assertEqual(['pkg1', 'pkg2', 'pkg3', 'pkg4'], sorted(packages)) def test_launch_Document(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector(tests.tmpdir + '/client') injector.api = client.api.value injector.seqno = 1 - volume['context'].create({'guid': 'book', 'type': ['book'], 'title': {}, 'summary': {}, 'description': {}}) - book = conn.upload(['context'], 'book', cmd='submit', context='book', version='1', license='Public Domain') + book_context = conn.post(['context'], {'type': ['book'], 'title': {}, 'summary': {}, 'description': {}}) + book = conn.upload(['context'], 'book', cmd='submit', context=book_context, version='1', license='Public Domain') app = conn.upload(['context'], self.zips( ('topdir/activity/activity.info', '\n'.join([ @@ -929,21 +948,22 @@ class InjectorTest(tests.Test): self.assertEqual( {'event': 'launch', 'state': 'exit'}, - [i for i in injector.launch('book', activity_id='activity_id', app='app')][-1]) + [i for i in injector.launch(book_context, activity_id='activity_id', app='app')][-1]) self.assertEqual( '-b app -a activity_id -u %s/client/releases/%s' % (tests.tmpdir, book), file('client/releases/%s/output' % app).read()) def test_launch_DocumentWithDetectingAppByMIMEType(self): - volume = self.start_master() + self.fork_master() + this.volume = LocalVolume('client') conn = Connection(creds=SugarCreds(keyfile.value)) injector = Injector(tests.tmpdir + '/client') injector.api = client.api.value injector.seqno = 1 - volume['context'].create({'guid': 'book', 'type': ['book'], 'title': {}, 'summary': {}, 'description': {}}) - book = conn.upload(['context'], 'book', cmd='submit', context='book', version='1', license='Public Domain') + book_context = conn.post(['context'], {'type': ['book'], 'title': {}, 'summary': {}, 'description': {}}) + book = conn.upload(['context'], 'book', cmd='submit', context=book_context, version='1', license='Public Domain') app = conn.upload(['context'], self.zips( ('topdir/activity/activity.info', '\n'.join([ @@ -964,7 +984,7 @@ class InjectorTest(tests.Test): self.override(injector_, '_app_by_mimetype', lambda mime_type: 'app') self.assertEqual( {'event': 'launch', 'state': 'exit'}, - [i for i in injector.launch('book', activity_id='activity_id')][-1]) + [i for i in injector.launch(book_context, activity_id='activity_id')][-1]) self.assertEqual( '-b app -a activity_id -u %s/client/releases/%s' % (tests.tmpdir, book), diff --git a/tests/units/client/routes.py b/tests/units/client/routes.py index 6072571..9145b42 100755 --- a/tests/units/client/routes.py +++ b/tests/units/client/routes.py @@ -12,7 +12,7 @@ from os.path import exists from __init__ import tests from sugar_network import db, client, toolkit -from sugar_network.client import journal, IPCConnection, cache_limit, cache_lifetime, api, injector, routes +from sugar_network.client import journal, Connection, IPCConnection, cache_limit, cache_lifetime, api, injector, routes from sugar_network.client.model import Volume from sugar_network.client.injector import Injector from sugar_network.client.routes import ClientRoutes @@ -81,25 +81,19 @@ class RoutesTest(tests.Test): ]), ], header={'to': '127.0.0.1:7777', 'from': 'slave'})), params={'cmd': 'push'}) - self.assertEqual([ - {'guid': '1'}, - {'guid': '2'}, - ], - ipc.get(['context'], query='йцу')['result']) - self.assertEqual([ - {'guid': '1'}, - {'guid': '2'}, - ], - ipc.get(['context'], query='qwe')['result']) + self.assertEqual( + sorted(['1', '2']), + sorted([i['guid'] for i in ipc.get(['context'], query='йцу')['result']])) + self.assertEqual( + sorted(['1', '2']), + sorted([i['guid'] for i in ipc.get(['context'], query='qwe')['result']])) - self.assertEqual([ - {'guid': '2'}, - ], - ipc.get(['context'], query='йцукен')['result']) - self.assertEqual([ - {'guid': '2'}, - ], - ipc.get(['context'], query='qwerty')['result']) + self.assertEqual( + sorted(['2']), + sorted([i['guid'] for i in ipc.get(['context'], query='йцукен')['result']])) + self.assertEqual( + sorted(['2']), + sorted([i['guid'] for i in ipc.get(['context'], query='qwerty')['result']])) def test_LanguagesFallbackInRequests(self): self.start_online_client() @@ -340,14 +334,12 @@ class RoutesTest(tests.Test): self.assertEqual( blob, ipc.request('GET', ['context', guid, 'logo']).content) - self.assertEqual({ - 'logo': 'http://127.0.0.1:7777/blobs/%s' % digest, - }, - ipc.get(['context', guid], reply=['logo'])) - self.assertEqual([{ - 'logo': 'http://127.0.0.1:7777/blobs/%s' % digest, - }], - ipc.get(['context'], reply=['logo'])['result']) + self.assertEqual( + 'http://127.0.0.1:7777/blobs/%s' % digest, + ipc.get(['context', guid], reply=['logo'])['logo']) + self.assertEqual( + ['http://127.0.0.1:7777/blobs/%s' % digest], + [i['logo'] for i in ipc.get(['context'], reply=['logo'])['result']]) def test_OnlinePins(self): home_volume = self.start_online_client() @@ -388,13 +380,9 @@ class RoutesTest(tests.Test): 'description': 'description', }) - self.assertEqual(sorted([ - {'guid': guid1, 'title': '1', 'pins': []}, - {'guid': guid2, 'title': '2', 'pins': []}, - {'guid': guid3, 'title': '3', 'pins': []}, - {'guid': guid4, 'title': '4', 'pins': []}, - ]), - sorted(ipc.get(['context'], reply=['guid', 'title', 'pins'])['result'])) + self.assertEqual( + sorted([(guid1, []), (guid2, []), (guid3, []), (guid4, [])]), + sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['pins'])['result']])) self.assertEqual([ ], ipc.get(['context'], reply=['guid', 'title'], pins='favorite')['result']) @@ -420,34 +408,25 @@ class RoutesTest(tests.Test): home_volume['context'].update(guid2, {'title': {i18n.default_lang(): '2_'}}) home_volume['context'].update(guid3, {'title': {i18n.default_lang(): '3_'}}) - self.assertEqual(sorted([ - {'guid': guid1, 'title': '1', 'pins': ['favorite']}, - {'guid': guid2, 'title': '2', 'pins': ['checkin', 'favorite']}, - {'guid': guid3, 'title': '3', 'pins': ['checkin']}, - {'guid': guid4, 'title': '4', 'pins': []}, - ]), - sorted(ipc.get(['context'], reply=['guid', 'title', 'pins'])['result'])) + self.assertEqual( + sorted([(guid1, ['favorite']), (guid2, ['checkin', 'favorite']), (guid3, ['checkin']), (guid4, [])]), + sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['pins'])['result']])) self.assertEqual([ {'guid': guid1, 'title': '1_'}, {'guid': guid2, 'title': '2_'}, ], ipc.get(['context'], reply=['guid', 'title'], pins='favorite')['result']) - self.assertEqual([ - {'guid': guid2, 'title': '2_'}, - {'guid': guid3, 'title': '3_'}, - ], - ipc.get(['context'], reply=['guid', 'title'], pins='checkin')['result']) + + self.assertEqual( + sorted([(guid2, '2_'), (guid3, '3_')]), + sorted([(i['guid'], i['title']) for i in ipc.get(['context'], reply=['guid', 'title'], pins='checkin')['result']])) ipc.delete(['context', guid1], cmd='favorite') ipc.delete(['context', guid2], cmd='checkin') - self.assertEqual(sorted([ - {'guid': guid1, 'pins': []}, - {'guid': guid2, 'pins': ['favorite']}, - {'guid': guid3, 'pins': ['checkin']}, - {'guid': guid4, 'pins': []}, - ]), - sorted(ipc.get(['context'], reply=['guid', 'pins'])['result'])) + self.assertEqual( + sorted([(guid1, []), (guid2, ['favorite']), (guid3, ['checkin']), (guid4, [])]), + sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['pins'])['result']])) self.assertEqual([ {'guid': guid2, 'pins': ['favorite']}, ], @@ -496,12 +475,9 @@ class RoutesTest(tests.Test): {'event': 'checkin', 'state': 'ready'}, ], [i for i in ipc.put(['context', '2'], None, cmd='checkin')]) - self.assertEqual([ - {'guid': '1', 'pins': ['favorite']}, - {'guid': '2', 'pins': ['checkin']}, - {'guid': '3', 'pins': []}, - ], - ipc.get(['context'], reply=['guid', 'pins'])['result']) + self.assertEqual( + sorted([('1', ['favorite']), ('2', ['checkin']), ('3', [])]), + sorted([(i['guid'], i['pins']) for i in ipc.get(['context'], reply=['guid', 'pins'])['result']])) self.stop_master() self.wait_for_events(event='inline', state='offline').wait() @@ -692,8 +668,7 @@ class RoutesTest(tests.Test): ipc = IPCConnection() self.assertEqual([ - {'event': 'checkin', 'state': 'solve'}, - {'error': 'Context not found', 'event': 'failure', 'exception': 'NotFound'}, + {'error': 'Resource not found', 'event': 'failure', 'exception': 'NotFound'}, ], [i for i in ipc.put(['context', 'context'], None, cmd='checkin')]) @@ -862,11 +837,10 @@ class RoutesTest(tests.Test): self.assertEqual('done', events[-1]['event']) guid = events[-1]['guid'] - self.assertEqual({ - 'context': 'context', - 'error': 'error', - }, - ipc.get(['report', guid], reply=['context', 'error'])) + report = ipc.get(['report', guid], reply=['context', 'error']) + self.assertEqual('context', report['context']) + self.assertEqual('error', report['error']) + self.assertEqual(sorted([ 'content1', 'content2', @@ -1016,7 +990,7 @@ class RoutesTest(tests.Test): assert time.time() - ts >= 2 def kill(): - coroutine.sleep(.5) + coroutine.sleep(.4) self.waitpid(node_pid) coroutine.spawn(kill) @@ -1095,13 +1069,114 @@ class RoutesTest(tests.Test): self.assertEqual([{'event': 'pong'}], events) assert Routes.subscribe_tries > 2 + def test_PullCheckinsOnGets(self): + local_volume = self.start_online_client() + local = IPCConnection() + remote = Connection(creds=SugarCreds(client.keyfile.value)) + + self.assertEqual([[1, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + + guid = remote.post(['context'], { + 'type': 'activity', + 'title': '1', + 'summary': '', + 'description': '', + }) + local.put(['context', guid], None, cmd='favorite') + self.assertEqual('1', remote.get(['context', guid, 'title'])) + self.assertEqual('1', local.get(['context', guid])['title']) + coroutine.sleep(1.1) + + self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + remote.put(['context', guid, 'title'], '2') + self.assertEqual('2', remote.get(['context', guid, 'title'])) + self.assertEqual('1', local.get(['context', guid])['title']) + self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + self.assertEqual('2', local.get(['context'], reply='title')['result'][0]['title']) + coroutine.sleep(.1) + self.assertEqual('2', remote.get(['context', guid, 'title'])) + self.assertEqual('2', local.get(['context', guid])['title']) + self.assertEqual([[1, 1], [7, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + def test_PullCheckinsOnGettingOnline(self): + routes._RECONNECT_TIMEOUT = 1 + routes._SYNC_TIMEOUT = 0 + local_volume = self.start_online_client() + local = IPCConnection() + remote = Connection(creds=SugarCreds(client.keyfile.value)) + + self.assertEqual([[1, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + + guid = remote.post(['context'], { + 'type': 'activity', + 'title': '1', + 'summary': '', + 'description': '', + }) + local.put(['context', guid], None, cmd='favorite') + self.assertEqual('1', remote.get(['context', guid, 'title'])) + self.assertEqual('1', local.get(['context', guid])['title']) + coroutine.sleep(1.1) + + remote.put(['context', guid, 'title'], '2') + self.assertEqual('2', remote.get(['context', guid, 'title'])) + self.assertEqual('1', local.get(['context', guid])['title']) + self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + + self.stop_master() + self.wait_for_events(event='inline', state='offline').wait() + self.fork_master() + self.wait_for_events(event='sync', state='pull').wait() + + self.assertEqual('2', local.get(['context', guid])['title']) + self.assertEqual([[1, 1], [7, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + + def test_PullCheckinsOnUpdates(self): + local_volume = self.start_online_client() + local = IPCConnection() + remote = Connection(creds=SugarCreds(client.keyfile.value)) + + self.assertEqual([[1, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + + guid = remote.post(['context'], { + 'type': 'activity', + 'title': '1', + 'summary': '1', + 'description': '', + }) + local.put(['context', guid], None, cmd='favorite') + self.assertEqual('1', remote.get(['context', guid, 'title'])) + self.assertEqual('1', local.get(['context', guid])['title']) + coroutine.sleep(1.1) + remote.put(['context', guid, 'title'], '2') + self.assertEqual('2', remote.get(['context', guid, 'title'])) + self.assertEqual('1', remote.get(['context', guid, 'summary'])) + self.assertEqual('1', local.get(['context', guid])['title']) + self.assertEqual('1', local.get(['context', guid])['summary']) + self.assertEqual([[1, 1], [6, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) + + local.put(['context', guid, 'summary'], '2') + self.assertEqual('2', remote.get(['context', guid, 'title'])) + self.assertEqual('2', remote.get(['context', guid, 'summary'])) + self.assertEqual('2', local.get(['context', guid])['title']) + self.assertEqual('2', local.get(['context', guid])['summary']) + self.assertEqual([[1, 1], [8, None]], self.client_routes._pull_r.value) + self.assertEqual(0, local_volume.seqno.value) def ___test_CachedClientRoutes(self): volume = db.Volume('client', RESOURCES, lazy_open=True) diff --git a/tests/units/db/index.py b/tests/units/db/index.py index cb144c6..c0072c1 100755 --- a/tests/units/db/index.py +++ b/tests/units/db/index.py @@ -420,14 +420,18 @@ class IndexTest(tests.Test): post_stored = [] deleted = [] + def pre_stored_cb(*args): + pre_stored.append(args) + return {} + db.store('1', {}, - lambda *args: pre_stored.append(args), + pre_stored_cb, lambda *args: post_stored.append(args)) self.assertEqual(1, len(pre_stored)) self.assertEqual(1, len(post_stored)) db.store('1', {}, - lambda *args: pre_stored.append(args), + pre_stored_cb, lambda *args: post_stored.append(args)) self.assertEqual(2, len(pre_stored)) self.assertEqual(2, len(post_stored)) diff --git a/tests/units/db/resource.py b/tests/units/db/resource.py index 05aaddf..4bf80b7 100755 --- a/tests/units/db/resource.py +++ b/tests/units/db/resource.py @@ -483,6 +483,45 @@ class ResourceTest(tests.Test): self.assertEqual('set2!', doc['prop1']) self.assertEqual('set2!', doc['prop3']) + def test_diff_OutputRange(self): + + class Document(db.Resource): + + @db.stored_property() + def prop1(self, value): + return value + + @db.stored_property() + def prop2(self, value): + return value + + directory = Directory(tests.tmpdir, Document, IndexWriter, _SessionSeqno(), this.broadcast) + + guid = directory.create({'prop1': '1', 'prop2': '1'}) + self.utime('db', 0) + + out_r = [] + self.assertEqual({ + 'guid': {'mtime': 0, 'value': guid}, + 'prop1': {'mtime': 0, 'value': '1'}, + 'prop2': {'mtime': 0, 'value': '1'}, + }, + directory[guid].diff([[1, None]], out_r)) + self.assertEqual([[1, 1]], out_r) + + directory.update(guid, {'prop1': '2'}) + directory.update(guid, {'prop2': '2'}) + self.utime('db', 0) + + out_r = [] + self.assertEqual({ + 'guid': {'mtime': 0, 'value': guid}, + 'prop1': {'mtime': 0, 'value': '2'}, + 'prop2': {'mtime': 0, 'value': '2'}, + }, + directory[guid].diff([[1, None]], out_r)) + self.assertEqual([[1, 3]], out_r) + class _SessionSeqno(object): diff --git a/tests/units/db/routes.py b/tests/units/db/routes.py index 4189502..5d5a547 100755 --- a/tests/units/db/routes.py +++ b/tests/units/db/routes.py @@ -3,6 +3,7 @@ import os import sys +import json import time import shutil import hashlib @@ -16,10 +17,11 @@ src_root = abspath(dirname(__file__)) from __init__ import tests from sugar_network import db, toolkit +from sugar_network.db import routes as db_routes from sugar_network.model.user import User from sugar_network.toolkit.router import Router, Request, Response, fallbackroute, ACL, File from sugar_network.toolkit.coroutine import this -from sugar_network.toolkit import coroutine, http, i18n +from sugar_network.toolkit import coroutine, http, i18n, parcel class RoutesTest(tests.Test): @@ -1942,6 +1944,169 @@ class RoutesTest(tests.Test): [{'event': 'delete', 'resource': 'document', 'guid': guid}], events) + def test_ObjectDiff(self): + + class Document(db.Resource): + + @db.stored_property() + def prop1(self, value): + return value + + @db.stored_property() + def prop2(self, value): + return value + + @db.stored_property(db.Blob) + def prop3(self, value): + return value + + @db.stored_property(db.Blob) + def prop4(self, value): + return value + + volume = db.Volume('.', [Document]) + router = Router(db.Routes(volume)) + + volume['document'].create({ + 'guid': 'guid', + 'prop1': '1', + 'prop2': 2, + 'prop3': volume.blobs.post('333', '3/3').digest, + }) + volume['document'].update('guid', {'prop4': volume.blobs.post('4444', '4/4').digest}) + self.utime('db/document/gu/guid', 1) + + patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff')]) + self.assertEqual([( + {'packet': None}, [ + {'resource': 'document'}, + {'guid': 'guid', 'patch': { + 'guid': {'value': 'guid', 'mtime': 1}, + 'prop1': {'value': '1', 'mtime': 1}, + 'prop2': {'value': 2, 'mtime': 1}, + 'prop3': {'value': hashlib.sha1('333').hexdigest(), 'mtime': 1}, + 'prop4': {'value': hashlib.sha1('4444').hexdigest(), 'mtime': 1}, + }}, + {'content-type': '4/4', 'content-length': '4', 'x-seqno': '3'}, + {'content-type': '3/3', 'content-length': '3', 'x-seqno': '1'}, + {'commit': [[1, 4]]}, + ], + )], + [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))]) + + patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={ + 'HTTP_X_RANGE': json.dumps([[1, 1]]), + })]) + self.assertEqual([( + {'packet': None}, [], + )], + [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))]) + + patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={ + 'HTTP_X_RANGE': json.dumps([[2, 2]]), + })]) + self.assertEqual([( + {'packet': None}, [ + {'resource': 'document'}, + {'guid': 'guid', 'patch': { + 'guid': {'value': 'guid', 'mtime': 1}, + 'prop1': {'value': '1', 'mtime': 1}, + 'prop2': {'value': 2, 'mtime': 1}, + 'prop3': {'value': hashlib.sha1('333').hexdigest(), 'mtime': 1}, + }}, + {'content-type': '3/3', 'content-length': '3', 'x-seqno': '1'}, + {'commit': [[1, 2]]}, + ], + )], + [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))]) + + patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={ + 'HTTP_X_RANGE': json.dumps([[3, 3]]), + })]) + self.assertEqual([( + {'packet': None}, [], + )], + [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))]) + + patch = ''.join([i for i in this.call(method='GET', path=['document', 'guid'], cmd='diff', environ={ + 'HTTP_X_RANGE': json.dumps([[4, 4]]), + })]) + self.assertEqual([( + {'packet': None}, [ + {'resource': 'document'}, + {'guid': 'guid', 'patch': { + 'prop4': {'value': hashlib.sha1('4444').hexdigest(), 'mtime': 1}, + }}, + {'content-type': '4/4', 'content-length': '4', 'x-seqno': '3'}, + {'commit': [[3, 4]]}, + ], + )], + [(packet.header, [i.meta if isinstance(i, File) else i for i in packet]) for packet in parcel.decode(StringIO(patch))]) + + def test_GroupedDiff(self): + + class Document(db.Resource): + + @db.stored_property() + def prop(self, value): + return value + + volume = db.Volume('.', [Document]) + router = Router(db.Routes(volume)) + + volume['document'].create({'guid': '1', 'prop': 'q'}) + volume['document'].create({'guid': '2', 'prop': 'w'}) + volume['document'].create({'guid': '3', 'prop': 'w'}) + volume['document'].create({'guid': '4', 'prop': 'e'}) + volume['document'].create({'guid': '5', 'prop': 'e'}) + volume['document'].create({'guid': '6', 'prop': 'e'}) + self.utime('db/document', 0) + + self.assertEqual({ + '1': [[1, 1]], + '2': [[2, 2]], + '3': [[3, 3]], + '4': [[4, 4]], + '5': [[5, 5]], + '6': [[6, 6]], + }, + this.call(method='GET', path=['document'], cmd='diff')) + + self.assertEqual({ + 'q': [[1, 1]], + 'w': [[2, 3]], + 'e': [[4, 6]], + }, + this.call(method='GET', path=['document'], cmd='diff', key='prop')) + + def test_GroupedDiffLimit(self): + db_routes._GROUPED_DIFF_LIMIT = 2 + + class Document(db.Resource): + pass + + volume = db.Volume('.', [Document]) + router = Router(db.Routes(volume)) + + volume['document'].create({'guid': '1'}) + volume['document'].create({'guid': '2'}) + volume['document'].create({'guid': '3'}) + volume['document'].create({'guid': '4'}) + volume['document'].create({'guid': '5'}) + self.utime('db/document', 0) + + self.assertEqual({ + '1': [[1, 1]], + '2': [[2, 2]], + }, + this.call(method='GET', path=['document'], cmd='diff')) + + self.assertEqual({ + '3': [[3, 3]], + '4': [[4, 4]], + }, + this.call(method='GET', path=['document'], cmd='diff', environ={'HTTP_X_RANGE': json.dumps([[3, None]])})) + if __name__ == '__main__': tests.main() diff --git a/tests/units/db/volume.py b/tests/units/db/volume.py index 22d4782..a770a35 100755 --- a/tests/units/db/volume.py +++ b/tests/units/db/volume.py @@ -323,51 +323,6 @@ class VolumeTest(tests.Test): self.assertRaises(StopIteration, patch.next) self.assertEqual([[4, None]], r) - def test_clone(self): - - class Document(db.Resource): - - @db.stored_property() - def prop1(self, value): - return value - - @db.stored_property() - def prop2(self, value): - return value - - @db.stored_property(db.Blob) - def prop3(self, value): - return value - - @db.stored_property(db.Blob) - def prop4(self, value): - return value - - volume = db.Volume('.', [Document]) - - volume['document'].create({ - 'guid': 'guid', - 'prop1': '1', - 'prop2': 2, - 'prop3': volume.blobs.post('333', '3/3').digest, - 'prop4': volume.blobs.post('4444', '4/4').digest, - }) - self.utime('db/document/gu/guid', 1) - - self.assertEqual([ - {'content-type': '3/3', 'content-length': '3', 'x-seqno': '1'}, - {'content-type': '4/4', 'content-length': '4', 'x-seqno': '2'}, - {'resource': 'document'}, - {'guid': 'guid', 'patch': { - 'guid': {'value': 'guid', 'mtime': 1}, - 'prop1': {'value': '1', 'mtime': 1}, - 'prop2': {'value': 2, 'mtime': 1}, - 'prop3': {'value': hashlib.sha1('333').hexdigest(), 'mtime': 1}, - 'prop4': {'value': hashlib.sha1('4444').hexdigest(), 'mtime': 1}, - }}, - ], - [i.meta if isinstance(i, File) else i for i in volume.clone('document', 'guid')]) - def test_patch_New(self): class Document(db.Resource): @@ -973,6 +928,177 @@ class VolumeTest(tests.Test): [dict(i) for i in volume.diff(r, files=['foo'])]) self.assertEqual([[4, None]], r) + def test_DoNotShiftSeqnoForLocalProps(self): + + class Document(db.Resource): + + @db.stored_property() + def prop1(self, value): + return value + + @db.stored_property(acl=ACL.PUBLIC | ACL.LOCAL) + def prop2(self, value): + return value + + directory = db.Volume('.', [Document])['document'] + + directory.create({'guid': '1', 'prop1': '1', 'prop2': '1', 'ctime': 1, 'mtime': 1}) + self.utime('db/document', 0) + self.assertEqual( + {'seqno': 1, 'value': 1, 'mtime': 0}, + directory['1'].meta('seqno')) + self.assertEqual( + {'seqno': 1, 'value': '1', 'mtime': 0}, + directory['1'].meta('prop1')) + self.assertEqual( + {'value': '1', 'mtime': 0}, + directory['1'].meta('prop2')) + + directory.update('1', {'prop2': '2'}) + self.utime('db/document', 0) + self.assertEqual( + {'seqno': 1, 'value': 1, 'mtime': 0}, + directory['1'].meta('seqno')) + self.assertEqual( + {'seqno': 1, 'value': '1', 'mtime': 0}, + directory['1'].meta('prop1')) + self.assertEqual( + {'value': '2', 'mtime': 0}, + directory['1'].meta('prop2')) + + directory.update('1', {'prop1': '2'}) + self.utime('db/document', 0) + self.assertEqual( + {'seqno': 2, 'value': 2, 'mtime': 0}, + directory['1'].meta('seqno')) + self.assertEqual( + {'seqno': 2, 'value': '2', 'mtime': 0}, + directory['1'].meta('prop1')) + self.assertEqual( + {'value': '2', 'mtime': 0}, + directory['1'].meta('prop2')) + + def test_patch_SeqnoLess(self): + + class Document(db.Resource): + + @db.indexed_property(slot=1) + def prop(self, value): + return value + + volume1 = db.Volume('1', [Document]) + volume1['document'].create({'guid': '1', 'prop': '1', 'ctime': 1, 'mtime': 1}) + self.utime('1/db/document/1/1', 1) + volume1.blobs.post('1') + + volume2 = db.Volume('2', [Document]) + volume2.patch(volume1.diff([[1, None]]), shift_seqno=False) + + self.assertEqual( + [(1, '1', 1, '1')], + [(i['ctime'], i['prop'], i['mtime'], i['guid']) for i in volume2['document'].find()[0]]) + + doc = volume2['document'].get('1') + self.assertEqual(0, doc.get('seqno')) + assert 'seqno' not in doc.meta('guid') + assert 'seqno' not in doc.meta('ctime') + assert 'seqno' not in doc.meta('mtime') + assert 'seqno' not in doc.meta('prop') + + blob = volume2.blobs.get(hashlib.sha1('1').hexdigest()) + self.assertEqual({ + 'x-seqno': '0', + 'content-length': '1', + 'content-type': 'application/octet-stream', + }, + blob.meta) + self.assertEqual('1', file(blob.path).read()) + + def test_diff_IgnoreSeqnolessUpdates(self): + + class Document(db.Resource): + + @db.stored_property() + def prop1(self, value): + return value + + @db.stored_property(acl=ACL.PUBLIC | ACL.LOCAL) + def prop2(self, value): + return value + + volume = db.Volume('.', [Document]) + + volume['document'].create({'guid': '1', 'prop1': '1', 'prop2': '1', 'ctime': 1, 'mtime': 1}) + self.utime('db/document/1/1', 1) + + r = [[1, None]] + self.assertEqual([ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'prop1': {'value': '1', 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 1]]}, + ], + [i.meta if isinstance(i, File) else i for i in volume.diff(r)]) + self.assertEqual([[2, None]], r) + + volume['document'].update('1', {'prop2': '2'}) + self.utime('db/document/1/1', 1) + + r = [[1, None]] + self.assertEqual([ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'prop1': {'value': '1', 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 1]]}, + ], + [i.meta if isinstance(i, File) else i for i in volume.diff(r)]) + self.assertEqual([[2, None]], r) + + volume['document'].update('1', {'prop1': '2'}) + self.utime('db/document/1/1', 1) + + r = [[1, None]] + self.assertEqual([ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'prop1': {'value': '2', 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 2]]}, + ], + [i.meta if isinstance(i, File) else i for i in volume.diff(r)]) + self.assertEqual([[3, None]], r) + + self.assertEqual(False, volume['document'].patch('1', {'prop1': {'mtime': 2, 'value': '3'}}, seqno=False)) + self.assertEqual('3', volume['document']['1']['prop1']) + self.utime('db/document/1/1', 1) + + r = [[1, None]] + self.assertEqual([ + {'resource': 'document'}, + {'guid': '1', 'patch': { + 'guid': {'value': '1', 'mtime': 1}, + 'ctime': {'value': 1, 'mtime': 1}, + 'mtime': {'value': 1, 'mtime': 1}, + }}, + {'commit': [[1, 2]]}, + ], + [i.meta if isinstance(i, File) else i for i in volume.diff(r)]) + self.assertEqual([[3, None]], r) + + + + class _SessionSeqno(object): diff --git a/tests/units/node/slave.py b/tests/units/node/slave.py index 2b32b70..10e5742 100755 --- a/tests/units/node/slave.py +++ b/tests/units/node/slave.py @@ -107,7 +107,6 @@ class SlaveTest(tests.Test): self.assertEqual([[7, None]], json.load(file('slave/var/push.ranges'))) coroutine.sleep(1) - slave.put(['document', guid1], {'message': 'a'}) slave.put(['document', guid2], {'message': 'b'}) slave.put(['document', guid3], {'message': 'c'}) guid4 = slave.post(['document'], {'message': 'd', 'title': ''}) @@ -119,7 +118,7 @@ class SlaveTest(tests.Test): ]), sorted(master.get(['document'], reply=['guid', 'message'])['result'])) self.assertEqual([[6, None]], json.load(file('slave/var/pull.ranges'))) - self.assertEqual([[12, None]], json.load(file('slave/var/push.ranges'))) + self.assertEqual([[11, None]], json.load(file('slave/var/push.ranges'))) def test_online_sync_Pull(self): self.fork_master([User, self.Document]) @@ -172,7 +171,6 @@ class SlaveTest(tests.Test): self.assertEqual([[5, None]], json.load(file('slave/var/push.ranges'))) coroutine.sleep(1) - master.put(['document', guid1], {'message': 'a'}) master.put(['document', guid2], {'message': 'b'}) master.put(['document', guid3], {'message': 'c'}) guid4 = master.post(['document'], {'message': 'd', 'title': ''}) @@ -183,7 +181,7 @@ class SlaveTest(tests.Test): {'guid': guid4, 'message': 'd'}, ], slave.get(['document'], reply=['guid', 'message'])['result']) - self.assertEqual([[12, None]], json.load(file('slave/var/pull.ranges'))) + self.assertEqual([[11, None]], json.load(file('slave/var/pull.ranges'))) self.assertEqual([[6, None]], json.load(file('slave/var/push.ranges'))) def test_online_sync_PullBlobs(self): diff --git a/tests/units/toolkit/parcel.py b/tests/units/toolkit/parcel.py index 1a24a3f..17fa146 100755 --- a/tests/units/toolkit/parcel.py +++ b/tests/units/toolkit/parcel.py @@ -18,7 +18,7 @@ from sugar_network.toolkit import parcel, http, coroutine class ParcelTest(tests.Test): - def test_decode(self): + def test_decode_Zipped(self): stream = zips( json.dumps({'foo': 'bar'}) + '\n' ) @@ -96,7 +96,85 @@ class ParcelTest(tests.Test): self.assertRaises(StopIteration, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - def test_decode_WithLimit(self): + def test_decode_NotZipped(self): + stream = StringIO( + json.dumps({'foo': 'bar'}) + '\n' + ) + packets_iter = parcel.decode(stream) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = StringIO( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + self.assertEqual('foo', packet['bar']) + packet_iter = iter(packet) + self.assertRaises(EOFError, packet_iter.next) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = StringIO( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + + json.dumps({'payload': 1}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + self.assertRaises(EOFError, packet_iter.next) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = StringIO( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 2}, next(packet_iter)) + self.assertRaises(EOFError, packet_iter.next) + self.assertRaises(EOFError, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + stream = StringIO( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': 1, 'bar': 'foo'}) + '\n' + + json.dumps({'payload': 1}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({'payload': 2}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ) + packets_iter = parcel.decode(stream) + with next(packets_iter) as packet: + self.assertEqual(1, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 1}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + with next(packets_iter) as packet: + self.assertEqual(2, packet.name) + packet_iter = iter(packet) + self.assertEqual({'payload': 2}, next(packet_iter)) + self.assertRaises(StopIteration, packet_iter.next) + self.assertRaises(StopIteration, packets_iter.next) + self.assertEqual(len(stream.getvalue()), stream.tell()) + + def test_decode_ZippedWithLimit(self): payload = zips( json.dumps({}) + '\n' + json.dumps({'packet': 'first'}) + '\n' + @@ -114,6 +192,24 @@ class ParcelTest(tests.Test): pass self.assertEqual(len(payload), stream.tell()) + def test_decode_NotZippedWithLimit(self): + payload = StringIO( + json.dumps({}) + '\n' + + json.dumps({'packet': 'first'}) + '\n' + + json.dumps({'packet': 'last'}) + '\n' + ).getvalue() + tail = '.' * 100 + + stream = StringIO(payload + tail) + for i in parcel.decode(stream): + pass + self.assertEqual(len(payload + tail), stream.tell()) + + stream = StringIO(payload + tail) + for i in parcel.decode(stream, limit=len(payload)): + pass + self.assertEqual(len(payload), stream.tell()) + def test_decode_Empty(self): self.assertRaises(http.BadRequest, parcel.decode(StringIO()).next) @@ -254,7 +350,7 @@ class ParcelTest(tests.Test): self.assertRaises(StopIteration, packets_iter.next) self.assertEqual(len(stream.getvalue()), stream.tell()) - def test_encode(self): + def test_encode_Zipped(self): stream = ''.join([i for i in parcel.encode([])]) self.assertEqual( json.dumps({}) + '\n' + @@ -300,6 +396,52 @@ class ParcelTest(tests.Test): json.dumps({'packet': 'last'}) + '\n', unzips(stream)) + def test_encode_NotZipped(self): + stream = ''.join([i for i in parcel.encode([], compresslevel=0)]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + stream) + + stream = ''.join([i for i in parcel.encode([(None, None, None)], header={'foo': 'bar'}, compresslevel=0)]) + self.assertEqual( + json.dumps({'foo': 'bar'}) + '\n' + + json.dumps({'packet': None}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + stream) + + stream = ''.join([i for i in parcel.encode([ + (1, {}, None), + ('2', {'n': 2}, []), + ('3', {'n': 3}, iter([])), + ], compresslevel=0)]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({'packet': '2', 'n': 2}) + '\n' + + json.dumps({'packet': '3', 'n': 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + stream) + + stream = ''.join([i for i in parcel.encode([ + (1, None, [{1: 1}]), + (2, None, [{2: 2}, {2: 2}]), + (3, None, [{3: 3}, {3: 3}, {3: 3}]), + ], compresslevel=0)]) + self.assertEqual( + json.dumps({}) + '\n' + + json.dumps({'packet': 1}) + '\n' + + json.dumps({1: 1}) + '\n' + + json.dumps({'packet': 2}) + '\n' + + json.dumps({2: 2}) + '\n' + + json.dumps({2: 2}) + '\n' + + json.dumps({'packet': 3}) + '\n' + + json.dumps({3: 3}) + '\n' + + json.dumps({3: 3}) + '\n' + + json.dumps({3: 3}) + '\n' + + json.dumps({'packet': 'last'}) + '\n', + stream) + def test_limited_encode(self): RECORD = 1024 * 1024 diff --git a/tests/units/toolkit/router.py b/tests/units/toolkit/router.py index e9ee798..63d4646 100755 --- a/tests/units/toolkit/router.py +++ b/tests/units/toolkit/router.py @@ -535,7 +535,7 @@ class RouterTest(tests.Test): ['_afz'], [i for i in router({'REQUEST_METHOD': 'PROBE', 'PATH_INFO': '/'}, lambda *args: None)]) - def test_routes_Post(self): + def test_routes_Postroutes(self): postroutes = [] class A(object): @@ -551,24 +551,28 @@ class RouterTest(tests.Test): @postroute def _(self, result, exception): postroutes.append(('_', result, str(exception))) + return result class B1(A): @postroute def z(self, result, exception): postroutes.append(('z', result, str(exception))) + return result class B2(object): @postroute def f(self, result, exception): postroutes.append(('f', result, str(exception))) + return result class C(B1, B2): @postroute def a(self, result, exception): postroutes.append(('a', result, str(exception))) + return result router = Router(C()) @@ -595,6 +599,25 @@ class RouterTest(tests.Test): ], postroutes) + def test_routes_UpdateResultInPostroutes(self): + postroutes = [] + + class A(object): + + @route('OK') + def ok(self): + return 'ok' + + @postroute + def postroute(self, result, exception): + return result + '!' + + router = Router(A()) + + self.assertEqual( + ['ok!'], + [i for i in router({'REQUEST_METHOD': 'OK', 'PATH_INFO': '/'}, lambda *args: None)]) + def test_routes_WildcardsAsLastResort(self): class Routes(object): |