From ee64655f6a54a98adfa1eab832210a082d47945e Mon Sep 17 00:00:00 2001 From: anishmangal2002 Date: Thu, 20 May 2010 19:45:49 +0000 Subject: Replaced groupthink symbolic link with actual folder. --- diff --git a/groupthink b/groupthink deleted file mode 120000 index d5caf1e..0000000 --- a/groupthink +++ /dev/null @@ -1 +0,0 @@ -dobject/groupthink/ \ No newline at end of file diff --git a/groupthink/__init__.py b/groupthink/__init__.py new file mode 100644 index 0000000..628753a --- /dev/null +++ b/groupthink/__init__.py @@ -0,0 +1 @@ +from groupthink_base import * diff --git a/groupthink/aatree.py b/groupthink/aatree.py new file mode 100644 index 0000000..6676277 --- /dev/null +++ b/groupthink/aatree.py @@ -0,0 +1,598 @@ +class Node: + """Conventions: a nonexistent child or parent is None.""" + + parent = None + leftchild = None + rightchild = None + annotation = None + value = None + +class AANode(Node): + level = 1 + +class Walker: + """descend must return 0 if the node in question is the one desired, + -1 if the left child would be better, or 1 if the right child would be + better""" + def descend(self, node): + raise + def prepare_descend(self, *args): #optional method to prepare for descent + raise + """ascend should return True iff it should run again on the parent. + It should set the state such that a subsequent + descent would retrace these steps.""" + def ascend(self, node): + raise + def prepare_ascend(self, *args): #optional method to prepare for ascent + raise + +class SearchWalker(Walker): + """Convention: leftchild.annotation < annotation < rightchild.annotation""" + val = 0 + compare = cmp + def prepare_descend(self, val, comparator=cmp): + self.val = val + self.compare = comparator + def descend(self, node): + x = self.compare(node.annotation, self.val) + return x + def ascend(self, node): + self.val = node.annotation + return False + +class RandomWalker(Walker): + def prepare_descend(self): + from random import choice as choice + self.choice = choice + def descend(self, node): + return self.choice((-1,1)) + #ascend not implemented; it doesn't make sense because there is no + #state and no reproducibility + +def descend(node, walker): #move down from a root node + x = walker.descend(node) + while x != 0: + if x == 1: + if node.rightchild is None: + return (node, 1) + else: + node = node.rightchild + else: #x == -1 + if node.leftchild is None: + return (node, -1) + else: + node = node.leftchild + x = walker.descend(node) + return (node, 0) + +def ascend(node, walker): #Move up from a leaf node + while node is not None and walker.ascend(node): + node = node.parent + +def search(root, val): + """Searches a correctly sorted binary tree, starting with the root Node, for + val. Returns a node that contains val if val is present, otherwise a node + for which val would be an acceptable child value.""" + w = SearchWalker + w.prepared_descend(val) + return descend(root, w) + +def findmin(root): + while root.leftchild is not None: + root = root.leftchild + return root + +def findmax(root): + while root.rightchild is not None: + root = root.rightchild + return root + +class MonoidTree: + makenode = Node + """A Monoid Annotation Tree is a binary tree whose nodes are each annotated + by values from some monoid. The annotation of an internal node is computed + by applying the operation to the annotations of its children. The annotation of a leaf + node is specified by the user. Every node must either have two children or + be a leaf node. + + Each leaf node may also be associated with an arbitrary opaque value of the user's + choosing. This node and value will remain associated.""" + def __init__(self, operation, rootnode): + """The rootnode must have a valid annotation, and its parent must be None""" + self.op = operation + self.root = rootnode + def _update(self, node, sentinel=None): + """node must be an internal node""" + while node is not sentinel: + #oldval = node.annotation + node.annotation = self.op(node.leftchild.annotation, node.rightchild.annotation) + #if oldval == node.annotation: + # #this node has not changed, so nodes above it will also not have changed + # break + #else: + node = node.parent + _update_add = _update + _update_del = _update + def _split_link(self, node): + """Introduce and return a new node (newparent) between node and its parent""" + newparent = self.makenode() + newparent.parent = node.parent + if node.parent is not None: + if node.parent.leftchild is node: + node.parent.leftchild = newparent + else: + assert node.parent.rightchild is node + node.parent.rightchild = newparent + else: + self.root = newparent + node.parent = newparent + return newparent + def addleft(self, new, old): + """Add a new leaf node to the left of an old leaf node""" + newparent = self._split_link(old) + newparent.rightchild = old + newparent.leftchild = new + new.parent = newparent + self._update_add(newparent) + def addright(self, new, old): + """Add a new leaf node to the right of an old leaf node""" + newparent = self._split_link(old) + newparent.rightchild = new + newparent.leftchild = old + new.parent = newparent + self._update_add(newparent) + def add(self, new, walker): + leaf, position = descend(self.root, walker) + assert leaf.leftchild is None + assert leaf.rightchild is None + if position == 1: + self.addright(new, leaf) + else: #Makes left the default for duplicate values + self.addleft(new, leaf) + def remove(self, leaf): + p = leaf.parent + if p.leftchild is leaf: + sibling = p.rightchild + else: + assert p.rightchild is leaf + sibling = p.leftchild + gp = p.parent + if gp.leftchild is p: + gp.leftchild = sibling + elif gp.rightchild is p: + gp.rightchild = sibling + sibling.parent = gp + # The only remaining reference to p is now in leaf itself, and the only + # remaining reference to leaf is in the user's hands + self._update_del(gp) + def change_annotation(self, leaf, newann): + assert leaf.leftchild is None + assert leaf.rightchild is None + leaf.annotation = newann + self._update(leaf.parent) + def getnext(self, leaf, skip=None): + assert leaf.leftchild is None + assert leaf.rightchild is None + node = leaf + while ((node.parent is not None) and + ((node.parent.rightchild is node) or + ((skip is not None) and skip(node.parent.rightchild)))): + # Move up until you can move right + node = node.parent + if (node.parent is not None) and (node.parent.leftchild is node): + node = node.parent.rightchild + while node.leftchild is not None: + # Move down, staying as far left as possible. + assert node.rightchild is not None + if (skip is not None) and skip(node.leftchild): + node = node.rightchild + else: + node = node.leftchild + return node + else: + raise StopIteration("No next node") + + def _build_subtree(self, nodes): + #FIXME: This cannot be helpful because insertion of a subtree requires + #rebalancing the main tree by more than one level, which is not possible + #with a single invocation of skew and split + L = len(nodes) + if L == 1: + return nodes[0] + else: + next = [] + sentinel = 'g' #must not be None, since None is the root sentinel + if L % 2: + n2 = nodes.pop() + n1 = nodes.pop() + newnode = self.makenode() + newnode.parent=sentinel #totally arbitrary constant + newnode.leftchild = n1 + n1.parent = newnode + newnode.rightchild = n2 + n2.parent = newnode + self._update_add(newnode, sentinel) + nodes.append(newnode) + for i in xrange(0,L,2): + n1,n2 = nodes[i:(i+2)] + newnode = self.makenode() + newnode.parent=sentinel #totally arbitrary constant + newnode.leftchild = n1 + n1.parent = newnode + newnode.rightchild = n2 + n2.parent = newnode + self._update_add(newnode, sentinel) + + + +class SumWalker(Walker): + """SumWalker is designed to walk over full trees where each leaf has annotation 1 + and the monoid is +. Target is the zero-indexed position of the target node. + + There is one exception: the last node in every tree has annotation 0.""" + target = None + offset = None + def prepare_descend(self, target): + self.target = target + self.offset = 0 + def descend(self, node): + if node.annotation == 0: #empty leaf at the last position + assert self.target == self.offset + return -1 + elif node.leftchild is None: #leaf node case + assert node.rightchild is None + assert self.target == self.offset + return 0 + else: #internal node case + p = self.offset + node.leftchild.annotation + if p <= self.target: + self.offset = p + return 1 + else: + return -1 + def prepare_ascend(self): + self.target = 0 + def ascend(self, node): + if node.parent is not None: + if node.parent.rightchild is node: + self.target += node.parent.leftchild.annotation + else: + assert node.parent.leftchild is node + return True + else: + return False + +class TreeList: + """Implements a list-like interface, backed by a MonoidTree""" + _treetype = MonoidTree + def __init__(self): + self._makenode = self._treetype.makenode + r = self._makenode() + r.annotation = 0 + from operator import add + self._tree = self._treetype(add, r) + self._walker = SumWalker() + # We regard the fields of this walker as public API, and manipulate + # them directly + self._index = {} + def __len__(self): + return self._tree.root.annotation + def _getnode(self, i): + self._walker.prepare_descend(i) + node, pos = descend(self._tree.root, self._walker) + assert pos == 0 + return node + def __getitem__(self, s): + if isinstance(s, int): + node = self._getnode(s) + return node.value + else: + raise UnimplementedError + def __setitem__(self, s, v): + if isinstance(s, int): + if s < len(self): + node = self._getnode(s) + oldv = node.value + self._index[oldv].remove(node) + if not self._index[oldv]: + del self._index[oldv] + node.value = v + if v not in self._index: + self._index[v] = set() + self._index[v].add(node) + else: + self.insert(s, v) + else: + raise UnimplementedError + def __delitem__(self, s): + if isinstance(s, int): + if s < len(self): + node = self._getnode(s) + oldv = node.value + self._index[oldv].remove(node) + if not self._index[oldv]: + del self._index[oldv] + self._tree.remove(node) + else: + raise UnimplementedError + def insert(self, p, v): + if p > len(self): + raise IndexError("Index out of range") + self._walker.prepare_descend(p) + newnode = self._makenode() + newnode.annotation = 1 + newnode.value = v + self._tree.add(newnode, self._walker) + if v not in self._index: + self._index[v] = set() + self._index[v].add(newnode) + def index(self, v): + """index returns some index such that self[i] == v. No promises about ordering.""" + self._walker.prepare_ascend() + for node in self._index[v]: #Pull one arbitrary node out of the set + assert node.value == v + ascend(node, self._walker) + break + return self._walker.target + +class TreeHideList: + """Implements the EagerHideList interface, backed by a MonoidTree""" + _treetype = MonoidTree + class MultiSumWalker(Walker): + index = 0 + target = 0 + offset = 0 + def prepare_descend(self, target, index): + self.index = index + self.target = target + self.offset = 0 + def descend(self, node): + if node.annotation == (0,0): #empty leaf at the last position + assert self.target == self.offset + return -1 + elif node.leftchild is None: #leaf node case + assert node.rightchild is None + assert self.target == self.offset + return 0 + else: #internal node case + p = self.offset + node.leftchild.annotation[self.index] + if p <= self.target: + self.offset = p + return 1 + else: + return -1 + def prepare_ascend(self, index): + self.target = 0 + self.index = index + def ascend(self, node): + if node.parent is not None: + if node.parent.rightchild is node: + self.target += node.parent.leftchild.annotation[self.index] + else: + assert node.parent.leftchild is node + return True + else: + return False + + @staticmethod + def op(a,b): + # Convention: a[0] is visible elements. a[1] is all elements. + return (a[0] + b[0], a[1] + b[1]) + + @staticmethod + def skip(node): + return node.annotation[0] == 0 + + def __init__(self): + self._makenode = self._treetype.makenode + r = self._makenode() + r.annotation = (0, 0) + self._tree = self._treetype(self.op, r) + self._walker = self.MultiSumWalker() + # We regard the fields of this walker as public API, and manipulate + # them directly + self._index = {} + unique = True + if unique: + self._index_lookup = self._index.__getitem__ + self._index_assign = self._index.__setitem__ + else: + self._index_lookup = self._index_lookup_set + self._index_assign = self._index_assign_set + def _index_lookup_set(self, item): + for v in self._index[item]: + return v + def _index_assign_set(self, key, value): + if key not in self._index: + self._index[key] = set() + self._index[key].add(value) + def __len__(self): + return self._tree.root.annotation[0] + def _getnode(self, i, a): + self._walker.prepare_descend(i, a) + node, pos = descend(self._tree.root, self._walker) + assert (pos == 0) or ((pos == -1) and (i == len(self))) + return node + def __getitem__(self, s): + if isinstance(s, int): + if s < len(self): #FIXME: negative indices + node = self._getnode(s, 0) + return node.value + else: + raise IndexError("Index out of range") + else: + start, stop, stride = s.indices(len(self)) + if start == stop: + return [] + elif stride == 1: + # runs in k + log(N) (amortized) + nodes = [self._getnode(start,0)] + k = stop - start + while len(nodes) < k: + nodes.append(self._tree.getnext(nodes[-1],self.skip)) + return [n.value for n in nodes] + else: + #FIXME: runs in k*log(N), could be reduced to k*log(step) + log(N) + return [self[i] for i in xrange(start,stop,stride)] + def index(self, v, visible=True): + """index returns some index such that self[i] == v. No promises about ordering.""" + self._walker.prepare_ascend(0 if visible else 1) + node = self._index_lookup(v) #Pull one arbitrary node out of the set + assert node.value == v + ascend(node, self._walker) + return self._walker.target + def hide(self, position, length): + #self.__getitem__ is eager, so we acquire the list of nodes before + #acting on them + node = self._getnode(position,0) + for i in xrange(position+1,position+length): + self._tree.change_annotation(node,(0,1)) + node = self._tree.getnext(node, self.skip) + self._tree.change_annotation(node,(0,1)) + #FIXME: runs in length*log(N). Could be reduced using a priority queue, + #possibly to length + log(N) + def getitem_all(self, s): + if isinstance(s, int): + node = self._getnode(s, 1) + return node.value + else: + #FIXME: runs in k*log(N), could be reduced to k + log(N) by linked list + return [self.getitem_all(i) for i in xrange(*s.indices())] + def index_all(self, item): + return self.index(item, False) + def is_visible(self, i): + node = self._getnode(i, 1) + return node.annotation[0] == 1 + def is_visible_item(self, item): + node = self._index_lookup(item) + return node.annotation[0] == 1 + def insert_sequence_all(self, position, sequence, visibility): + node = self._getnode(position,1) + self._insert_sequence_leftofnode(node, sequence, visibility) + def insert_sequence_leftof(self, target, sequence, visibility): + node = self._index_lookup(target) + self._insert_sequence_leftofnode(node, sequence, visibility) + def _insert_sequence_leftofnode(self, node, sequence, visibility): + for i in xrange(len(sequence)): + v = sequence[i] + viz = visibility[i] + newnode = self._makenode() + newnode.annotation = (1 if viz else 0, 1) + newnode.value = v + self._tree.addleft(newnode, node) + self._index_assign(v, newnode) + +# Skew, split, and decrease_level are the AA balancing functions, as described +# at http://en.wikipedia.org/wiki/AA_tree . They have been modified +# substantially here to (1) maintain bidirectional linking and (2) maintain +# monoid annotations. +def skew(node, op=None): + L = node.leftchild + if (L is not None) and node.level == L.level: + node.leftchild = L.rightchild + if node.leftchild is not None: + node.leftchild.parent = node + L.rightchild = node + L.parent = node.parent + node.parent = L + if L.parent is not None: + if L.parent.leftchild is node: + L.parent.leftchild = L + else: + assert L.parent.rightchild is node + L.parent.rightchild = L + if op is not None: + L.annotation = node.annotation + node.annotation = op(node.leftchild.annotation, node.rightchild.annotation) + assert L.annotation == op(L.leftchild.annotation, L.rightchild.annotation) + # This assertion is the condition of associativity, guaranteed for any + # valid monoid operation. + return L + else: + return node + +def split(node, op=None): + R = node.rightchild + if ((R is not None) and + (R.rightchild is not None) and + (node.level == R.rightchild.level)): + node.rightchild = R.leftchild + node.rightchild.parent = node + + R.leftchild = node + R.parent = node.parent + node.parent = R + + R.level += 1 + + if R.parent is not None: + if R.parent.leftchild is node: + R.parent.leftchild = R + else: + assert R.parent.rightchild is node + R.parent.rightchild = R + + if op is not None: + R.annotation = node.annotation + node.annotation = op(node.leftchild.annotation, node.rightchild.annotation) + assert R.annotation == op(R.leftchild.annotation, R.rightchild.annotation) + # This assertion is the condition of associativity, guaranteed for any + # valid monoid operation. + + return R + else: + return node + +def decrease_level(node): + # Decrease the level of node if necessary. Returns true if a modification + # was made. + target = min(node.leftchild.level, node.rightchild.level) + 1 + if target < node.level: + node.level = target + if target < node.rightchild.level: + node.rightchild.level = target + return True + return False + +class AAMonoidTree(MonoidTree): + makenode = AANode + def _update_add(self, node, sentinel=None): + """node must be an internal node one level above the leaves, with + two leaves itself.""" + node.level = 2 + while node is not sentinel: + #oldval = node.annotation + node.annotation = self.op(node.leftchild.annotation, node.rightchild.annotation) + node = skew(node, self.op) + node = split(node, self.op) + if node.parent is None: + self.root = node + node = node.parent + def _update_del(self, node, sentinel=None): + while node is not sentinel: + #oldval = node.annotation + #oldlevel = node.level + node.annotation = self.op(node.leftchild.annotation, node.rightchild.annotation) + + decrease_level(node) + + node = skew(node, self.op) + node.rightchild = skew(node.rightchild, self.op) + if node.rightchild.rightchild is not None: + node.rightchild.rightchild = skew(node.rightchild.rightchild, self.op) + node = split(node, self.op) + node.rightchild = split(node.rightchild, self.op) + + #if (oldval == node.annotation) and (oldlevel == node.level): + # #Nodes above this point will not have changed + # break + + if node.parent is None: + self.root = node + node = node.parent + +class AATreeList(TreeList): + _treetype = AAMonoidTree + +class AATreeHideList(TreeHideList): + _treetype = AAMonoidTree diff --git a/groupthink/aatree_test.py b/groupthink/aatree_test.py new file mode 100644 index 0000000..643aa6c --- /dev/null +++ b/groupthink/aatree_test.py @@ -0,0 +1,16 @@ +from aatree import * +x = TreeList() +y = AATreeList() + +def test(a): + a[0] = 1 + a[1] = 2 + a[2] = 3 + a[3] = 4 + a[1] = 'b' + del a[2] + assert a.index('b') == 1 + assert a.index(4) == 2 + +test(x) +test(y) diff --git a/groupthink/dbus_tools.py b/groupthink/dbus_tools.py new file mode 100644 index 0000000..423e51a --- /dev/null +++ b/groupthink/dbus_tools.py @@ -0,0 +1,26 @@ +import dbus + +inttypes = (dbus.Int16, dbus.Int32, dbus.Int64, + dbus.Byte, dbus.UInt16, dbus.UInt32, dbus.UInt64) +booltypes = (dbus.Boolean) +floattypes = (dbus.Double) +strtypes = (dbus.ByteArray, dbus.String, dbus.UTF8String, dbus.Signature, + dbus.ObjectPath) + +def undbox(x): + if isinstance(x, inttypes): + return int(x) + elif isinstance(x, booltypes): + return bool(x) + elif isinstance(x, strtypes): + return str(x) + elif isinstance(x, floattypes): + return float(x) + elif isinstance(x, (dbus.Struct, tuple)): + return tuple(undbox(y) for y in x) + elif isinstance(x, (dbus.Array, list)): + return [undbox(y) for y in x] + elif isinstance(x, (dbus.Dictionary, dict)): + return dict((undbox(a),undbox(b)) for (a,b) in x.iteritems()) + else: + return x diff --git a/groupthink/groupthink_base.py b/groupthink/groupthink_base.py new file mode 100644 index 0000000..ca7d6a6 --- /dev/null +++ b/groupthink/groupthink_base.py @@ -0,0 +1,1727 @@ +""" +Copyright 2008 Benjamin M. Schwartz + +DOBject is LGPLv2+ + +DObject is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 2 of the License, or +(at your option) any later version. + +DObject is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with DObject. If not, see . +""" +import dbus +import dbus.service +import dbus.gobject_service +import time +import logging +import threading +import thread +import random +from listset import ListSet +import stringtree +import cPickle +import dbus_tools +""" +DObject is a library of components useful for constructing distributed +applications that need to maintain coherent state while communicating over +Telepathy. The DObject tools are design to handle unexpected joins, leaves, +splits, and merges automatically, and always to leave each connected component +of users in a coherent state at quiescence. +""" + +def PassFunction(*args,**kargs): + logging.debug("args=%s, kargs=%s" % (str(args),str(kargs))) + pass + +def ReturnFunction(x): + return x + +class Group: + """A Group is a simple tool for organizing DObjects. Once it is set up + with a tubebox, the user may simply add objects to it, e.g. + + self.group = Group(tb) + ... + self.group['mydict1'] = HighScore('No one', 0) + + and the group will take care of assigning a handler to the object with + the specified name. + For a Group g, g['a'] is equivalent in almost all ways to g.a, for + programmer convenience. + """ + + tubebox = None + _locked = False + _d = None + + def __init__(self, tubebox): + self._logger = logging.getLogger('groupthink.Group') + self._logger.debug('new Group') + self.tubebox = tubebox + self._d = dict() + self._history = dict() + self._handlers = dict() + self._locked = True + + def __setitem__(self, name, dobj): + self._logger.debug("setitem(%s,%s)" % (name, str(dobj))) + if name in self.__dict__ or name in self._d: + raise #Cannot replace an existing attribute or object + h = dobj.HANDLER_TYPE(name, self.tubebox) + dobj.set_handler(h) + self.add_handler(h, dobj) + + def add_handler(self, h, o=None): + """This function is used to add a handler to the Group _after_ that + handler has already been registered to completion with its object.""" + name = h.get_name() + self._handlers[name] = h + if name in self._history: + h.object.add_history(self._history[name]) + del self._history[name] + if o is not None: + self._d[name] = o + else: + self._d[name] = h.object + for hc in h.get_copies(): #Recurse through a potential tree of handlers + self.add_handler(hc) + + def __setattr__(self, name, val): + if self._locked: + self.__setitem__(name, val) + else: + self.__dict__[name] = val + + def __getitem__(self, name): + if name in self._d: + return self._d[name] + else: + return self.__dict__[name] + + __getattr__ = __getitem__ + + def __delattr__(self, name): + raise #Deletion is not supported + + def dumps(self): + d = {} + for (name, handler) in self._handlers.iteritems(): + d[name] = dbus_tools.undbox(handler.object.get_history()) + d.update(self._history) #Include any "unclaimed history" thus far. + return cPickle.dumps(d) + + def loads(self, s): + if s: + d = cPickle.loads(s) + for (name,hist) in d.iteritems(): + if name in self._d: + handler = self._handlers[name] + handler.object.add_history(hist) + else: + self._history[name] = hist + +class TubeBox: + """ A TubeBox is a box that either contains a Tube or does not. + The purpose of a TubeBox is to solve this problem: Activities are not + provided with the sharing Tube until they are shared, but DObjects should + not have to care whether or not they have been shared. That means that the + DObject handler must know whether or not a Tube has been provided. This + could be implemented within the handlers, but then the Activity's sharing + code would have to know a list of all DObject handlers. + + Instead, the sharing code just needs to create a TubeBox and pass it to the + code that creates handlers. Once the tube arrives, it can be added to the + TubeBox with insert_tube. The handlers will then be notified automatically. + """ + def __init__(self): + self.tube = None + self.is_initiator = None + self._logger = logging.getLogger() + self._listeners = [] + + def register_listener(self, L): + """This method is used by the DObject handlers to add a callback + function that will be called after insert_tube""" + self._listeners.append(L) + if self.tube is not None: + L(self.tube, self.is_initiator) + + def insert_tube(self, tube, is_initiator=False): + """This method is used by the sharing code to provide the tube, once it + is ready, along with a boolean indicating whether or not this computer + is the initiator (who may have special duties, as the first + participant).""" + self._logger.debug("insert_tube, notifying %s" % str(self._listeners)) + self.tube = tube + self.is_initiator = is_initiator + for L in self._listeners: + L(tube, is_initiator) + +class TimeHandler(dbus.gobject_service.ExportedGObject): + """A TimeHandler provides a universal clock for a sharing instance. It is a + sort of cheap, decentralized synchronization system. The TimeHandler + determines the offset between local time and group time by sending a + broadcast and accepting the first response, and assuming that both transfer + displays were equal. The initiator's offset is 0.0, but once another group + member has synchronized, the initiator can leave and new members will still + be synchronized correctly. Errors at each synchronization are typically + between 0.1s and 2s. + + TimeHandler is not perfectly resilient to disappearances. If the group + splits, and one of the daughter groups does not contain any members that + have had a chance to synchronize, then they will not sync to each other. I + am not yet aware of any sensible synchronization system that avoids this + problem. + """ + IFACE = "org.dobject.TimeHandler" + BASEPATH = "/org/dobject/TimeHandler/" + + def __init__(self, name, tube_box, offset=0.0): + self.PATH = TimeHandler.BASEPATH + name + dbus.gobject_service.ExportedGObject.__init__(self) + self._logger = logging.getLogger(self.PATH) + self._tube_box = tube_box + self.tube = None + self.is_initiator = None + + self.offset = offset + self._know_offset = False + self._offset_lock = threading.Lock() + + self._tube_box.register_listener(self.get_tube) + + def get_tube(self, tube, is_initiator): + """Callback for the TubeBox""" + self._logger.debug("get_tube") + self._logger.debug(str(is_initiator)) + self.tube = tube + self.add_to_connection(self.tube, self.PATH) + self.is_initiator = is_initiator + self._know_offset = is_initiator + self.tube.add_signal_receiver(self.tell_time, signal_name='What_time_is_it', dbus_interface=TimeHandler.IFACE, sender_keyword='sender', path=self.PATH) + + if not self._know_offset: + self.ask_time() + + def time(self): + """Get the group time""" + return time.time() + self.offset + + def get_offset(self): + """Get the difference between local time and group time""" + self._logger.debug("get_offset " + str(self.offset)) + return self.offset + + def set_offset(self, offset): + """Set the difference between local time and group time, and assert that + this is correct""" + self._logger.debug("set_offset " + str(offset)) + self._offset_lock.acquire() + self.offset = offset + self._know_offset = True + self._offset_lock.release() + + @dbus.service.signal(dbus_interface=IFACE, signature='d') + def What_time_is_it(self, asktime): + return + + def ask_time(self): + self._logger.debug("ask_time") + self.What_time_is_it(time.time()) + + def tell_time(self, asktime, sender=None): + self._logger.debug("tell_time") + start_time = time.time() + try: + my_name = self.tube.get_unique_name() + if sender == my_name: + return + if self._know_offset: + self._logger.debug("telling offset") + remote = self.tube.get_object(sender, self.PATH) + start_time += self.offset + remote.receive_time(asktime, start_time, time.time() + self.offset, reply_handler=PassFunction, error_handler=PassFunction) + finally: + return + + @dbus.service.method(dbus_interface=IFACE, in_signature='ddd', out_signature='') + def receive_time(self, asktime, start_time, finish_time): + self._logger.debug("receive_time") + rtime = time.time() + thread.start_new_thread(self._handle_incoming_time, (asktime, start_time, finish_time, rtime)) + + def _handle_incoming_time(self, ask, start, finish, receive): + self._offset_lock.acquire() + if not self._know_offset: + self.offset = ((start + finish)/2) - ((ask + receive)/2) + self._know_offset = True + self._offset_lock.release() + +class UnorderedHandler(dbus.gobject_service.ExportedGObject): + """The UnorderedHandler serves as the interface between a local UnorderedObject + (a pure python entity) and the d-bus/network system. Each UnorderedObject + is associated with a single Handler, and vice-versa. It is the Handler that + is actually exposed over D-Bus. The purpose of this system is to minimize + the amount of networking code required for each additional UnorderedObject. + """ + IFACE = "org.dobject.Unordered" + BASEPATH = "/org/dobject/Unordered/" + + def __init__(self, name, tube_box): + """To construct a UO, the program must provide a name and a TubeBox. + The name is used to identify the UO; all UO with the same name on the + same Tube should be considered views into the same abstract distributed + object.""" + self._myname = name + self.PATH = UnorderedHandler.BASEPATH + name + dbus.gobject_service.ExportedGObject.__init__(self) + self._logger = logging.getLogger(self.PATH) + self._tube_box = tube_box + self.tube = None + self._copies = [] + + self.object = None + self._tube_box.register_listener(self.set_tube) + + def set_tube(self, tube, is_initiator): + self._logger.debug("set_tube(), is_initiator=%s" % str(is_initiator)) + """Callback for the TubeBox""" + self.tube = tube + self.add_to_connection(self.tube, self.PATH) + + self.tube.add_signal_receiver(self.receive_message, signal_name='send', dbus_interface=UnorderedHandler.IFACE, sender_keyword='sender', path=self.PATH) + self.tube.add_signal_receiver(self.tell_history, signal_name='ask_history', dbus_interface=UnorderedHandler.IFACE, sender_keyword='sender', path=self.PATH) + + # We need watch_participants because of the case in which several groups + # all having made changes, come together and need to update each other. + # There is no easy way to make this process more efficient without + # changing the Unordered interface dramatically to include per-message + # labels of some kind. + self.tube.watch_participants(self.members_changed) + + #Alternative implementation of members_changed (not yet working) + #self.tube.add_signal_receiver(self.members_changed, signal_name="MembersChanged", dbus_interface="org.freedesktop.Telepathy.Channel.Interface.Group") + + if self.object is not None: + self.ask_history() + + def register(self, obj): + self._logger.debug("register(%s)" % str(obj)) + """This method registers obj as the UnorderedObject being managed by + this Handler. It is called by obj after obj has initialized itself.""" + self.object = obj + if self.tube is not None: + self.ask_history() + + def get_path(self): + """Returns the DBus path of this handler. The path is the closest thing + to a unique identifier for each abstract DObject.""" + return self.PATH + + def get_tube(self): + """Returns the TubeBox used to create this handler. This method is + necessary if one DObject wishes to create another.""" + return self._tube_box + + @dbus.service.signal(dbus_interface=IFACE, signature='v') + def send(self, message): + self._logger.debug("send(%s)" % str(message)) + """This method broadcasts message to all other handlers for this UO""" + return + + def receive_message(self, message, sender=None): + self._logger.debug("receive_message(%s)" % str(message)) + if self.object is None: + self._logger.error("got message before registration") + elif sender == self.tube.get_unique_name(): + self._logger.debug("Ignoring message, because I am the sender.") + else: + self.object.receive_message(message) + + @dbus.service.signal(dbus_interface=IFACE, signature='') + def ask_history(self): + self._logger.debug("ask_history()") + return + + def tell_history(self, sender=None): + self._logger.debug("tell_history to " + str(sender)) + try: + if sender == self.tube.get_unique_name(): + self._logger.debug("tell_history aborted because I am" + str(sender)) + return + if self.object is None: + self._logger.error("object not registered before tell_history") + return + self._logger.debug("getting proxy object") + remote = self.tube.get_object(sender, self.PATH) + self._logger.debug("got proxy object, getting history") + h = self.object.get_history() + self._logger.debug("got history, initiating transfer") + remote.receive_history(h, reply_handler=PassFunction, error_handler=PassFunction) + self._logger.debug("history transfer initiated") + except Exception, E: + self._logger.debug("tell_history failed: " % repr(E)) + finally: + return + + @dbus.service.method(dbus_interface=IFACE, in_signature = 'v', out_signature='') + def receive_history(self, hist): + self._logger.debug("receive_history(%s)" % str(hist)) + if self.object is None: + self._logger.error("object not registered before receive_history") + return + self.object.add_history(hist) + + #Alternative implementation of a members_changed (not yet working) + """ + def members_changed(self, message, added, removed, local_pending, remote_pending, actor, reason): + added_names = self.tube.InspectHandles(telepathy.CONNECTION_HANDLE_TYPE_LIST, added) + for name in added_names: + self.tell_history(name) + """ + def members_changed(self, added, removed): + self._logger.debug("members_changed") + for (handle, name) in added: + self.tell_history(sender=name) + + def __repr__(self): + return 'UnorderedHandler(' + self._myname + ', ' + repr(self._tube_box) + ')' + + def copy(self, name): + """A convenience function for returning a new UnorderedHandler derived + from this one, with a new name. This is safe as long as copy() is called + with a different name every time.""" + h = UnorderedHandler(self._myname + "/" + name, self._tube_box) + self._copies.append(h) + return h + + def get_copies(self): + return self._copies + + def get_name(self): + return self._myname + +class HandlerAcceptor: + HANDLER_TYPE = NotImplementedError + def set_handler(self, handler): + raise NotImplementedError + +class UnorderedHandlerAcceptor(HandlerAcceptor): + HANDLER_TYPE = UnorderedHandler + +class UnorderedObject(UnorderedHandlerAcceptor): + """ The most basic DObject is the Unordered Object (UO). A UO has the + property that any changes to its state can be encapsulated as messages, and + these messages have no intrinsic ordering. Different instances of the same + UO, after receiving the same messages in different orders, should reach the + same state. + + Any UO could be implemented as a set of all messages received so far, and + coherency could be maintained by sending all messages ever transmitted to + each new joining member. However, many UOs will have the property that most + messages are obsolete, and need not be transmitted. Therefore, as an + optimization, UOs manage their own state structures for synchronizing state + with joining/merging users. + + The following code is an abstract class for UnorderedObject, serving + primarily as documentation for the concept. + """ + + handler = None + + def set_handler(self, handler): + """Each UO must accept an UnorderedHandler via set_handler + Whenever an action is taken on the local UO (e.g. a method call that changes + the object's state), the UO must call handler.send() with an appropriately + encoded message. + + Subclasses may override this method if they wish to perform more actions + when a handler is set.""" + if self.handler: + raise + else: + self.handler = handler + self.handler.register(self) + + + def receive_message(self,msg): + """This method accepts and processes a message sent via handler.send(). + Because objects are sent over DBus, it is advisable to DBus-ify the message + before calling send, and de-DBus-ify it inside receive_message.""" + raise NotImplementedError + + def get_history(self): + """This method returns an encoded copy of all non-obsolete state, ready to be + sent over DBus.""" + raise NotImplementedError + + def add_history(self, state): + """This method accepts and processes the state object returned by get_history()""" + raise NotImplementedError + + +def empty_translator(x, pack): + return x + +class HighScore(UnorderedObject): + """ A HighScore is the simplest nontrivial DObject. A HighScore's state consists + of a value and a score. The user may suggest a new value and score. If the new + score is higher than the current score, then the value and score are updated. + Otherwise, they are not. + + The value can be any object, and the score can be any comparable object. + + To ensure that serialization works correctly, the user may specify a + translator function that converts values or scores to and from a format that + can be serialized reliably by dbus-python. + + In the event of a tie, coherence cannot be guaranteed. If ties are likely + with the score of choice, the user may set break_ties=True, which appends a + random number to each message, and thereby reduces the probability of a tie + by a factor of 2**52. + """ + def __init__(self, initval, initscore, value_translator=empty_translator, score_translator=empty_translator, break_ties=False): + self._logger = logging.getLogger('stopwatch.HighScore') + self._lock = threading.Lock() + self._value = initval + self._score = initscore + + self._break_ties = break_ties + if self._break_ties: + self._tiebreaker = random.random() + else: + self._tiebreaker = None + + self._val_trans = value_translator + self._score_trans = score_translator + + self._listeners = [] + + + def _set_value_from_net(self, val, score, tiebreaker): + self._logger.debug("set_value_from_net " + str(val) + " " + str(score)) + if self._actually_set_value(val, score, tiebreaker): + self._trigger() + + def receive_message(self, message): + self._logger.debug("receive_message " + str(message)) + if len(message) == 2: #Remote has break_ties=False + self._set_value_from_net(self._val_trans(message[0], False), self._score_trans(message[1], False), None) + elif len(message) == 3: + self._set_value_from_net(self._val_trans(message[0], False), self._score_trans(message[1], False), float_translator(message[2], False)) + + + add_history = receive_message + + def set_value(self, val, score): + """This method suggests a value and score for this HighScore. If the + suggested score is higher than the current score, then both value and + score will be broadcast to all other participants. + """ + self._logger.debug("set_value " + str(val) + " " + str(score)) + if self._actually_set_value(val, score, None) and self.handler: + self.handler.send(self.get_history()) + + def _actually_set_value(self, value, score, tiebreaker): + self._logger.debug("_actually_set_value " + str(value)+ " " + str(score)) + if self._break_ties and (tiebreaker is None): + tiebreaker = random.random() + self._lock.acquire() + if self._break_ties: + if (self._score < score) or ((self._score == score) and (self._tiebreaker < tiebreaker)): + self._value = value + self._score = score + self._tiebreaker = tiebreaker + self._lock.release() + return True + else: + self._lock.release() + return False + elif self._score < score: + self._value = value + self._score = score + self._lock.release() + return True + else: + self._logger.debug("not changing value") + self._lock.release() + return False + + def get_value(self): + """ Get the current winning value.""" + return self._value + + def get_score(self): + """ Get the current winning score.""" + return self._score + + def get_pair(self): + """ Get the current value and score, returned as a tuple (value, score)""" + self._lock.acquire() + pair = (self._value, self._score) + self._lock.release() + return pair + + def _get_all(self): + if self._break_ties: + self._lock.acquire() + q = (self._value, self._score, self._tiebreaker) + self._lock.release() + return q + else: + return self.get_pair() + + def get_history(self): + p = self._get_all() + if self._break_ties: + return (self._val_trans(p[0], True), self._score_trans(p[1], True), float_translator(p[2], True)) + else: + return (self._val_trans(p[0], True), self._score_trans(p[1], True)) + + def register_listener(self, L): + """Register a function L that will be called whenever another user sets + a new record. L must have the form L(value, score).""" + self._lock.acquire() + self._listeners.append(L) + self._lock.release() + (v,s) = self.get_pair() + L(v,s) + + def _trigger(self): + (v,s) = self.get_pair() + for L in self._listeners: + L(v,s) + +def float_translator(f, pack): + """This translator packs and unpacks floats for dbus serialization""" + if pack: + return dbus.Double(f) + else: + return float(f) + +def uint_translator(f, pack): + """This translator packs and unpacks 64-bit uints for dbus serialization""" + if pack: + return dbus.UInt64(f) + else: + return int(f) + +def int_translator(f, pack): + """This translator packs and unpacks 32-bit ints for dbus serialization""" + if pack: + return dbus.Int32(f) + else: + return int(f) + +def string_translator(s, pack): + """This translator packs and unpacks unicode strings for dbus serialization""" + if pack: + return dbus.String(s) + else: + return str(s) + +class Latest(HandlerAcceptor): + """ Latest is a variation on HighScore, in which the score is the current + timestamp. Latest uses TimeHandler to provide a groupwide coherent clock. + Because TimeHandler's guarantees about synchronization and resilience are + weak, Latest is not as resilient to failures as a true DObject. + + The creator must provide the initial value. One may + optionally indicate the initial time (as a float in epoch-time), a + TimeHandler (otherwise a new one will be created), and a translator for + serialization of the values. + + Note that if time_handler is not provided, the object will not be functional + until set_handler is called. + """ + def __init__(self, initval, inittime=float('-inf'), time_handler=None, translator=empty_translator): + self._time_handler = time_handler + + self._listeners = [] + self._lock = threading.Lock() + + self._highscore = HighScore(initval, inittime, translator, float_translator) + self._highscore.register_listener(self._highscore_cb) + + def set_handler(self, handler): + if self.handler: + raise + else: + if self._time_handler is None: + self._time_handler = TimeHandler(handler.get_path(), handler.get_tube()) + self._highscore.set_handler(handler) + + def get_value(self): + """ Returns the latest value """ + return self._highscore.get_value() + + def set_value(self, val): + """ Suggest a new value """ + if self._time_handler: + self._highscore.set_value(val, self._time_handler.time()) + else: + raise #missing _time_handler + + def register_listener(self, L): + """ Register a listener L(value), to be called whenever another user + adds a new latest value.""" + self._lock.acquire() + self._listeners.append(L) + self._lock.release() + L(self.get_value()) + + def _highscore_cb(self, val, score): + for L in self._listeners: + L(val) + +class Recentest(HandlerAcceptor): + """ Recentest is like Latest, but without using a clock or TimeHandler. + As a result, it can only guarantee causality, not synchrony. + """ + def __init__(self, initval, translator=empty_translator): + self._listeners = [] + self._lock = threading.Lock() + + self._highscore = HighScore(initval, 0, translator, uint_translator, break_ties=True) + self._highscore.register_listener(self._highscore_cb) + + def set_handler(self, handler): + self._highscore.set_handler(handler) + + def get_value(self): + """ Returns the current value """ + return self._highscore.get_value() + + def set_value(self, val): + """ Set a new value """ + self._highscore.set_value(val, self._highscore.get_score() + 1) + + def register_listener(self, L): + """ Register a listener L(value), to be called whenever another user + adds a new latest value.""" + self._lock.acquire() + self._listeners.append(L) + self._lock.release() + L(self.get_value()) + + def _highscore_cb(self, val, score): + for L in self._listeners: + L(val) + +class AddOnlySet(UnorderedObject): + """The AddOnlySet is the archetypal UnorderedObject. It consists of a set, + supporting all the normal Python set operations except those that cause an + item to be removed from the set. Thanks to this restriction, a AddOnlySet + is perfectly coherent, since the order in which elements are added is not + important. + """ + def __init__(self, initset = (), translator=empty_translator): + self._logger = logging.getLogger('dobject.AddOnlySet') + self._set = set(initset) + + self._lock = threading.Lock() + + self._trans = translator + self._listeners = [] + + self.__and__ = self._set.__and__ + self.__cmp__ = self._set.__cmp__ + self.__contains__ = self._set.__contains__ + self.__eq__ = self._set.__eq__ + self.__ge__ = self._set.__ge__ + # Not implementing getattribute + self.__gt__ = self._set.__gt__ + self.__hash__ = self._set.__hash__ + # Not implementing iand (it can remove items) + # Special wrapper for ior to trigger events + # Not implementing isub (it can remove items) + self.__iter__ = self._set.__iter__ + # Not implementing ixor (it can remove items) + self.__le__ = self._set.__le__ + self.__len__ = self._set.__len__ + self.__lt__ = self._set.__lt__ + self.__ne__ = self._set.__ne__ + self.__or__ = self._set.__or__ + self.__rand__ = self._set.__rand__ + # Special implementation of repr + self.__ror__ = self._set.__ror__ + self.__rsub__ = self._set.__rsub__ + self.__rxor__ = self._set.__rxor__ + self.__sub__ = self._set.__sub__ + self.__xor__ = self._set.__xor__ + + # Special implementation of add to trigger events + # Not implementing clear + self.copy = self._set.copy + self.difference = self._set.difference + # Not implementing difference_update (it removes items) + # Not implementing discard (it removes items) + self.intersection = self._set.intersection + # Not implementing intersection_update (it removes items) + self.issubset = self._set.issubset + self.issuperset = self._set.issuperset + # Not implementing pop + # Not implementing remove + self.symmetric_difference = self._set.symmetric_difference + # Not implementing symmetric_difference_update + self.union = self._set.union + # Special implementation of update to trigger events + + def update(self, y): + """Add all the elements of an iterable y to the current set. If any of + these elements were not already present, they will be broadcast to all + other users.""" + s = set(y) + d = s - self._set + if len(d) > 0: + self._set.update(d) + self._send(d) + + __ior__ = update + + def add(self, y): + """ Add the single element y to the current set. If y is not already + present, it will be broadcast to all other users.""" + if y not in self._set: + self._set.add(y) + self._send((y,)) + + def _send(self, els): + if len(els) > 0 and self.handler is not None: + self.handler.send(dbus.Array([self._trans(el, True) for el in els])) + + def _net_update(self, y): + s = set(y) + d = s - self._set + if len(d) > 0: + self._set.update(d) + self._trigger(d) + + def receive_message(self, msg): + self._net_update((self._trans(el, False) for el in msg)) + + def get_history(self): + if len(self._set) > 0: + return dbus.Array([self._trans(el, True) for el in self._set]) + else: + return dbus.Array([], type=dbus.Boolean) #Prevent introspection of empty list, which fails + + add_history = receive_message + + def register_listener(self, L): + """Register a listener L(diffset). Every time another user adds items + to the set, L will be called with the set of new items.""" + self._listeners.append(L) + L(self._set.copy()) + + def _trigger(self, s): + for L in self._listeners: + L(s) + + def __repr__(self): + return 'AddOnlySet(' + repr(self.handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')' + +class AddOnlySortedSet(UnorderedObject): + """ AddOnlySortedSet is much like AddOnlySet, only backed by a ListSet, which + provides a set for objects that are ordered under cmp(). Items are maintained + in order. This approach is most useful in cases where each item is a message, + and the messages are subject to a time-like ordering. Messages may still + arrive out of order, but they will be stored in the same order on each + computer. + """ + def __init__(self, initset = (), translatohr=empty_translator): + self._logger = logging.getLogger('dobject.AddOnlySortedSet') + self._set = ListSet(initset) + + self._lock = threading.Lock() + + self._trans = translator + self._listeners = [] + + self.__and__ = self._set.__and__ + self.__contains__ = self._set.__contains__ + # No self.__delitem__ + self.__eq__ = self._set.__eq__ + self.__ge__ = self._set.__ge__ + # Not implementing getattribute + self.__getitem__ = self._set.__getitem__ + self.__gt__ = self._set.__gt__ + # Not implementing iand (it can remove items) + # Special wrapper for ior to trigger events + # Not implementing isub (it can remove items) + self.__iter__ = self._set.__iter__ + # Not implementing ixor (it can remove items) + self.__le__ = self._set.__le__ + self.__len__ = self._set.__len__ + self.__lt__ = self._set.__lt__ + self.__ne__ = self._set.__ne__ + self.__or__ = self._set.__or__ + self.__rand__ = self._set.__rand__ + # Special implementation of repr + self.__ror__ = self._set.__ror__ + self.__rsub__ = self._set.__rsub__ + self.__rxor__ = self._set.__rxor__ + self.__sub__ = self._set.__sub__ + self.__xor__ = self._set.__xor__ + + # Special implementation of add to trigger events + # Not implementing clear + self.copy = self._set.copy + self.difference = self._set.difference + # Not implementing difference_update (it removes items) + # Not implementing discard (it removes items) + self.first = self._set.first + self.headset = self._set.headset + self.index = self._set.index + self.intersection = self._set.intersection + # Not implementing intersection_update (it removes items) + self.issubset = self._set.issubset + self.issuperset = self._set.issuperset + self.last = self._set.last + # Not implementing pop + self.position = self._set.position + # Not implementing remove + self.subset = self._set.subset + self.symmetric_difference = self._set.symmetric_difference + # Not implementing symmetric_difference_update + self.tailset = self._set.tailset + self.union = self._set.union + # Special implementation of update to trigger events + + def update(self, y): + """Add all the elements of an iterable y to the current set. If any of + these elements were not already present, they will be broadcast to all + other users.""" + d = ListSet(y) + d -= self._set + if len(d) > 0: + self._set.update(d) + self._send(d) + + __ior__ = update + + def add(self, y): + """ Add the single element y to the current set. If y is not already + present, it will be broadcast to all other users.""" + if y not in self._set: + self._set.add(y) + self._send((y,)) + + def _send(self, els): + if len(els) > 0 and self.handler is not None: + self.handler.send(dbus.Array([self._trans(el, True) for el in els])) + + def _net_update(self, y): + d = ListSet() + d._list = y + d -= self._set + if len(d) > 0: + self._set |= d + self._trigger(d) + + def receive_message(self, msg): + self._net_update([self._trans(el, False) for el in msg]) + + def get_history(self): + if len(self._set._list) > 0: + return dbus.Array([self._trans(el, True) for el in self._set._list]) + else: + return dbus.Array([], type=dbus.Boolean) #prevent introspection of empty list, which fails + + add_history = receive_message + + def register_listener(self, L): + """Register a listener L(diffset). Every time another user adds items + to the set, L will be called with the set of new items as a SortedSet.""" + self._listeners.append(L) + L(self._set.copy()) + + def _trigger(self, s): + for L in self._listeners: + L(s) + + def __repr__(self): + return 'AddOnlySortedSet(' + repr(self.handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')' + +class CausalHandler: + """The CausalHandler is analogous to the UnorderedHandler, in that it + presents an interface with which to build a wide variety of objects with + distributed state. The CausalHandler is different from the Unordered in two + ways: + + 1. The send() method of an CausalHandler returns an index, which must be + stored by the CausalObject in connection with the information that was sent. + This index is a universal, fully-ordered, strictly causal identifier + for each message. + + 2. A CausalObject's receive_message method takes two arguments: the message + and its index. + + As a convenience, there is also + + 3. A get_index() method, which provides a new index on each call, always + higher than all previous indexes. + + CausalObjects are responsible for including index information in the + return value of get_history, and processing index information in add_history. + + It is noteworthy that CausalHandler is in fact implemented on _top_ of + UnorderedHandler. The imposition of ordering does not require lower-level + access to the network. This fact of implementation may change in the + future, but CausalObjects will not be able to tell the difference. + """ + + ZERO_INDEX = (0,0) + + def __init__(self, name, tube_box): + self._myname = name + self._tube_box = tube_box + self._unordered = UnorderedHandler(name, tube_box) + self._counter = 0 + self._copies = [] + + self.object = None + + def register(self, obj): + self.object = obj + self._unordered.register(self) + + def get_index(self): + """get_index returns a new index, higher than all previous indexes. + The primary reason to use get_index is if you wish two know the index + of an item _before_ calling send()""" + self._counter += 1 + return (self._counter, random.getrandbits(64)) + + def index_trans(self, index, pack): + """index_trans is a standard serialization translator for the index + format. Thanks to this translator, a CausalObject can and should treat + each index as an opaque, comparable object.""" + if pack: + return dbus.Struct((dbus.UInt64(index[0]), dbus.UInt64(index[1])), signature='tt') + else: + return (int(index[0]), int(index[1])) + + def send(self, msg, index=None): + """send() broadcasts a message to all other participants. If called + with one argument, send() broadcasts that message, along with a new + index, and returns the index. If called with two arguments, the second + may be an index, which will be used for this message. The index must + have been acquired using get_index(). In this case, the index must be + acquired immediately prior to calling send(). Otherwise, another + message may arrive in the interim, causing a violation of causality.""" + if index is None: + index = self.get_index() + self._unordered.send(dbus.Struct((msg, self.index_trans(index, True)))) + return index + + def receive_message(self, msg): + m = msg[0] + index = self.index_trans(msg[1], False) + self._counter = max(self._counter, index[0]) + self.object.receive_message(m, index) + + def add_history(self, hist): + h = hist[0] + index = self.index_trans(hist[1], False) + self._counter = max(self._counter, index[0]) + self.object.add_history(h) + + def get_history(self): + h = self.object.get_history() + hist = dbus.Struct((h, self.index_trans(self.get_index(), True))) + return + + def copy(self, name): + """A convenience function for returning a new CausalHandler derived + from this one, with a new name. This is safe as long as copy() is called + with a different name every time.""" + h = CausalHandler(self._myname + "/" + name, self._tube_box) + self._copies.append(h) + return h + + def get_copies(self): + return self._copies + + def get_name(self): + return self._myname + + +class CausalHandlerAcceptor(HandlerAcceptor): + HANDLER_TYPE = CausalHandler + def set_handler(self, handler): + raise NotImplementedError + +class CausalObject(CausalHandlerAcceptor): + """A CausalObject is almost precisely like an UnorderedObject, except + that whereas an UnorderedObject is completely specified by a set of messages, + a CausalObject is completely specified by an ordered list of messages, + sorted according to an opaque index associated with each message. + This index must be monotonically increasing in time for new messages as they + are created, but old messages may arrive long after they were created, and + are then inserted into the middle of the timestream. + + The following code is an abstract class for CausalObject, serving + primarily as documentation for the concept. + """ + + handler = None + + def set_handler(self, handler): + """Each CO must accept a CausalHandler via set_handler. + + Subclasses may override this method if they wish to perform more actions + when a handler is set.""" + if self.handler: + raise + else: + self.handler = handler + self.handler.register(self) + + def receive_message(self, msg, index): + """This method accepts and processes a message sent via handler.send(). + Because objects are sent over DBus, it is advisable to DBus-ify the message + before calling send, and de-DBus-ify it inside receive_message. + + The index argument is an opaque index used for determining the ordering.""" + raise NotImplementedError + + def get_history(self): + """This method returns an encoded copy of all non-obsolete state, ready to be + sent over DBus.""" + raise NotImplementedError + + def add_history(self, state): + """This method accepts and processes the state object returned by get_history()""" + raise NotImplementedError + +class CausalDict(CausalObject): + """NOTE: CausalDict is UNTESTED. Other things may be buggy, but CausalDict + PROBABLY DOES NOT WORK. A CausalDict WILL NOT WORK UNTIL set_handler IS CALLED. + + CausalDict is a distributed version of a Dict (hash table). All users keep + a copy of the entire table, so this is not a "Distributed Hash Table" + according to the terminology of the field. + + CausalDict permits all Dict operations, including removing keys and + modifying the value of existing keys. This would not be possible using an + Unordered approach, because two value assignments to the same key could + arrive in different orders for different users, leaving them in different + states at quiescence. + + To solve this problem, every assignment and removal is given a monotonically + increasing unique index, and whenever there is a conflict, the higher-index + operation wins. + + One side effect of this design is that deleted keys cannot be forgotten. If + an assignment operation is received whose index is lower than + the deletion's, then that assignment is considered obsolete and must not be + executed. + + To provide a mechanism for reducing memory usage, the clear() method has + been interpreted to remove not only all entries received so far, but also + all entries that will ever be received with index less than the current + index. + """ + ADD = 0 + DELETE = 1 + CLEAR = 2 + + def __init__(self, initdict=(), key_translator=empty_translator, value_translator=empty_translator): + self._dict = dict(initdict) + self._listeners = [] + + self._key_trans = key_translator + self._val_trans = value_translator + + self.__contains__ = self._dict.__contains__ + #Special __delitem__ + self.__eq__ = self._dict.__eq__ + self.__ge__ = self._dict.__ge__ + self.__getitem__ = self._dict.__getitem__ + self.__gt__ = self._dict.__gt__ + self.__le__ = self._dict.__le__ + self.__len__ = self._dict.__len__ + self.__lt__ = self._dict.__lt__ + self.__ne__ = self._dict.__ne__ + # special __setitem__ + + #Special clear + self.copy = self._dict.copy + self.get = self._dict.get + self.has_key = self._dict.has_key + self.items = self._dict.items + self.iteritems = self._dict.iteritems + self.iterkeys = self._dict.iterkeys + self.itervalues = self._dict.itervalues + self.keys = self._dict.keys + #Special pop + #Special popitem + #special setdefault + #special update + self.values = self._dict.values + + def set_handler(self, handler): + if self.handler is not None: + raise + else: + self.handler = handler + self._clear = self.handler.get_index() #this must happen before index_dict initialization, so that self._clear is less than any index in index_dict + self._index_dict = dict(((k, self.handler.get_index()) for k in self._dict)) + + self.handler.register(self) + + def __delitem__(self, key): + """Same as for dict""" + del self._dict[key] + n = self.handler.send(((dbus.Int32(CausalDict.DELETE), self._key_trans(key, True)))) + self._index_dict[key] = n + + def __setitem__(self, key, value): + """Same as for dict""" + self._dict[key] = value + n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))])) + self._index_dict[key] = n + + def clear(self): + """Same as for dict""" + self._dict.clear() + self._index_dict.clear() + n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.CLEAR))])) + self._clear = n + + def pop(self, key, x=None): + """Same as for dict""" + t = (key in self._dict) + if x is None: + r = self._dict.pop(key) + else: + r = self._dict.pop(key, x) + + if t: + n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))])) + self._index_dict[key] = n + + return r + + def popitem(self): + """Same as for dict""" + p = self._dict.popitem() + key = p[0] + n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))])) + self._index_dict[key] = n + return p + + def setdefault(self, key, x): + """Same as for dict""" + if key not in self._dict: + self._dict[key] = x + n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))])) + self._index_dict[key] = n + + def update(*args,**kargs): + """Same as for dict""" + d = dict() + d.update(*args,**kargs) + newpairs = [] + for p in d.items(): + if (p[0] not in self._dict) or (self._dict[p[0]] != p[1]): + newpairs.append(p) + self._dict[p[0]] = p[1] + n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in newpairs])) + + for p in newpairs: + self._index_dict[p[0]] = n + + def receive_message(self, msg, n): + if n > self._clear: + a = dict() + r = dict() + for m in msg: + flag = int(m[0]) #don't know length of m without checking flag + if flag == CausalDict.ADD: + key = self._key_trans(m[1], False) + if (key not in self._index_dict) or (self._index_dict[key] < n): + val = self._val_trans(m[2], False) + if key in self._dict: + r[key] = self._dict[key] + self._dict[key] = val + a[key] = val + self._index_dict[key] = n + elif flag == CausalDict.DELETE: + key = self._key_trans(m[1], False) + if key not in self._index_dict: + self._index_dict[key] = n + elif (self._index_dict[key] < n): + self._index_dict[key] = n + if key in self._dict: + r[key] = self._dict[key] + del self._dict[key] + elif flag == CausalDict.CLEAR: + self._clear = n + for (k, ind) in self._index_dict.items(): + if ind < self._clear: + del self._index_dict[k] + if k in self._dict: + r[k] = self._dict[k] + del self._dict[k] + if (len(a) > 0) or (len(r) > 0): + self._trigger(a,r) + + def get_history(self): + c = self.handler.index_trans(self._clear, True) + d = dbus.Array([(self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in self._dict.items()]) + i = dbus.Array([(self._key_trans(p[0], True), self.handler.index_trans(p[1], True)) for p in self._index_dict.items()]) + return dbus.Struct((c,d,i),signature='itt') + + def add_history(self, hist): + c = self.handler.index_trans(hist[0], False) + d = dict(((self._key_trans(p[0], False), self._val_trans(p[1], False)) for p in hist[1])) + i = [(self._key_trans(p[0], False), self.handler.index_trans(p[1], False)) for p in hist[2]] + + a = dict() + r = dict() + + if c > self._clear: + self._clear = c + for (k, n) in self._index_dict.items(): + if n < self._clear: + del self._index_dict[k] + if k in self._dict: + r[k] = self._dict[k] + del self._dict[k] + + k_changed = [] + for (k, n) in i: + if (((k not in self._index_dict) and (n > self._clear)) or + ((k in self._index_dict) and (n > self._index_dict[k]))): + k_changed.append(k) + self._index_dict[k] = n + + for k in k_changed: + if k in d: + if (k in self._dict) and (self._dict[k] != d[k]): + r[k] = self._dict[k] + a[k] = d[k] + elif k not in self._dict: + a[k] = d[k] + self._dict[k] = d[k] + else: + if k in self._dict: + r[k] = self._dict[k] + del self._dict[k] + + if (len(a) > 0) or (len(r) > 0): + self._trigger(a,r) + + def register_listener(self, L): + """Register a change-listener L. Whenever another user makes a change + to this dict, L will be called with L(dict_added, dict_removed). The + two arguments are the dict of new entries, and the dict of entries that + have been deleted or overwritten.""" + self._listeners.append(L) + L(self._dict.copy(), dict()) + + def _trigger(self, added, removed): + for L in self._listeners: + L(added, removed) + +class UserDict(dbus.gobject_service.ExportedGObject): + IFACE = "org.dobject.UserDict" + BASEPATH = "/org/dobject/UserDict/" + + def __init__(self, name, tubebox, myval, translator = empty_translator): + self._myname = name + self.PATH = UserDict.BASEPATH + name + dbus.gobject_service.ExportedGObject.__init__(self) + self._logger = logging.getLogger(self.PATH) + self._tube_box = tube_box + self.tube = None + + self._dict = dict() + self._myval = myval + self._trans = translator + + self._tube_box.register_listener(self.set_tube) + + self.__contains__ = self._dict.__contains__ + #No __delitem__ + self.__eq__ = self._dict.__eq__ + self.__ge__ = self._dict.__ge__ + self.__getitem__ = self._dict.__getitem__ + self.__gt__ = self._dict.__gt__ + self.__le__ = self._dict.__le__ + self.__len__ = self._dict.__len__ + self.__lt__ = self._dict.__lt__ + self.__ne__ = self._dict.__ne__ + #No __setitem__ + + #No clear + self.copy = self._dict.copy + self.get = self._dict.get + self.has_key = self._dict.has_key + self.items = self._dict.items + self.iteritems = self._dict.iteritems + self.iterkeys = self._dict.iterkeys + self.itervalues = self._dict.itervalues + self.keys = self._dict.keys + #No pop + #No popitem + #No setdefault + #No update + self.values = self._dict.values + + def set_tube(self, tube, is_initiator): + """Callback for the TubeBox""" + self.tube = tube + self.add_to_connection(self.tube, self.PATH) + + self.tube.add_signal_receiver(self.receive_value, signal_name='send_value', dbus_interface=UserDict.IFACE, sender_keyword='sender', path=self.PATH) + self.tube.add_signal_receiver(self.tell_value, signal_name='ask_values', dbus_interface=UserDict.IFACE, sender_keyword='sender', path=self.PATH) + self.tube.watch_participants(self.members_changed) + + #Alternative implementation of members_changed (not yet working) + #self.tube.add_signal_receiver(self.members_changed, signal_name="MembersChanged", dbus_interface="org.freedesktop.Telepathy.Channel.Interface.Group") + + self.ask_values() + + def get_path(self): + """Returns the DBus path of this handler. The path is the closest thing + to a unique identifier for each abstract DObject.""" + return self.PATH + + def get_tube(self): + """Returns the TubeBox used to create this handler. This method is + necessary if one DObject wishes to create another.""" + return self._tube_box + + @dbus.service.signal(dbus_interface=IFACE, signature='v') + def send_value(self, value): + """This method broadcasts message to all other handlers for this UO""" + return + + @dbus.service.signal(dbus_interface=IFACE, signature='') + def ask_values(self): + return + + def tell_value(self, sender=None): + self._logger.debug("tell_history to " + str(sender)) + try: + if sender == self.tube.get_unique_name(): + return + remote = self.tube.get_object(sender, self.PATH) + remote.receive_value(self._myval, sender_keyword='sender', reply_handler=PassFunction, error_handler=PassFunction) + finally: + return + + @dbus.service.method(dbus_interface=IFACE, in_signature = 'v', out_signature='', sender_keyword = 'sender') + def receive_value(self, value, sender=None): + self._dict[sender] = self._trans(value, False) + + #Alternative implementation of a members_changed (not yet working) + """ + def members_changed(self, message, added, removed, local_pending, remote_pending, actor, reason): + added_names = self.tube.InspectHandles(telepathy.CONNECTION_HANDLE_TYPE_LIST, added) + for name in added_names: + self.tell_history(name) + """ + def members_changed(self, added, removed): + self._logger.debug("members_changed") + for (handle, name) in removed: + if name in self._dict: + del self._dict[name] + for (handle, name) in added: + self.tell_value(sender=name) + +class UnorderedString(UnorderedObject): + + def __init__(self,initstring=''): + self._tree = stringtree.SimpleStringTree() + self._listeners = [] + self._newbuffer = [] + if initstring: + self.insert(initstring, 0) + + def insert(self, text, pos): + x = self._tree.insert(text,pos) + if self.handler is not None: + self.handler.send(dbus.Array(stringtree.translator(i,True) for i in x)) + + def delete(self, k, n): + x = self._tree.delete(k,n) + if self.handler is not None: + self.handler.send(dbus.Array(stringtree.translator(i,True) for i in x)) + + def _net_update(self, L): + transformed_list = [] + self._newbuffer.append(L) + for li in self._newbuffer[::-1]: + if self._tree.is_ready(li[0]): #each update from the net is required to + #obey the rule that if the tree is ready for the first Change, + #then it is ready for all the changes. This may be a sort of + #violation of the Unordered abstraction... + for c in li: + transformed_list.extend(self._tree.add_change(c)) + self._newbuffer.pop() #Having handled the contents of li, we + #should make sure it doesn't come up for consideration again + self._trigger(transformed_list) + + def get_history(self): + return dbus.Array((stringtree.translator(c, True) + for c in self._tree.get_changes()), + signature = 'v') + + def add_history(self, msg): + L = [] + for el in msg: + change = stringtree.translator(el, False) + if change.unique_id not in self._tree._id2rec: + L.append(change) + if L: + self._net_update(L) + + receive_message = add_history + + def register_listener(self, L): + """Register a listener L(editlist). Every time another user modifies + the string, L will be called with a set of edits that represent those + changes on the local version of the string. Note that the edits must + be performed in order.""" + self._listeners.append(L) + + def _trigger(self, editlist): + for L in self._listeners: + L(editlist) + +class CausalTree(CausalObject): + #SET_PARENT and DELETE_NODE are opcodes to be sent over the wire, and also + #to the trigger. MAJOR_CHANGE is sent only to the trigger, and it is not + #an opcode. It represents a significant but undefined changed in the tree. + SET_PARENT = 0 + DELETE_NODE = 1 + CLEAR = 2 + MAJOR_CHANGE = -1 + + ROOT = 0 + + def __init__(self): + self._timeline = ListSet() + self._reverse = {} + self._listeners = [] + self._reset() + + def _reset(self): + self._parent = {} + self._children = {self.ROOT:set()} + + def __contains__(self, node): + return node in self._children + + def get_parent(self,node): + if node == self.ROOT: + return self.ROOT + else: + return self._parent[node] + + def get_children(self, node): + return frozenset(self._children[node]) + + def _process_local_cmd(self,cmd): + i = self.handler.get_index() + self._timeline.add((i,cmd)) + rev = self._step(cmd) + self._reverse[(i,cmd)] = rev + self.handler.send(self._cmd_trans(cmd,True),i) + + def change_parent(self,node,newparent): + if (node in self._parent) and (newparent in self._children): + if self._parent[node] != newparent: + cmd = (self.SET_PARENT, node, newparent) + self._process_local_cmd(cmd) + else: + raise KeyError("One or both nodes is not present") + + def new_child(self,parent): + node = random.getrandbits(64) + cmd = (self.SET_PARENT, node, parent) + self._process_local_cmd(cmd) + return node + + def delete(self,node): + if node == self.ROOT: + raise KeyError("You cannot delete the root node.") + if node not in self._children: + raise KeyError("No such node.") + cmd = (self.DELETE_NODE, node) + self._process_local_cmd(cmd) + + def clear(self): + cmd = (self.CLEAR,) + self._process_local_cmd(cmd) + + def _step(self, cmd): + # Returns () if the command failed or had no effect + # If the command succeeded, returns an iterable of the commands necessary + # to undo this command + if cmd[0] == self.SET_PARENT: + if cmd[2] in self._children: #if newparent is known + if cmd[1] in self._parent: #if node is known + if self._parent[cmd[1]] == cmd[2]: + return () #No change necessary. This SET_PARENT is redundant + if cmd[1] in self._allparents(cmd[2]): #if node is above newparent + #This command would create a loop. It is therefore illegal + #and should be ignored + return () + else: + #remove node from under its current parent + oldp = self._parent[cmd[1]] + self._children[oldp].remove(cmd[1]) + self._children[cmd[2]].add(cmd[1]) + self._parent[cmd[1]] = cmd[2] + return ((self.SET_PARENT, cmd[1], oldp),) + else: + #Node is unknown, so it must be added + self._children[cmd[1]] = set() + self._children[cmd[2]].add(cmd[1]) + self._parent[cmd[1]] = cmd[2] + return ((self.DELETE_NODE, cmd[1]),) #the command executed successfully + else: + #The new parent is unknown, so the command is illegal and should + #be ignored. + return () + elif cmd[0] == self.DELETE_NODE: + if cmd[1] == self.ROOT: + #Deleting the root node is not allowed, so this command is illegal and should be ignored + return () + if cmd[1] in self._children: + p = self._parent[cmd[1]] + self._children[p].remove(cmd[1]) + cmds = [(self.SET_PARENT, cmd[1], p)] + for c in self._children[cmd[1]]: + self._children[p].add(c) + self._parent[c] = p + cmds.append((self.SET_PARENT,c,cmd[1])) + del self._children[cmd[1]] + del self._parent[cmd[1]] + return cmds #The command completed successfully + else: + #cmd[1] is an unknown node, so this command should be ignored + return () + elif cmd[0] == self.CLEAR: + deleted = self._parent.keys() #relies on self.ROOT not being in _parent + cmds = [] + stack = [self.ROOT] + while len(stack) > 0: + n = stack.pop() + for c in self._children[n]: + cmds.append((self.SET_PARENT, c, n)) + stack.append(c) + self._reset() + return cmds + + def _allparents(self, node): + s = set() + while node != self.ROOT: + s.add(node) + node = self._parent[node] + s.add(self.ROOT) + return s + + def _cmd_trans(self,cmd,pack): + #This code does not completely specify the dbus typing because it avoids + #calling dbus.Struct. The tuple will be introspected. + if len(cmd) == 1: #CLEAR + return (self._instruction_trans(cmd[0],pack),) + if len(cmd) == 2: #DELETE_NODE + return (self._instruction_trans(cmd[0],pack), self.node_trans(cmd[1],pack)) + elif len(cmd) == 3: #SET_PARENT + return (self._instruction_trans(cmd[0],pack), + self.node_trans(cmd[1],pack), + self.node_trans(cmd[2],pack)) + + def _instruction_trans(self,ins,pack): + return int_translator(ins,pack) + + def node_trans(self,node,pack): + return uint_translator(node,pack) + + def register_listener(self, L): + self._listeners.append(L) + + def receive_message(self, cmd, i): + cmd = self._cmd_trans(cmd,False) + elt = (i, cmd) + if elt > self._timeline.last(): + self._timeline.add(elt) + s = self._step(cmd) + self._reverse[elt] = s + if s: + self._trigger((cmd,),s) + else: + (forward, reverse) = self._antestep((elt,)) + if forward: + self._trigger(forward, reverse) + + def _antestep(self, elts): + #_antestep accepts an iterable of (i, cmd)s that may have + # occurred at previous times. It incorporates these changes into the + # timeline and state. It also returns a two-element tuple: + # a list of cmds that would have the same effect as the inclusion of elts, and a + # list of cmds that would reverse this effect. + newelts = [e for e in elts if e not in self._timeline] + if len(newelts) == 0: + return (False, False) + affected = [e for e in self._timeline.tailset(newelts[0]) if self._reverse[e]] + rollback = [] + for l in affected[::-1]: + rollback.extend(self._reverse[l]) + for cmd in rollback: + self._step(cmd) + # We have now rolled back to the point where newelts[0] is inserted + self._timeline.update(newelts) + new_effective = [] + reversers = [] + for (i,cmd) in self._timeline.tailset(newelts[0]): + rev = self._step(cmd) + self._reverse[(i,cmd)] = rev + if rev: #If the command had any effect + reversers.append(rev) + new_effective.append(cmd) + reversers.reverse() + reversenew = [] + for l in reversers: + reversenew.extend(l) + forward = rollback + forward.extend(new_effective) + reverse = reversenew + reverse.extend(affected) + return (forward, reverse) + #This implementation is extremely suboptimal. An ideal implementation + #would use some knowledge about the commutativity of different commands + #to shorten forward and reverse substantially. As is, they will likely + #contain mostly redundant undo-and-then-redo. + + def get_history(self): + return dbus.Array( + (self.handler.index_trans(i,True), self._cmd_trans(cmd,True)) + for (i,cmd) in self._timeline) + + def add_history(self,h): + elts = ((self.handler.index_trans(i,False), self._cmd_trans(cmd,False)) + for (i,cmd) in h) + (forward, reverse) = self._antestep(elts) + if forward: + self._trigger(forward, reverse) + + def _trigger(self, info): + # info is either (added, removed, affected) if that info is available, + # or False if there has been a change but no info is available + for L in self._listeners: + L(info) diff --git a/groupthink/gtk_tools.py b/groupthink/gtk_tools.py new file mode 100644 index 0000000..0cd4029 --- /dev/null +++ b/groupthink/gtk_tools.py @@ -0,0 +1,338 @@ +import gtk +import groupthink_base as groupthink +import logging +import stringtree + +class RecentEntry(groupthink.UnorderedHandlerAcceptor, gtk.Entry): + """RecentEntry is an extension of gtk.Entry that, when attached to a group, + creates a unified Entry field for all participants""" + def __init__(self, *args, **kargs): + gtk.Entry.__init__(self, *args, **kargs) + self.logger = logging.getLogger('RecentEntry') + self.add_events(gtk.gdk.PROPERTY_CHANGE_MASK) + self._text_changed_handler = self.connect('changed', self._local_change_cb) + self._recent = groupthink.Recentest(self.get_text(), groupthink.string_translator) + self._recent.register_listener(self._remote_change_cb) + + def _local_change_cb(self, widget): + self.logger.debug("_local_change_cb()") + self._recent.set_value(self.get_text()) + + def set_handler(self, handler): + self.logger.debug("set_handler") + self._recent.set_handler(handler) + + def _remote_change_cb(self, text): + self.logger.debug("_remote_change_cb(%s)" % text) + if self.get_text() != text: + #The following code will break if running in any thread other than + #the main thread. I do not know how to make code that works with + #both multithreaded gtk _and_ single-threaded gtk. + self.handler_block(self._text_changed_handler) + self.set_text(text) + self.handler_unblock(self._text_changed_handler) + +class SharedTreeStore(groupthink.CausalHandlerAcceptor, gtk.GenericTreeModel): + def __init__(self, columntypes=(), translators=()): + self._columntypes = columntypes + self._causaltree = groupthink.CausalTree() + if len(translators) != 0 and len(translators) != len(columntypes): + raise #Error: translators must be empty or match columntypes in length + if len(translators) == len(self._columntypes): + self._columndicts = [groupthink.CausalDict( + key_translator = self._causaltree.node_trans, + value_translator = translators[i]) + for i in xrange(len(translators))] + else: + self._columndicts = [groupthink.CausalDict( + key_translator = self._causaltree.node_trans) + for i in xrange(len(translators))] + self._causaltree.register_listener(self._tree_listener) + for i in xrange(len(self._columndicts)): + self._columndicts[i].register_listener(self._generate_dictlistener(i)) + + def set_handler(self, handler): + self._causaltree.set_handler(handler) + for i in xrange(len(self._columndicts)): + #Make a new handler for each columndict + #Not very future-proof: how do we serialize out and reconstitute + #objects that GroupActivity.cloud is not even aware of? + h = handler.copy(str(i)) + self._columndicts[i].set_handler(h) + + ### Methods necessary to implement gtk.GenericTreeModel ### + + def on_get_flags(self): + return gtk.TREE_MODEL_ITERS_PERSIST + + def on_get_n_columns(self): + return len(self._columntypes) + + def on_get_column_type(self, index): + return self._columntypes[index] + + def on_get_iter(self, path): + node = self._causaltree.ROOT + for k in path: + c = list(self._causaltree.get_children(node)) + if len(c) <= k: + return None #Invalid path + else: + c.sort() + node = c[k] + return node + + def on_get_path(self, rowref): + revpath = [] + node = rowref + if rowref in self._causaltree: + while node != self._causaltree.ROOT: + p = self._causaltree.get_parent(node) + c = list(self._causaltree.get_children(p)) + c.sort() + revpath.append(c.index(node)) # could be done "faster" using bisect + node = p + return tuple(revpath[::-1]) + else: + return None + + def on_get_value(self, rowref, column): + return self._columndicts[column][rowref] + + def on_iter_next(self, rowref): + p = self._causaltree.get_parent(rowref) + c = list(self._causaltree.get_children(p)) + c.sort() + i = c.index(rowref) + 1 + if i < len(c): + return c[i] + else: + return None + + def on_iter_children(self, parent): + if parent is None: + parent = self._causaltree.ROOT + c = self._causaltree.get_children(parent) + if len(c) > 0: + return min(c) + else: + return None + + def on_iter_has_child(self, rowref): + return len(self._causaltree.get_children(rowref)) > 0 + + def on_iter_n_children(self, rowref): + return len(self._causaltree.get_children(rowref)) + + def on_iter_nth_child(self, parent, n): + if parent is None: + parent = self._causaltree.ROOT + c = self._causaltree.get_children(parent) + if len(c) > n: + c = list(c) + c.sort() + return c[n] + else: + return None + + def on_iter_parent(self, child): + p = self._causaltree.get_parent(child) + if p == self._causaltree.ROOT: + return None + else: + return p + + ### Methods for passing changes from remote users ### + + def _dict_listener(self, i, added, removed): + s = set() + s.update(added.keys()) + s.update(removed.keys()) + for node in s: + path = self.on_get_path(node) + if path is not None: + it = self.create_tree_iter(node) + self.row_changed(path, it) + self.emit('changed') + + def _generate_dict_listener(self, i): + def temp(added,removed): + self._dict_listener(i,added,removed) + return temp + + def _tree_listener(self, forward, reverse): + #forward is the list of commands representing the change, and + #reverse is the list representing their inverse. Together, these + #lists represent a total description of the change. However, deriving + #sufficient information to fill in the signals would require replicating + #the entire CausalTree state machine. Therefore, for the moment, we make only a modest + #attempt, and if it fails, throw up an "unknown-change" flag + deleted = set() #unused, since we can only safely handle a single deletion with this method + haschild = set() #All signals may be sent spuriously, but this one especially so + inserted = set() + unknown_change = False + # no reordered, since there is no ordering choice + + for cmd in forward: + if cmd[0] == self._causaltree.SET_PARENT: + if cmd[2] in self._causaltree: + haschild.add(cmd[2]) + else: + unknown_change = True + if cmd[1] in self._causaltree: + inserted.add(cmd[1]) + else: + unknown_change = True + for cmd in reverse: + clean = True + if cmd[0] == self._causaltree.SET_PARENT: + if (clean and + cmd[2] in self._causaltree and + (cmd[1] not in self._causaltree or + cmd[2] != self._causaltree.get_parent(cmd[1]))): + + clean = False + haschild.add((cmd[2], cmd[1])) + c = self._causaltree.get_children(cmd[2]) + c = list(c) + c.append(cmd[1]) + c.sort() + i = c.index(cmd[1]) + p = self.on_get_path(cmd[2]) + p = list(p) + p.append(i) + p = tuple(p) + self.row_deleted(p) + else: + unknown_change = True + if unknown_change: + self.emit('unknown-change') + for node in inserted: + path = self.on_get_path(node) + if path is not None: + it = self.create_tree_iter(node) + self.row_inserted(path, it) + for node in haschild: + path = self.on_get_path(node) + if path is not None: + it = self.create_tree_iter(node) + self.row_has_child_toggled(path, it) + self.emit('changed') + + ### Methods for resembling gtk.TreeStore ### + + def set_value(self, it, column, value): + node = self.get_user_data(it) + self._columndicts[i][node] = value + + def set(self, it, *args): + for i in xrange(0,len(args),2): + self.set_value(it,args[i],args[i+1]) + + def remove(self, it): + node = self.get_user_data(it) + self._causaltree.delete(node) + for d in self._columndicts: + if node in d: + del d[node] + + def append(self, parent, row=None): + if parent is not None: + node = self.get_user_data(it) + else: + node = self._causaltree.ROOT + node = self._causaltree.new_child(node) + if row is not None: + if len(row) != len(columndicts): + raise IndexError("row had the wrong length") + else: + for i in xrange(len(row)): + self._columndicts[i][node] = row[i] + return self.create_tree_iter(node) + + def is_ancestor(self, it, descendant): + node = self.get_user_data(it) + d = self.get_user_data(descendant) + d = self._causaltree.get_parent(d) + while d != self._causaltree.ROOT: + if d == node: + return True + else: + d = self._causaltree.get_parent(d) + return False + + def iter_depth(self, it): + node = self.get_user_data(it) + i = 0 + node = self._causaltree.get_parent(node) + while node != self._causaltree.ROOT: + i = i + 1 + node = self._causaltree.get_parent(node) + return i + + def clear(self): + self._causaltree.clear() + for d in self._columndicts: + d.clear() + + def iter_is_valid(self, it): + node = self.get_user_data(it) + return node in self._causaltree + + ### Additional Methods ### + def move(self, it, newparent): + node = self.get_user_data(row) + p = self.get_user_data(newparent) + self._causaltree.change_parent(node,p) + +class TextBufferUnorderedStringLinker: + def __init__(self,tb,us): + self._tb = tb + self._us = us + self._us.register_listener(self._netupdate_cb) + self._insert_handler = tb.connect('insert-text', self._insert_cb) + self._delete_handler = tb.connect('delete-range', self._delete_cb) + self._logger = logging.getLogger('the Linker') + + def _insert_cb(self, tb, itr, text, length): + self._logger.debug('user insert: %s' % text) + pos = itr.get_offset() + self._us.insert(text,pos) + + def _delete_cb(self, tb, start_itr, end_itr): + self._logger.debug('user delete') + k = start_itr.get_offset() + n = end_itr.get_offset()-k + self._us.delete(k,n) + + def _netupdate_cb(self, edits): + self._logger.debug('update from network: %s' % str(edits)) + self._tb.handler_block(self._insert_handler) + self._tb.handler_block(self._delete_handler) + for e in edits: + if isinstance(e, stringtree.Insertion): + itr = self._tb.get_iter_at_offset(e.position) + self._tb.insert(itr, e.text) + elif isinstance(e, stringtree.Deletion): + itr1 = self._tb.get_iter_at_offset(e.position) + itr2 = self._tb.get_iter_at_offset(e.position + e.length) + self._tb.delete(itr1,itr2) + self._tb.handler_unblock(self._insert_handler) + self._tb.handler_unblock(self._delete_handler) + + +class TextBufferSharePoint(groupthink.UnorderedHandlerAcceptor): + def __init__(self, buff): + self._us = groupthink.UnorderedString(buff.get_text(buff.get_start_iter(), buff.get_end_iter())) + self._linker = TextBufferUnorderedStringLinker(buff, self._us) + + def set_handler(self, handler): + self._us.set_handler(handler) + +class SharedTextView(groupthink.UnorderedHandlerAcceptor, gtk.TextView): + def __init__(self, *args, **kargs): + gtk.TextView.__init__(self, *args, **kargs) + self._link = TextBufferSharePoint(self.get_buffer()) + + def set_handler(self, handler): + self._link.set_handler(handler) diff --git a/groupthink/listset.py b/groupthink/listset.py new file mode 100644 index 0000000..cdb25ef --- /dev/null +++ b/groupthink/listset.py @@ -0,0 +1,783 @@ +""" +Copyright 2008 Benjamin M. Schwartz + +This file is LGPLv2+. + +listset.py is free software: you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation, either version 2 of the License, or +(at your option) any later version. + +DObject is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with DObject. If not, see . +""" + +import bisect +from collections import defaultdict + +""" +dobject_helpers is a collection of functions and data structures that are useful +to DObject, but are not specific to DBus or networked applications. +""" + +def merge(a, b, l=True, g=True, e=True): + """Internal helper function for combining sets represented as sorted lists""" + x = 0 + X = len(a) + if X == 0: + if g: + return list(b) + else: + return [] + y = 0 + Y = len(b) + if Y == 0: + if l: + return list(a) + else: + return [] + out = [] + p = a[x] + q = b[y] + while x < X and y < Y: + if p < q: + if l: out.append(p) + x += 1 + if x < X: p = a[x] + elif p > q: + if g: out.append(q) + y += 1 + if y < Y: q = b[y] + else: + if e: out.append(p) + x += 1 + if x < X: p = a[x] + y += 1 + if y < Y: q = b[y] + if x < X: + if l: out.extend(a[x:]) + else: + if g: out.extend(b[y:]) + return out + +def merge_or(a,b): + return merge(a,b, True, True, True) + +def merge_xor(a,b): + return merge(a, b, True, True, False) + +def merge_and(a,b): + return merge(a, b, False, False, True) + +def merge_sub(a,b): + return merge(a, b, True, False, False) + +def kill_dupes(a): #assumes a is sorted + """Internal helper function for removing duplicates in a sorted list""" + prev = a[0] + out = [prev] + for item in a: + if item != prev: #always throws out item 0, but that's ok + out.append(item) + prev = item + return out + +class Comparable: + """Currently, ListSet does not provide a mechanism for specifying a + comparator. Users who would like to specify a comparator other than the one + native to the item may do so by wrapping the item in a Comparable. + """ + def __init__(self, item, comparator): + self.item = item + self._cmp = comparator + + def __cmp__(self, other): + return self._cmp(self.item, other) + +class ListSet: + """ListSet is a sorted set for comparable items. It is inspired by the + Java Standard Library's TreeSet. However, it is implemented by a sorted + list. This implementation is much slower than a balanced binary tree, but + has the distinct advantage that I can actually implement it. + + The methods of ListSet are all drawn directly from Python's set API, + Python's list API, and Java's SortedSet API. + """ + def __init__(self, seq=[]): + L = list(seq) + if len(L) > 1: + L.sort() + L = kill_dupes(L) + self._list = L + + def __and__(self, someset): + if isinstance(someset, ListSet): + L = merge_and(self._list, someset._list) + else: + L = [] + for x in self._list: + if x in someset: + L.append(x) + a = ListSet() + a._list = L + return a + + def __contains__(self, item): + if not self._list: + return False + if self._list[0] <= item <= self._list[-1]: + a = bisect.bisect_left(self._list, item) + return item == self._list[a] + else: + return False + + def __eq__(self, someset): + if isinstance(someset, ListSet): + return self._list == someset._list + else: + return len(self.symmetric_difference(someset)) == 0 + + def __ge__(self, someset): + if isinstance(someset, ListSet): + return len(merge_or(self._list, someset._list)) == len(self._list) + else: + a = len(someset) + k = 0 + for i in self._list: + if i in someset: + k += 1 + return k == a + + def __gt__(self, someset): + return (len(self) > len(someset)) and (self >= someset) + + def __iand__(self, someset): + if isinstance(someset, ListSet): + self._list = merge_and(self._list, someset._list) + else: + L = [] + for i in self._list: + if i in someset: + L.append(i) + self._list = L + return self + + def __ior__(self, someset): + if isinstance(someset, ListSet): + self._list = merge_or(self._list, someset._list) + else: + self.update(someset) + return self + + def __isub__(self, someset): + if isinstance(someset, ListSet): + self._list = merge_sub(self._list, someset._list) + else: + L = [] + for i in self._list: + if i not in someset: + L.append(i) + self._list = L + return self + + def __iter__(self): + return iter(self._list) + + def __ixor__(self, someset): + if isinstance(someset, ListSet): + self._list = merge_xor(self._list, someset._list) + else: + self.symmetric_difference_update(someset) + return self + + def __le__(self, someset): + if isinstance(someset, ListSet): + return len(merge_or(self._list, someset._list)) == len(someset._list) + else: + for i in self._list: + if i not in someset: + return False + return True + + def __lt__(self, someset): + return (len(self) < len(someset)) and (self <= someset) + + def __ne__(self, someset): + return not (self == someset) + + def __len__(self): + return len(self._list) + + def __nonzero__(self): + #ugly, but faster than bool(self_list) + return not not self._list + + def __or__(self, someset): + a = ListSet() + if isinstance(someset, ListSet): + a._list = merge_or(self._list, someset._list) + else: + a._list = self._list + a.update(someset) + return a + + __rand__ = __and__ + + def __repr__(self): + return "ListSet(" + repr(self._list) +")" + + __ror__ = __or__ + + def __rsub__(self, someset): + if isinstance(someset, ListSet): + a = ListSet() + a._list = merge_sub(someset._list, self._list) + else: + a = ListSet(someset) + a._list = merge_sub(a._list, self._list) + return a + + def __sub__(self, someset): + a = ListSet() + if isinstance(someset, ListSet): + a._list = merge_sub(self._list, someset._list) + else: + L = [] + for i in self._list: + if i not in someset: + L.append(i) + a._list = L + return a + + def __xor__(self, someset): + if isinstance(someset, ListSet): + a = ListSet() + a._list = merge_xor(self._list, someset._list) + else: + a = self.symmetric_difference(someset) + return a + + __rxor__ = __xor__ + + def add(self, item): + a = bisect.bisect_left(self._list, item) + if (a == len(self._list)) or (self._list[a] != item): + self._list.insert(a, item) + + def clear(self): + self._list = [] + + def copy(self): + a = ListSet() + a._list = list(self._list) #shallow copy + return a + + def difference(self, iterable): + L = list(iterable) + L.sort() + a = ListSet() + a._list = merge_sub(self._list, kill_dupes(L)) + return a + + def difference_update(self, iterable): + L = list(iterable) + L.sort() + self._list = merge_sub(self._list, kill_dupes(L)) + + def discard(self, item): + if self._list and (item <= self._list[-1]): + a = bisect.bisect_left(self._list, item) + if self._list[a] == item: + self._list.remove(a) + + def intersection(self, iterable): + L = list(iterable) + L.sort() + a = ListSet() + a._list = merge_and(self._list, kill_dupes(L)) + + def intersection_update(self, iterable): + L = list(iterable) + L.sort() + self._list = merge_and(self._list, kill_dupes(L)) + + def issuperset(self, iterable): + L = list(iterable) + L.sort() + m = merge_or(self._list, kill_dupes(L)) + return len(m) == len(self._list) + + def issubset(self, iterable): + L = list(iterable) + L.sort() + L = kill_dupes(L) + m = merge_or(self._list, L) + return len(m) == len(L) + + def pop(self, i = None): + if i == None: + return self._list.pop() + else: + return self._list.pop(i) + + def remove(self, item): + if self._list and (item <= self._list[-1]): + a = bisect.bisect_left(self._list, item) + if self._list[a] == item: + self._list.remove(a) + return + raise KeyError("Item is not in the set") + + def symmetric_difference(self, iterable): + L = list(iterable) + L.sort() + a = ListSet() + a._list = merge_xor(self._list, kill_dupes(L)) + return a + + def symmetric_difference_update(self, iterable): + L = list(iterable) + L.sort() + self._list = merge_xor(self._list, kill_dupes(L)) + + def union(self, iterable): + L = list(iterable) + L.sort() + a = ListSet() + a._list = merge_or(self._list, kill_dupes(L)) + + def update(self, iterable): + L = list(iterable) + L.sort() + self._list = merge_or(self._list, kill_dupes(L)) + + def __getitem__(self, key): + if type(key) is int: + return self._list[key] + elif type(key) is slice: + a = ListSet() + L = self._list[key] + if key.step is not None and key.step < 0: + L.reverse() + a._list = L + return a + + def __delitem__(self, key): + del self._list[key] + + def index(self, x, i=0, j=-1): + if self._list and (x <= self._list[-1]): + a = bisect.bisect_left(self._list, x, i, j) + if self._list[a] == x: + return a + raise ValueError("Item not found") + + def position(self, x, i=0, j=-1): + return bisect.bisect_left(self._list, x, i, j) + + def _subrange(self, x, y, includehead=True, includetail=False, i=0, j=-1): + if includehead: + a = bisect.bisect_left(self._list, x, i, j) + else: + a = bisect.bisect_right(self._list, x, i, j) + if includetail: + b = bisect.bisect_right(self._list, y, a, j) + else: + b = bisect.bisect_left(self._list, y, a, j) + return (a, b) + + # From Java SortedSet + def subset(self, x, y, includehead=True, includetail=False, i=0, j=-1): + (a,b) = self._subrange(x, y, includehead, includetail, i, j) + s = ListSet() + s._list = self._list[a:b] + return s + + def iterslice(self, slic): + L = len(self._list) + return (self._list[i] for i in xrange(*slic.indices(L))) + + def subiter(self, x, y, includehead=True, includetail=False, i=0, j=-1): + (a,b) = self._subrange(x, y, includehead, includetail, i, j) + return (self._list[i] for i in xrange(a,b)) + + def first(self): + return self._list[0] + + def last(self): + return self._list[-1] + + def headset(self, x, include=False, i=0, j=-1): + if include: + a = bisect.bisect_right(self._list, x, i, j) + else: + a = bisect.bisect_left(self._list, x, i, j) + return self[:a] + + def tailset(self, x, include=True, i=0, j=-1): + if include: + a = bisect.bisect_left(self._list, x, i, j) + else: + a = bisect.bisect_right(self._list, x, i, j) + return self[a:] + + #From Java's NavigableSet + def ceiling(self, x, i=0, j=-1): + a = bisect.bisect_left(self._list, x, i, j) + return self[a] + + def floor(self, x, i=0, j=-1): + a = bisect.bisect_right(self._list, x, i, j) + return self[a-1] + + def higher(self, x, i=0, j=-1): + a = bisect.bisect_right(self._list, x, i, j) + return self[a] + + def lower(self, x, i=0, j=-1): + a = bisect.bisect_left(self._list, x, i, j) + return self[a-1] + +class ListDict: + """ListDict is a map whose keys are comparable. It is based on ListSet. + Its API is drawn from python's defaultdict and Java's SortedMap.""" + def __init__(self, *args, **kwargs): + self._dict = defaultdict(*args, **kwargs) + self._set = ListSet(self._dict) + + # Dict methods + + def __copy__(self): + return self.copy() + + def __repr__(self): + return 'ListDict({'+', '.join( + (': '.join((repr(k), repr(self._dict[k]))) + for k in self._set))+'})' + + def copy(self): + D = ListDict() + D._dict = self._dict.copy() + D._set = self._set.copy() + return D + + def __contains__(self, k): + return k in self._dict + + def __delitem__(self, k): + del self._dict[k] + self._set.remove(k) + + def __eq__(self, d): + if isinstance(d, ListDict): + return self._dict == d._dict + else: + return self._dict == d + + def __ge__(self, d): + if isinstance(d, ListDict): + return self._dict >= d._dict + else: + return self._dict >= d + + def __getitem__(self, k): + x = self._dict[k] + if self._dict.default_factory is not None: + self._set.add(k) + return x + + def __gt__(self, d): + if isinstance(d, ListDict): + return self._dict > d._dict + else: + return self._dict > d + + def __hash__(self): + return self._dict.__hash__() + + def __iter__(): + return self.iterkeys() + + def __le__(self, d): + if isinstance(d, ListDict): + return self._dict <= d._dict + else: + return self._dict <= d + + def __len__(self): + return len(self._dict) + + def __lt__(self, d): + if isinstance(d, ListDict): + return self._dict < d._dict + else: + return self._dict < d + + def __ne__(self, d): + if isinstance(d, ListDict): + return self._dict != d._dict + else: + return self._dict != d + + def __nonzero__(self): + return not not self._dict + + def __setitem__(self, k, v): + self._dict[k] = v + self._set.add(k) + + def clear(self): + self._dict.clear() + self._set.clear() + + def get(self, k, d=None): + return self._dict.get(k, d) + + def has_key(self, k): + return self._dict.has_key(k) + + def items(self, *args, **kwargs): + if not (args or kwargs): + return [(k, self._dict[k]) for k in self._set] + else: + return [(k, self._dict[k]) for k in self._set.subiter(*args, **kwargs)] + + def iteritems(self, *args, **kwargs): + if not (args or kwargs): + return ((k, self._dict[k]) for k in self._set) + else: + return ((k, self._dict[k]) for k in self._set.subiter(*args, **kwargs)) + + def iterkeys(self, *args, **kwargs): + if not (args or kwargs): + return iter(self._set) + else: + return self._set.subiter(*args, **kwargs) + + def itervalues(self, *args, **kwargs): + if not (args or kwargs): + return (self._dict[k] for k in self._set) + else: + return (self._dict[k] for k in self._set.subiter(*args, **kwargs)) + + def keys(self, *args, **kwargs): + if not (args or kwargs): + return self._set.copy() + else: + return self._set.subset(*args, **kwargs) + + def pop(self, *args): + present = args[0] in self._dict + v_or_d = self._dict.pop(*args) + if present: + self._set.remove(args[0]) + return v_or_d + + def popitem(self, i = None): + if self._dict: + k = self._set.pop(i) + return (k, self._dict.pop(k)) + else: + return self._dict.popitem() # Just to raise the appropriate KeyError + + def setdefault(self, k, x=None): + self._set.add(k) + return self._dict.setdefault(k, x) + + def update(self, E, **F): + #I'm not sure how to distinguish between dict-like and non-dict-like E + if isinstance(E, ListDict): + self._set |= E._set + self._dict.update(E._dict) + else: + try: + keys = E.keys() + self._set.update(keys) + self._dict.update(E) + except: + self._dict.update(E,**F) + self._set.update(self._dict) + + def values(self, *args, **kwargs): + if not (args or kwargs): + return [self._dict[k] for k in self._set] + else: + return [self._dict[k] for k in self._set.subiter(*args, **kwargs)] + + def fromkeys(*args): + return ListDict(dict.fromkeys(*args)) + + #SortedMap methods + def firstkey(self): + return self._set.first() + + def lastkey(self): + return self._set.last() + + def headdict(self, k, include=False, i=0, j=-1): + return self._copysubdict(self._set.headset(k, include, i, j)) + + def taildict(self, k, include=True, i=0, j=-1): + return self._copysubdict(self._set.tailset(k, include, i, j)) + + def subdict(self, fromkey, tokey, + includehead=True, includetail=False, i=0, j=-1): + return self._copysubdict(self._set.subset(fromkey, tokey, includehead, + includetail, i, j)) + + def _copysubdict(self, s): + L = ListDict() + L._set = s + L._dict.default_factory = self._dict.default_factory + for k in s: + L._dict[k] = self._dict[k] + return L + + #NavigableMap methods + def ceilingkey(self, k): + return self._set.ceiling(k) + + def floorkey(self, k): + return self._set.floor(k) + + def higherkey(self, k): + return self._set.higher(k) + + def lowerkey(self, k): + return self._set.lower(k) + + #ListSet methods + def index(self, k, i=0, j=-1): + return self._set.index(k, i, j) + + def position(self, k, i=0, j=-1): + return self._set.position(k, i, j) + + def nthkey(self, ind): + #ind can be an int or a slice + return self._set[ind] + + def nthvalue(self, ind): + if type(ind) is int: + return self._dict[self._set[ind]] + else: + return [self._dict[k] for k in self._set[ind]] + + def nthdict(self, ind): + s = self._set[ind] + if type(s) is not ListSet: + try: + s = ListSet(s) + except: + s = ListSet((s,)) + return self._copysubdict(s) + +class Overlap1D: + """Overlap1D is a structure for determining quickly whether two intervals + overlap.""" + + def __init__(self): + # _present is a dict of (position,set(objects)) pairs. Each key is + # the leftmost point of an object, and each value is the + # set of all objects present at that point + self._present = ListDict() + # _rightend is a dict of (position,set(objects)). Each key is the + # rightmost point of one or more objects, and each value is a set of + # only those objects that end at this point. + self._rightend = ListDict(set) + # _objects is a dict of (object, (left, right)). It remembers where + # objects start and stop. + self._objects = dict() + + def add(self, obj, left, right): + if (not self._present) or left < self._present.firstkey(): + self._present[left] = set((obj,)) + elif left not in self._present: + # We are adding a new marker to _present. Start with the nearest + # marker to the left of the new one. + prev = self._present[self._present.lowerkey(left)] + # and keep only the objects that are still present at the new + # location, i.e. whose rightmost point is further right than + # the leftmost point of this new object. We take a closed-left, + # open-right convention. + newsetgen = (o for o in prev if self._objects[o][1] > left) + self._present[left] = set(newsetgen) + + intermediates = self._present.itervalues(left, right) + for s in intermediates: + # add the object to each set that is inside its interval + s.add(obj) + self._objects[obj] = (left,right) + self._rightend[right].add(obj) + + def remove(self, obj): + (left, right) = self._objects.pop(obj) + intermediates = self._present.itervalues(left, right) + for s in intermediates: + s.remove(obj) + #boolean tests whether self._present[left] is an empty set() + if ((not self._present[left]) or + (self._present[left] <= self._present[self._present.lowerkey(left)])): + del self._present[left] + self._rightend[right].remove(obj) + if not self._rightend[right]: + del self._rightend[right] + + def overlaps(self, left, right, closed = False): + intermediates = self._present.itervalues(left, right) + outset = set() + for s in intermediates: + outset |= s + + if ((left not in self._present) and self._present and + (left > self._present.firstkey())): + preleft = self._present.floorkey(left) + prev = self._present[preleft] + newsetgen = (o for o in prev if self._objects[o][1] > left) + outset.update(newsetgen) + if closed: + if left in self._rightend: + outset |= self._rightend[left] + if right in self._present: + outset |= self.present[right] + return outset + + def collides(self, obj, closed=False): + (left, right) = self._objects[obj] + return self.overlaps(left, right, closed) + + def get_interval(self, obj): + return self._objects[obj] + +class Overlap2D: + def __init__(self): + self._x = Overlap1D() + self._y = Overlap1D() + + def add(self, obj, x1, x2, y1, y2): + self._x.add(obj, x1, x2) + self._y.add(obj, y1, y2) + + def remove(self, obj): + self._x.remove(obj) + self._y.remove(obj) + + def overlaps(self, x1, x2, y1, y2, closed = False): + xset = self._x.overlaps(x1,x2,closed) + yset = self._y.overlaps(y1,y2,closed) + return xset & yset + + def collides(self, obj, closed = False): + xset = self._x.collides(obj, closed) + yset = self._y.collides(obj, closed) + return xset & yset + + def get_rectangle(self, obj): + (x1, x2) = self._x.get_interval(obj) + (y1, y2) = self._y.get_interval(obj) + return (x1, x2, y1, y2) diff --git a/groupthink/stringtree.py b/groupthink/stringtree.py new file mode 100644 index 0000000..5701636 --- /dev/null +++ b/groupthink/stringtree.py @@ -0,0 +1,555 @@ +import random +random.seed() #Works around some mysterious bug in the Journal or Rainbow that + #causes the random number generator to be seeded with the same + #constant value if a Journal entry is re-launched from the + #Journal. +from collections import defaultdict +import dbus # We do dbus (de)serialization in this file to minimize abstraction breakage +import logging +import aatree + +inf = float('inf') + +class Change: + """Each Change represents a chanage to the StringTree. + """ + def __init__(self, unique_id, parent, edit): + """unique_id is a unique identifier for this Change + parent is the unique_id that is affected by this + Change. Parent unique_id are always associated with an Insertion edit. + edit is an Insertion, Deletion, or Removal. It's what is to be done to + the parent""" + self.unique_id = unique_id + self.parent = parent + self.edit = edit + + def __repr__(self): + return "Change(%d, %d, %s)" % (self.unique_id, self.parent, str(self.edit)) + +class Insertion: + """Represents the action of inserting a particular bit of text + """ + def __init__(self, position, text): + """position is the point at which to insert text, in the unmodified + coordinates of the parent node (other insertions and deletions + to that node do not affect these coordinates) + text is the string to insert""" + self.position = position + self.text = text + + def __repr__(self): + return "Insertion(%d, %s)" % (self.position, self.text) + +class Deletion: + """Represents the deletion of only those characters present in the parent in + this range. Characters that have been inserted into the parent by + an Insertion are not affected. + """ + def __init__(self, position, length): + """position is the point, in the unmodified coordinates of the parent + node, at which to start deletion + length is an integer greater than 0 representing the number of + characters to delete""" + self.position = position + self.length = length + + def __repr__(self): + return "Deletion(%d, %d)" % (self.position, self.length) + +class Removal: + """Represents the deletion of all characters ever inserted in this range. + Insertions at the endpoints are _not_ included in the Removal, and must + be Removed separately. + """ + def __init__(self, position, length): + """position is the point at which to start Removal + length is the number of points in the unmodified parent coordinate to + the end of the Removal. Note that many more than length characters + can be removed if there are Insertions in this range.""" + self.position = position + self.length = length + + def __repr__(self): + return "Removal(%d, %d)" % (self.position, self.length) + +class Record: + """Each Record is used to store one Change inside the StringTree. + The purpose of a Record is contain both the Change itself, and any cached + information about the current effect of that Change. + """ + def __init__(self, change, depth): + self.change = change + self.depth = depth + + def __str__(self): + return "Record(%s, %d)" % (str(self.change), self.depth) + + __repr__ = __str__ + +def flatten(L): + o = [] + for x in L: + if isinstance(x,list): + o.extend(flatten(x)) #recursive + else: + o.append(x) + return o + +def my_rand(): + return random.getrandbits(63) + +def translator(c, pack): + if pack: + if isinstance(c.edit, Insertion): + return dbus.Struct((dbus.Int64(c.unique_id), + dbus.Int64(c.parent), + dbus.Int16(0), + dbus.Int64(c.edit.position), + dbus.UTF8String(c.edit.text)), + signature='xxnxs') + elif isinstance(c.edit, Deletion): + return dbus.Struct((dbus.Int64(c.unique_id), + dbus.Int64(c.parent), + dbus.Int16(1), + dbus.Int64(c.edit.position), + dbus.Int64(c.edit.length)), + signature='xxnxx') + elif isinstance(c.edit, Removal): + return dbus.Struct((dbus.Int64(c.unique_id), + dbus.Int64(c.parent), + dbus.Int16(2), + dbus.Int64(c.edit.position), + dbus.Int64(c.edit.length)), + signature='xxnxx') + else: + raise Unimplemented + else: + if c[2] == 0: + ed = Insertion(int(c[3]),str(c[4])) + elif c[2] == 1: + ed = Deletion(int(c[3]),int(c[4])) + elif c[2] == 2: + ed = Removal(int(c[3]),int(c[4])) + else: + raise "unknown type" + return Change(int(c[0]), int(c[1]), ed) + +class EagerHideList: + """EagerHideList provides a list with hidden elements. The standard index + considers only the visible elements, but the 'all' index considers all + elements. The standard index of an invisible element is considered to be + the index of the next visible element.""" + def __init__(self): + self._sourcelist = [] + self._poslist = [] + self._posmap = {} + def __len__(self): + return len(self._sourcelist) + def __iter__(self): + return self._sourcelist.__iter__() + def __getitem__(self, s): + return self._sourcelist[s] + def index(self, item): + x = self._poslist[self._posmap[item]] + return x[2] + def hide(self, position, length): + """Given the position in _sourcelist, and the length of the deletion, + perform the deletion in the lists""" + pfirst = self._sourcelist[position] + plast = self._sourcelist[position+length-1] + ifirst = self._posmap[pfirst] + ilast = self._posmap[plast] + for i in xrange(ifirst, ilast+1): + L = self._poslist[i] + L[1] = False #No longer visible, if it was visible before + L[2] = position #collapse positions + for L in self._poslist[ilast+1:]: + L[2] -= length #move all subsequent character up by length + + del self._sourcelist[position:(position+length)] + #self._check_invariants() + def getitem_all(self, s): + if isinstance(s, int): + return self._poslist[s][0] + else: + return [x[0] for x in self._poslist[s]] + def index_all(self, item): + return self._posmap[item] + def is_visible(self, i): + return self._poslist[i][1] + def is_visible_item(self, item): + return self._poslist[self._posmap[item]][1] + def insert_sequence_all(self, position, sequence, visibility): + """Insert sequence with visibility into the all-coordinates at position""" + if position > 0: + psource = self._poslist[position][2] + else: + psource = 0 + length = len(sequence) + newlist = [] + newlistsource = [] + i = psource + for elt, viz in zip(sequence, visibility): + newlist.append([elt, viz, i]) + if viz: + newlistsource.append(elt) + i += 1 + self._poslist[position:position] = newlist + for i in xrange(position,position+length): + L = self._poslist[i] + self._posmap[L[0]] = i + num_viz = len(newlistsource) + for i in xrange(position+length,len(self._poslist)): + L = self._poslist[i] + L[2] += num_viz + self._posmap[L[0]] = i + self._sourcelist[psource:psource] = newlistsource + #self._check_invariants() + def insert_sequence_leftof(self, target, sequence, visibility): + self.insert_sequence_all(self._posmap[target], sequence, visibility) + + def _check_invariants(self): + assert len(self._posmap) == len(self._poslist) + for i in xrange(len(self._poslist)): + assert self._posmap[self._poslist[i][0]] == i + if self._poslist[i][1]: + assert self._sourcelist[self._poslist[i][2]] == self._poslist[i][0] + if i > 0: + if self._poslist[i-1][1]: + assert self._poslist[i-1][2] + 1 == self._poslist[i][2] + else: + assert self._poslist[i-1][2] == self._poslist[i][2] + +class SimpleStringTree: + """SimpleStringTree is a StringTree that supports only Insertions and + Deletions. Handling References, while valuable, has proven quite difficult, + and so will not be addressed by this data structure. + + Code for handling Removals will be left in for the moment, since it is + easy enough, even though it is not presently used.""" + + def __init__(self, initstring=""): + self.ROOT = Record(Change(-1, -1, Insertion(0, "")), 0) + self._id2rec = dict() #unique_id: Record(change) | change.unique_id = unique_id + self._id2rec[-1] = self.ROOT + self._parent2children = defaultdict(set) # unique_id: {Record(change) | change.parent == unique_id} + self._cursor = 0 + self._listing = aatree.AATreeHideList() + self._listing.insert_sequence_all(0,[(-1,0)],[False]) + if initstring: + self.insert(initstring, 0) + + def __repr__(self): + return "\n".join((str(v) for v in self._id2rec.values())) + + def getvalue(self, r = None): + if r is None: + r = self.ROOT + s = "".join(self._id2rec[x[0]].change.edit.text[x[1]] for x in self._listing) + return s + + def next(self): + raise + + def flush(self): + # This could be used to implement lazy evaluation of input by not + # re-evaluating the string until flush (or read?) + pass + + def close(self): + raise + + def read(self, size=float('inf')): + #Efficiency: This method should use size to avoid calling getvalue and rendering the entire string + s = self.getvalue() + outpoint = min(len(s), self._cursor + size) + inpoint = self._cursor + self._cursor = outpoint + return s[inpoint:outpoint] + + def readline(self, size=float('inf')): + #Efficiency: This method should use size to avoid rendering the whole string + s = self.getvalue() + outpoint = min(len(s), self._cursor + size) + inpoint = self._cursor + i = s.find("\n", inpoint, outpoint) + if i == -1 or i >= outpoint: + self._cursor = outpoint + return s[inpoint:outpoint] + else: + self._cursor = i + 1 + return s[inpoint:(i+1)] + + def readlines(self, sizehint=None): + #Efficiency: use sizehint + s = self.getvalue() + t = s[self._cursor:] + self._cursor = len(s) + return t.splitlines(True) + + def seek(self, offset, whence=0): + if whence == 0: + self._cursor = offset + elif whence == 1: + self._cursor += offset + elif whence == 2: + self._cursor = len(self._listing) + offset + + def tell(self): + return self._cursor + + def truncate(self, size=None): + if size is None: + size = self._cursor + return self.delete(size, len(self._listing) - size) + + def write(self, text): + L = min(len(self._listing) - self._cursor, len(text)) + changelist = [] + if L > 0: + changelist.extend(self.delete(self._cursor, L)) + changelist.extend(self.insert(text)) + return changelist + + def writelines(self, sequence): + s = "".join(sequence) + return self.write(s) + + # Non-filelike text editing methods + + def insert(self, text, k=None): + if k is None: + k = self._cursor + + if len(self._listing) == 0: + r = self.ROOT + uid = -1 + inspoint = 0 + elif k == 0: + (uid, inspoint) = self._listing[k] + r = self._id2rec[uid] + elif k < len(self._listing): + # When not inserting at the endpoints, we have to be sure to + # check if we are at the boundary between a parent and one of its + # descendants. If so, we must make our insertion in the descendant, + # not the parent, because the insertions would "conflict" (occur at + # the same location) in the parent, which would produce an ordering + # ambiguity, which will resolve in our favor only 50% of the time. + pL, pR = self._listing[(k-1):(k+1)] + (uidL, inspointL) = pL + (uidR, inspointR) = pR + rR = self._id2rec[uidR] + if uidL == uidR: #There's no boundary here at all (at least not one + # of any importance to us. Therefore, we have to insert to the + # left of the character at k, as usual. + r = rR + uid = uidR + inspoint = inspointR + else: #There's a boundary of some sort here. We always choose to + # insert at the deeper node (breaking left in case of a tie). + # (In the case that neither node is the ancestor of the other, + # either choice would be acceptable, regardless of depth. This + # logic is therefore acceptable, and has the advantage of being + # simple and fast.) + rL = self._id2rec[uidL] + if rR.depth > rL.depth: + r = rR + uid = uidR + inspoint = inspointR + else: + r = rL + uid = uidL + inspoint = inspointL + 1 + elif k == len(self._listing): + (uid,i) = self._listing[k-1] + r = self._id2rec[uid] + inspoint = i + 1 + else: + raise + + e = Insertion(inspoint, text) + c = Change(my_rand(), r.change.unique_id, e) + self._add_change_treeonly(c) + target = (uid, inspoint) + self._insert_listonly(c.unique_id, target, len(text)) + self._cursor = k + len(text) + return [c] + + def delete(self, k, n): + """Starting at a point k (0-indexed), delete n characters""" + if k + n > len(self._listing): + raise + p = self._listing[k] + contigs = [[p[0],p[1],p[1]]] + for (uid, index) in self._listing[(k+1):(k+n)]: + #This logic produces deletions that span any missing chunks. This + #produces a smaller number of deletions than making sure that they + #are actually "contiguous", but it might interact badly with a + #hypothetical undo system. + if contigs[-1][0] == uid: + contigs[-1][2] = index + else: + contigs.append([uid,index,index]) + changelist = [Change(my_rand(), c[0], Deletion(c[1],1 + c[2]-c[1])) for c in contigs] + for c in changelist: + self._add_change_treeonly(c) + self._delete_listonly(k,n) + return changelist + + def get_range(self, rec, point, m): + todo = [(rec, point, m, None)] # None is a dummy value since p is unused + ranges = [] + while len(todo) > 0: + (rec, point, m, p) = todo[0] + h = self._range_helper(point, m) + self._step(h, rec) + if h.outpoint is not None: + ranges.append((rec, h.point_parent, h.outpoint - h.point_parent)) + #print rec, h.point_parent, h.outpoint - h.point_parent + todo.extend(h.todo) + #print todo + del todo[0] + return ranges + + def move(self, rempoint, n, inspoint): + """In StringTree, move() should coherently copy a section of text, + such that any conflicting edits appear in the new location, not the old. + In SimpleStringTree, this is not possible, so move() just falls back to + Deletion and Insertion.""" + self.seek(rempoint) + t = self.read(n) + + if rempoint > inspoint: + L = self.delete(rempoint,n) + L.extend(self.insert(t, inspoint)) + else: + L = self.insert(t, inspoint) + L.extend(self.delete(rempoint,n)) + return L + + # Patch management methods + + def add_change(self, c): + if c.unique_id in self._id2rec: + return [] + if isinstance(c.edit, Insertion): + p = self._effective_parent(c.unique_id, c.parent, c.edit.position) + i = self._listing.index(p) + d = len(c.edit.text) + self._insert_listonly(c.unique_id, p, d) + flist = [Insertion(i, c.edit.text)] + elif isinstance(c.edit, Deletion): + flist = [] + start = None + end = None + for i in xrange(c.edit.position,c.edit.position + c.edit.length): + p = (c.parent, i) + if self._listing.is_visible_item(p): + q = self._listing.index(p) + if end == q: + end += 1 + else: + if end is not None: + n = end - start + flist.append(Deletion(start,n)) + self._delete_listonly(start,n) + start = q + end = start + 1 + if end is not None: # the last contig won't be handled by the loop + n = end - start + flist.append(Deletion(start,n)) + self._delete_listonly(start,n) + else: + raise + self._add_change_treeonly(c) + return flist + + def _effective_parent(self, uid, parentid, position): + """The effective parent of an insertion is defined as the (uid, loc) pair + that causes it to be inserted in the right location in the string. This + is only relevant in the event of a conflict, in which case the conflict + edits are required to be ordered from least uid to greatest uid. The + effective parent of a conflicted edit, then, is either the original pair, + or (u_next, 0), where u_next is the uid of the sibling directly to the right + of the input uid. That is to say, it is the least u greater than uid + that has the same position.""" + if parentid in self._parent2children: + u_next = None + for r in self._parent2children[parentid]: + u = r.change.unique_id + p = r.change.edit.position + if (p == position) and isinstance(r.change.edit, Insertion) and (u > uid): + if (u_next is None) or (u < u_next): + u_next = u + if u_next is not None: + return (u_next, 0) + return (parentid, position) + + def _delete_listonly(self, position, length): + """Given the position in _sourcelist, and the length of the deletion, + perform the deletion in the lists""" + self._listing.hide(position,length) + + def _insert_listonly(self, uid, target, length): + """Make a new insertion into the lists with uid and length at position + in _poslist""" + elts = [(uid,i) for i in xrange(length+1)] + visibility = [True] * length + visibility.append(False) + self._listing.insert_sequence_leftof(target, elts, visibility) + + def _add_change_treeonly(self, c): + if c.unique_id in self._id2rec: + return + d = self._id2rec[c.parent].depth + 1 + r = Record(c,d) + self._id2rec[c.unique_id] = r + if c.parent not in self._parent2children: + self._parent2children[c.parent] = set() + self._parent2children[c.parent].add(r) + + def get_insert(self, k, n = 0, parent=None): + #FIXME: This method could be useful, but it no longer works + """get_insert finds and returns the youngest insertion containing positions + k to k + n in the total coordinates of parent. If parent is unspecified, + then it is taken to be the root, meaning the coordinates in question + are the current document coordinates. + + The return value is a tuple (rec, k_unmodified, k_modified), meaning + the record containing k and k + n, the position of k in rec's + unmodified coordinates, and the position of k in rec's + modified coordinates. + + "containing" is defined so that a record with n characters + (labeled 0...n-1) can still be returned as containing k...k+n. In + other words, each Insertion contains both endpoints. However, an + Insertion that has been totally deleted is ignored entirely.""" + if parent is None: + parent = self.ROOT + + h = self._get_insert_helper(k, n) + self._step(h, parent) + while h.child is not None: + parent = h.child + h = self._get_insert_helper(h.child_k, n) + self._step(h, parent) + + return (parent, h.k_in_parent, h.k) + + def get_changes(self): #TODO: add arguments to get only changes in a certain range + """get_changes provides a depth-first topologically sorted list of all + Changes in this Tree""" + L = [] + stack = [-1] + while stack: + x = self._id2rec[stack.pop()].change + L.append(x) + if x.unique_id in self._parent2children: + stack.extend(r.change.unique_id + for r in self._parent2children[x.unique_id]) + return L + + def is_ready(self, c): + """Returns a boolean indicating whether a Change c may safely be added + to the Tree. Specifically, it checks whether c.parent is already known.""" + return c.parent in self._id2rec diff --git a/groupthink/sugar_tools.py b/groupthink/sugar_tools.py new file mode 100644 index 0000000..0292a0b --- /dev/null +++ b/groupthink/sugar_tools.py @@ -0,0 +1,288 @@ +# Copyright 2007 Collabora Ltd. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging +import telepathy + +from sugar.activity.activity import Activity, ActivityToolbox +from sugar.presence import presenceservice + +from sugar.presence.tubeconn import TubeConnection +from sugar.graphics.window import Window + +import gtk +import gobject + +import groupthink_base as groupthink + +def exhaust_event_loop(): + while gtk.events_pending(): + gtk.main_iteration() + +class GroupActivity(Activity): + + message_preparing = "Preparing user interface" + message_loading = "Loading object from Journal" + message_joining = "Joining shared activity" + + """Abstract Class for Activities using Groupthink""" + def __init__(self, handle): + # self.initiating indicates whether this instance has initiated sharing + # it always starts false, but will be set to true if this activity + # initiates sharing. In particular, if Activity.__init__ calls + # self.share(), self.initiating will be set to True. + self.initiating = False + # self._processed_share indicates whether when_shared() has been called + self._processed_share = False + # self.initialized tracks whether the Activity's display is up and running + self.initialized = False + + self.early_setup() + + super(GroupActivity, self).__init__(handle) + self.dbus_name = self.get_bundle_id() + self.logger = logging.getLogger(self.dbus_name) + + self._handle = handle + + ##gobject.threads_init() + + self._sharing_completed = not self._shared_activity + self._readfile_completed = not handle.object_id + if self._shared_activity: + self.message = self.message_joining + elif handle.object_id: + self.message = self.message_loading + else: + self.message = self.message_preparing + + # top toolbar with share and close buttons: + toolbox = ActivityToolbox(self) + self.set_toolbox(toolbox) + toolbox.show() + + v = gtk.VBox() + self.startup_label = gtk.Label(self.message) + v.pack_start(self.startup_label) + Window.set_canvas(self,v) + self.show_all() + + # The show_all method queues up draw events, but they aren't executed + # until the mainloop has a chance to process them. We want to process + # them immediately, because we need to show the waiting screen + # before the waiting starts, not after. + exhaust_event_loop() + # exhaust_event_loop() provides the possibility that write_file could + # be called at this time, so write_file is designed to trigger read_file + # itself if that path occurs. + + self.tubebox = groupthink.TubeBox() + self.timer = groupthink.TimeHandler("main", self.tubebox) + self.cloud = groupthink.Group(self.tubebox) + # self.cloud is extremely important. It is the unified reference point + # that contains all state in the system. Everything else is stateless. + # self.cloud has to be defined before the call to self.set_canvas, because + # set_canvas can trigger almost anything, including pending calls to read_file, + # which relies on self.cloud. + + # get the Presence Service + self.pservice = presenceservice.get_instance() + # Buddy object for you + owner = self.pservice.get_owner() + self.owner = owner + + self.connect('shared', self._shared_cb) + self.connect('joined', self._joined_cb) + if self.get_shared(): + if self.initiating: + self._shared_cb(self) + else: + self._joined_cb(self) + + self.add_events(gtk.gdk.VISIBILITY_NOTIFY_MASK) + self.connect("visibility-notify-event", self._visible_cb) + self.connect("notify::active", self._active_cb) + + if not self._readfile_completed: + self.read_file(self._jobject.file_path) + elif not self._shared_activity: + gobject.idle_add(self._initialize_cleanstart) + + def _initialize_cleanstart(self): + self.initialize_cleanstart() + self._initialize_display() + return False + + def initialize_cleanstart(self): + """Any subclass that needs to take any extra action in the case where + the activity is launched locally without a sharing context or input + file should override this method""" + pass + + def early_setup(self): + """Any subclass that needs to take an action before any external interaction + (e.g. read_file, write_file) occurs should place that code in early_setup""" + pass + + def _initialize_display(self): + main_widget = self.initialize_display() + Window.set_canvas(self, main_widget) + self.initialized = True + if self._shared_activity and not self._processed_share: + # We are joining a shared activity, but when_shared has not yet + # been called + self.when_shared() + self._processed_share = True + self.show_all() + + def initialize_display(self): + """All subclasses must override this method, in order to display + their GUI using self.set_canvas()""" + raise NotImplementedError + + def share(self, private=False): + """The purpose of this function is solely to permit us to determine + whether share() has been called. This is necessary because share() may + be called during Activity.__init__, and thus emit the 'shared' signal + before we have a chance to connect any signal handlers.""" + self.initiating = True + super(GroupActivity, self).share(private) + if self.initialized and not self._processed_share: + self.when_shared() + self._processed_share = True + + def when_shared(self): + """Inheritors should override this method to perform any special + operations when the user shares the session""" + pass + + def _shared_cb(self, activity): + self.logger.debug('My activity was shared') + self.initiating = True + self._sharing_setup() + + self.logger.debug('This is my activity: making a tube...') + id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube( + self.dbus_name, {}) + + def _sharing_setup(self): + if self._shared_activity is None: + self.logger.error('Failed to share or join activity') + return + + self.conn = self._shared_activity.telepathy_conn + self.tubes_chan = self._shared_activity.telepathy_tubes_chan + self.text_chan = self._shared_activity.telepathy_text_chan + + self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', + self._new_tube_cb) + + def _list_tubes_reply_cb(self, tubes): + self.logger.debug('Got %d tubes from ListTubes' % len(tubes)) + for tube_info in tubes: + self._new_tube_cb(*tube_info) + + def _list_tubes_error_cb(self, e): + self.logger.error('ListTubes() failed: %s', e) + + def _joined_cb(self, activity): + if not self._shared_activity: + return + + self.logger.debug('Joined an existing shared activity') + self.initiating = False + self._sharing_setup() + + self.logger.debug('This is not my activity: waiting for a tube...') + self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes( + reply_handler=self._list_tubes_reply_cb, + error_handler=self._list_tubes_error_cb) + + def _new_tube_cb(self, id, initiator, type, service, params, state): + self.logger.debug('New tube: ID=%d initator=%d type=%d service=%s ' + 'params=%r state=%d', id, initiator, type, service, + params, state) + if (type == telepathy.TUBE_TYPE_DBUS and + service == self.dbus_name): + if state == telepathy.TUBE_STATE_LOCAL_PENDING: + self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id) + tube_conn = TubeConnection(self.conn, + self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES], + id, group_iface=self.text_chan[telepathy.CHANNEL_INTERFACE_GROUP]) + self.tubebox.insert_tube(tube_conn, self.initiating) + self._sharing_completed = True + if self._readfile_completed and not self.initialized: + self._initialize_display() + + def read_file(self, file_path): + self.cloud.loads(self.load_from_journal(file_path)) + self._readfile_completed = True + if self._sharing_completed and not self.initialized: + self._initialize_display() + pass + + def load_from_journal(self, file_path): + """This implementation of load_from_journal simply returns the contents + of the file. Any inheritor overriding this method must return the + string provided to save_to_journal as cloudstring.""" + if file_path: + f = file(file_path,'rb') + s = f.read() + f.close() + return s + + def write_file(self, file_path): + # There is a possibility that the user could trigger a write_file + # action before read_file has occurred. This could be dangerous, + # potentially overwriting the journal entry with blank state. To avoid + # this, we ensure that read_file has been called (if there is a file to + # read) before writing. + if not self._readfile_completed: + self.read_file(self._jobject.file_path) + self.save_to_journal(file_path, self.cloud.dumps()) + + def save_to_journal(self, file_path, cloudstring): + """This implementation of save_to_journal simply dumps the output of + self.cloud.dumps() to disk. Any inheritor who wishes to control file + output should override this method, and must + be sure to include cloudstring in its write_file.""" + f = file(file_path, 'wb') + f.write(cloudstring) + f.close() + + def _active_cb(self, widget, event): + self.logger.debug("_active_cb") + if self.props.active: + self.resume() + else: + self.pause() + + def _visible_cb(self, widget, event): + self.logger.debug("_visible_cb") + if event.state == gtk.gdk.VISIBILITY_FULLY_OBSCURED: + self.pause() + else: + self.resume() + + def pause(self): + """Subclasses should override this function to stop updating the display + since it is not visible.""" + pass + + def resume(self): + """Subclasses should override this function to resume updating the + display, since it is now visible""" + pass -- cgit v0.9.1