Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/groupthink/groupthink_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'groupthink/groupthink_base.py')
-rw-r--r--groupthink/groupthink_base.py1727
1 files changed, 1727 insertions, 0 deletions
diff --git a/groupthink/groupthink_base.py b/groupthink/groupthink_base.py
new file mode 100644
index 0000000..ca7d6a6
--- /dev/null
+++ b/groupthink/groupthink_base.py
@@ -0,0 +1,1727 @@
+"""
+Copyright 2008 Benjamin M. Schwartz
+
+DOBject is LGPLv2+
+
+DObject is free software: you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation, either version 2 of the License, or
+(at your option) any later version.
+
+DObject is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License
+along with DObject. If not, see <http://www.gnu.org/licenses/>.
+"""
+import dbus
+import dbus.service
+import dbus.gobject_service
+import time
+import logging
+import threading
+import thread
+import random
+from listset import ListSet
+import stringtree
+import cPickle
+import dbus_tools
+"""
+DObject is a library of components useful for constructing distributed
+applications that need to maintain coherent state while communicating over
+Telepathy. The DObject tools are design to handle unexpected joins, leaves,
+splits, and merges automatically, and always to leave each connected component
+of users in a coherent state at quiescence.
+"""
+
+def PassFunction(*args,**kargs):
+ logging.debug("args=%s, kargs=%s" % (str(args),str(kargs)))
+ pass
+
+def ReturnFunction(x):
+ return x
+
+class Group:
+ """A Group is a simple tool for organizing DObjects. Once it is set up
+ with a tubebox, the user may simply add objects to it, e.g.
+
+ self.group = Group(tb)
+ ...
+ self.group['mydict1'] = HighScore('No one', 0)
+
+ and the group will take care of assigning a handler to the object with
+ the specified name.
+ For a Group g, g['a'] is equivalent in almost all ways to g.a, for
+ programmer convenience.
+ """
+
+ tubebox = None
+ _locked = False
+ _d = None
+
+ def __init__(self, tubebox):
+ self._logger = logging.getLogger('groupthink.Group')
+ self._logger.debug('new Group')
+ self.tubebox = tubebox
+ self._d = dict()
+ self._history = dict()
+ self._handlers = dict()
+ self._locked = True
+
+ def __setitem__(self, name, dobj):
+ self._logger.debug("setitem(%s,%s)" % (name, str(dobj)))
+ if name in self.__dict__ or name in self._d:
+ raise #Cannot replace an existing attribute or object
+ h = dobj.HANDLER_TYPE(name, self.tubebox)
+ dobj.set_handler(h)
+ self.add_handler(h, dobj)
+
+ def add_handler(self, h, o=None):
+ """This function is used to add a handler to the Group _after_ that
+ handler has already been registered to completion with its object."""
+ name = h.get_name()
+ self._handlers[name] = h
+ if name in self._history:
+ h.object.add_history(self._history[name])
+ del self._history[name]
+ if o is not None:
+ self._d[name] = o
+ else:
+ self._d[name] = h.object
+ for hc in h.get_copies(): #Recurse through a potential tree of handlers
+ self.add_handler(hc)
+
+ def __setattr__(self, name, val):
+ if self._locked:
+ self.__setitem__(name, val)
+ else:
+ self.__dict__[name] = val
+
+ def __getitem__(self, name):
+ if name in self._d:
+ return self._d[name]
+ else:
+ return self.__dict__[name]
+
+ __getattr__ = __getitem__
+
+ def __delattr__(self, name):
+ raise #Deletion is not supported
+
+ def dumps(self):
+ d = {}
+ for (name, handler) in self._handlers.iteritems():
+ d[name] = dbus_tools.undbox(handler.object.get_history())
+ d.update(self._history) #Include any "unclaimed history" thus far.
+ return cPickle.dumps(d)
+
+ def loads(self, s):
+ if s:
+ d = cPickle.loads(s)
+ for (name,hist) in d.iteritems():
+ if name in self._d:
+ handler = self._handlers[name]
+ handler.object.add_history(hist)
+ else:
+ self._history[name] = hist
+
+class TubeBox:
+ """ A TubeBox is a box that either contains a Tube or does not.
+ The purpose of a TubeBox is to solve this problem: Activities are not
+ provided with the sharing Tube until they are shared, but DObjects should
+ not have to care whether or not they have been shared. That means that the
+ DObject handler must know whether or not a Tube has been provided. This
+ could be implemented within the handlers, but then the Activity's sharing
+ code would have to know a list of all DObject handlers.
+
+ Instead, the sharing code just needs to create a TubeBox and pass it to the
+ code that creates handlers. Once the tube arrives, it can be added to the
+ TubeBox with insert_tube. The handlers will then be notified automatically.
+ """
+ def __init__(self):
+ self.tube = None
+ self.is_initiator = None
+ self._logger = logging.getLogger()
+ self._listeners = []
+
+ def register_listener(self, L):
+ """This method is used by the DObject handlers to add a callback
+ function that will be called after insert_tube"""
+ self._listeners.append(L)
+ if self.tube is not None:
+ L(self.tube, self.is_initiator)
+
+ def insert_tube(self, tube, is_initiator=False):
+ """This method is used by the sharing code to provide the tube, once it
+ is ready, along with a boolean indicating whether or not this computer
+ is the initiator (who may have special duties, as the first
+ participant)."""
+ self._logger.debug("insert_tube, notifying %s" % str(self._listeners))
+ self.tube = tube
+ self.is_initiator = is_initiator
+ for L in self._listeners:
+ L(tube, is_initiator)
+
+class TimeHandler(dbus.gobject_service.ExportedGObject):
+ """A TimeHandler provides a universal clock for a sharing instance. It is a
+ sort of cheap, decentralized synchronization system. The TimeHandler
+ determines the offset between local time and group time by sending a
+ broadcast and accepting the first response, and assuming that both transfer
+ displays were equal. The initiator's offset is 0.0, but once another group
+ member has synchronized, the initiator can leave and new members will still
+ be synchronized correctly. Errors at each synchronization are typically
+ between 0.1s and 2s.
+
+ TimeHandler is not perfectly resilient to disappearances. If the group
+ splits, and one of the daughter groups does not contain any members that
+ have had a chance to synchronize, then they will not sync to each other. I
+ am not yet aware of any sensible synchronization system that avoids this
+ problem.
+ """
+ IFACE = "org.dobject.TimeHandler"
+ BASEPATH = "/org/dobject/TimeHandler/"
+
+ def __init__(self, name, tube_box, offset=0.0):
+ self.PATH = TimeHandler.BASEPATH + name
+ dbus.gobject_service.ExportedGObject.__init__(self)
+ self._logger = logging.getLogger(self.PATH)
+ self._tube_box = tube_box
+ self.tube = None
+ self.is_initiator = None
+
+ self.offset = offset
+ self._know_offset = False
+ self._offset_lock = threading.Lock()
+
+ self._tube_box.register_listener(self.get_tube)
+
+ def get_tube(self, tube, is_initiator):
+ """Callback for the TubeBox"""
+ self._logger.debug("get_tube")
+ self._logger.debug(str(is_initiator))
+ self.tube = tube
+ self.add_to_connection(self.tube, self.PATH)
+ self.is_initiator = is_initiator
+ self._know_offset = is_initiator
+ self.tube.add_signal_receiver(self.tell_time, signal_name='What_time_is_it', dbus_interface=TimeHandler.IFACE, sender_keyword='sender', path=self.PATH)
+
+ if not self._know_offset:
+ self.ask_time()
+
+ def time(self):
+ """Get the group time"""
+ return time.time() + self.offset
+
+ def get_offset(self):
+ """Get the difference between local time and group time"""
+ self._logger.debug("get_offset " + str(self.offset))
+ return self.offset
+
+ def set_offset(self, offset):
+ """Set the difference between local time and group time, and assert that
+ this is correct"""
+ self._logger.debug("set_offset " + str(offset))
+ self._offset_lock.acquire()
+ self.offset = offset
+ self._know_offset = True
+ self._offset_lock.release()
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='d')
+ def What_time_is_it(self, asktime):
+ return
+
+ def ask_time(self):
+ self._logger.debug("ask_time")
+ self.What_time_is_it(time.time())
+
+ def tell_time(self, asktime, sender=None):
+ self._logger.debug("tell_time")
+ start_time = time.time()
+ try:
+ my_name = self.tube.get_unique_name()
+ if sender == my_name:
+ return
+ if self._know_offset:
+ self._logger.debug("telling offset")
+ remote = self.tube.get_object(sender, self.PATH)
+ start_time += self.offset
+ remote.receive_time(asktime, start_time, time.time() + self.offset, reply_handler=PassFunction, error_handler=PassFunction)
+ finally:
+ return
+
+ @dbus.service.method(dbus_interface=IFACE, in_signature='ddd', out_signature='')
+ def receive_time(self, asktime, start_time, finish_time):
+ self._logger.debug("receive_time")
+ rtime = time.time()
+ thread.start_new_thread(self._handle_incoming_time, (asktime, start_time, finish_time, rtime))
+
+ def _handle_incoming_time(self, ask, start, finish, receive):
+ self._offset_lock.acquire()
+ if not self._know_offset:
+ self.offset = ((start + finish)/2) - ((ask + receive)/2)
+ self._know_offset = True
+ self._offset_lock.release()
+
+class UnorderedHandler(dbus.gobject_service.ExportedGObject):
+ """The UnorderedHandler serves as the interface between a local UnorderedObject
+ (a pure python entity) and the d-bus/network system. Each UnorderedObject
+ is associated with a single Handler, and vice-versa. It is the Handler that
+ is actually exposed over D-Bus. The purpose of this system is to minimize
+ the amount of networking code required for each additional UnorderedObject.
+ """
+ IFACE = "org.dobject.Unordered"
+ BASEPATH = "/org/dobject/Unordered/"
+
+ def __init__(self, name, tube_box):
+ """To construct a UO, the program must provide a name and a TubeBox.
+ The name is used to identify the UO; all UO with the same name on the
+ same Tube should be considered views into the same abstract distributed
+ object."""
+ self._myname = name
+ self.PATH = UnorderedHandler.BASEPATH + name
+ dbus.gobject_service.ExportedGObject.__init__(self)
+ self._logger = logging.getLogger(self.PATH)
+ self._tube_box = tube_box
+ self.tube = None
+ self._copies = []
+
+ self.object = None
+ self._tube_box.register_listener(self.set_tube)
+
+ def set_tube(self, tube, is_initiator):
+ self._logger.debug("set_tube(), is_initiator=%s" % str(is_initiator))
+ """Callback for the TubeBox"""
+ self.tube = tube
+ self.add_to_connection(self.tube, self.PATH)
+
+ self.tube.add_signal_receiver(self.receive_message, signal_name='send', dbus_interface=UnorderedHandler.IFACE, sender_keyword='sender', path=self.PATH)
+ self.tube.add_signal_receiver(self.tell_history, signal_name='ask_history', dbus_interface=UnorderedHandler.IFACE, sender_keyword='sender', path=self.PATH)
+
+ # We need watch_participants because of the case in which several groups
+ # all having made changes, come together and need to update each other.
+ # There is no easy way to make this process more efficient without
+ # changing the Unordered interface dramatically to include per-message
+ # labels of some kind.
+ self.tube.watch_participants(self.members_changed)
+
+ #Alternative implementation of members_changed (not yet working)
+ #self.tube.add_signal_receiver(self.members_changed, signal_name="MembersChanged", dbus_interface="org.freedesktop.Telepathy.Channel.Interface.Group")
+
+ if self.object is not None:
+ self.ask_history()
+
+ def register(self, obj):
+ self._logger.debug("register(%s)" % str(obj))
+ """This method registers obj as the UnorderedObject being managed by
+ this Handler. It is called by obj after obj has initialized itself."""
+ self.object = obj
+ if self.tube is not None:
+ self.ask_history()
+
+ def get_path(self):
+ """Returns the DBus path of this handler. The path is the closest thing
+ to a unique identifier for each abstract DObject."""
+ return self.PATH
+
+ def get_tube(self):
+ """Returns the TubeBox used to create this handler. This method is
+ necessary if one DObject wishes to create another."""
+ return self._tube_box
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='v')
+ def send(self, message):
+ self._logger.debug("send(%s)" % str(message))
+ """This method broadcasts message to all other handlers for this UO"""
+ return
+
+ def receive_message(self, message, sender=None):
+ self._logger.debug("receive_message(%s)" % str(message))
+ if self.object is None:
+ self._logger.error("got message before registration")
+ elif sender == self.tube.get_unique_name():
+ self._logger.debug("Ignoring message, because I am the sender.")
+ else:
+ self.object.receive_message(message)
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='')
+ def ask_history(self):
+ self._logger.debug("ask_history()")
+ return
+
+ def tell_history(self, sender=None):
+ self._logger.debug("tell_history to " + str(sender))
+ try:
+ if sender == self.tube.get_unique_name():
+ self._logger.debug("tell_history aborted because I am" + str(sender))
+ return
+ if self.object is None:
+ self._logger.error("object not registered before tell_history")
+ return
+ self._logger.debug("getting proxy object")
+ remote = self.tube.get_object(sender, self.PATH)
+ self._logger.debug("got proxy object, getting history")
+ h = self.object.get_history()
+ self._logger.debug("got history, initiating transfer")
+ remote.receive_history(h, reply_handler=PassFunction, error_handler=PassFunction)
+ self._logger.debug("history transfer initiated")
+ except Exception, E:
+ self._logger.debug("tell_history failed: " % repr(E))
+ finally:
+ return
+
+ @dbus.service.method(dbus_interface=IFACE, in_signature = 'v', out_signature='')
+ def receive_history(self, hist):
+ self._logger.debug("receive_history(%s)" % str(hist))
+ if self.object is None:
+ self._logger.error("object not registered before receive_history")
+ return
+ self.object.add_history(hist)
+
+ #Alternative implementation of a members_changed (not yet working)
+ """
+ def members_changed(self, message, added, removed, local_pending, remote_pending, actor, reason):
+ added_names = self.tube.InspectHandles(telepathy.CONNECTION_HANDLE_TYPE_LIST, added)
+ for name in added_names:
+ self.tell_history(name)
+ """
+ def members_changed(self, added, removed):
+ self._logger.debug("members_changed")
+ for (handle, name) in added:
+ self.tell_history(sender=name)
+
+ def __repr__(self):
+ return 'UnorderedHandler(' + self._myname + ', ' + repr(self._tube_box) + ')'
+
+ def copy(self, name):
+ """A convenience function for returning a new UnorderedHandler derived
+ from this one, with a new name. This is safe as long as copy() is called
+ with a different name every time."""
+ h = UnorderedHandler(self._myname + "/" + name, self._tube_box)
+ self._copies.append(h)
+ return h
+
+ def get_copies(self):
+ return self._copies
+
+ def get_name(self):
+ return self._myname
+
+class HandlerAcceptor:
+ HANDLER_TYPE = NotImplementedError
+ def set_handler(self, handler):
+ raise NotImplementedError
+
+class UnorderedHandlerAcceptor(HandlerAcceptor):
+ HANDLER_TYPE = UnorderedHandler
+
+class UnorderedObject(UnorderedHandlerAcceptor):
+ """ The most basic DObject is the Unordered Object (UO). A UO has the
+ property that any changes to its state can be encapsulated as messages, and
+ these messages have no intrinsic ordering. Different instances of the same
+ UO, after receiving the same messages in different orders, should reach the
+ same state.
+
+ Any UO could be implemented as a set of all messages received so far, and
+ coherency could be maintained by sending all messages ever transmitted to
+ each new joining member. However, many UOs will have the property that most
+ messages are obsolete, and need not be transmitted. Therefore, as an
+ optimization, UOs manage their own state structures for synchronizing state
+ with joining/merging users.
+
+ The following code is an abstract class for UnorderedObject, serving
+ primarily as documentation for the concept.
+ """
+
+ handler = None
+
+ def set_handler(self, handler):
+ """Each UO must accept an UnorderedHandler via set_handler
+ Whenever an action is taken on the local UO (e.g. a method call that changes
+ the object's state), the UO must call handler.send() with an appropriately
+ encoded message.
+
+ Subclasses may override this method if they wish to perform more actions
+ when a handler is set."""
+ if self.handler:
+ raise
+ else:
+ self.handler = handler
+ self.handler.register(self)
+
+
+ def receive_message(self,msg):
+ """This method accepts and processes a message sent via handler.send().
+ Because objects are sent over DBus, it is advisable to DBus-ify the message
+ before calling send, and de-DBus-ify it inside receive_message."""
+ raise NotImplementedError
+
+ def get_history(self):
+ """This method returns an encoded copy of all non-obsolete state, ready to be
+ sent over DBus."""
+ raise NotImplementedError
+
+ def add_history(self, state):
+ """This method accepts and processes the state object returned by get_history()"""
+ raise NotImplementedError
+
+
+def empty_translator(x, pack):
+ return x
+
+class HighScore(UnorderedObject):
+ """ A HighScore is the simplest nontrivial DObject. A HighScore's state consists
+ of a value and a score. The user may suggest a new value and score. If the new
+ score is higher than the current score, then the value and score are updated.
+ Otherwise, they are not.
+
+ The value can be any object, and the score can be any comparable object.
+
+ To ensure that serialization works correctly, the user may specify a
+ translator function that converts values or scores to and from a format that
+ can be serialized reliably by dbus-python.
+
+ In the event of a tie, coherence cannot be guaranteed. If ties are likely
+ with the score of choice, the user may set break_ties=True, which appends a
+ random number to each message, and thereby reduces the probability of a tie
+ by a factor of 2**52.
+ """
+ def __init__(self, initval, initscore, value_translator=empty_translator, score_translator=empty_translator, break_ties=False):
+ self._logger = logging.getLogger('stopwatch.HighScore')
+ self._lock = threading.Lock()
+ self._value = initval
+ self._score = initscore
+
+ self._break_ties = break_ties
+ if self._break_ties:
+ self._tiebreaker = random.random()
+ else:
+ self._tiebreaker = None
+
+ self._val_trans = value_translator
+ self._score_trans = score_translator
+
+ self._listeners = []
+
+
+ def _set_value_from_net(self, val, score, tiebreaker):
+ self._logger.debug("set_value_from_net " + str(val) + " " + str(score))
+ if self._actually_set_value(val, score, tiebreaker):
+ self._trigger()
+
+ def receive_message(self, message):
+ self._logger.debug("receive_message " + str(message))
+ if len(message) == 2: #Remote has break_ties=False
+ self._set_value_from_net(self._val_trans(message[0], False), self._score_trans(message[1], False), None)
+ elif len(message) == 3:
+ self._set_value_from_net(self._val_trans(message[0], False), self._score_trans(message[1], False), float_translator(message[2], False))
+
+
+ add_history = receive_message
+
+ def set_value(self, val, score):
+ """This method suggests a value and score for this HighScore. If the
+ suggested score is higher than the current score, then both value and
+ score will be broadcast to all other participants.
+ """
+ self._logger.debug("set_value " + str(val) + " " + str(score))
+ if self._actually_set_value(val, score, None) and self.handler:
+ self.handler.send(self.get_history())
+
+ def _actually_set_value(self, value, score, tiebreaker):
+ self._logger.debug("_actually_set_value " + str(value)+ " " + str(score))
+ if self._break_ties and (tiebreaker is None):
+ tiebreaker = random.random()
+ self._lock.acquire()
+ if self._break_ties:
+ if (self._score < score) or ((self._score == score) and (self._tiebreaker < tiebreaker)):
+ self._value = value
+ self._score = score
+ self._tiebreaker = tiebreaker
+ self._lock.release()
+ return True
+ else:
+ self._lock.release()
+ return False
+ elif self._score < score:
+ self._value = value
+ self._score = score
+ self._lock.release()
+ return True
+ else:
+ self._logger.debug("not changing value")
+ self._lock.release()
+ return False
+
+ def get_value(self):
+ """ Get the current winning value."""
+ return self._value
+
+ def get_score(self):
+ """ Get the current winning score."""
+ return self._score
+
+ def get_pair(self):
+ """ Get the current value and score, returned as a tuple (value, score)"""
+ self._lock.acquire()
+ pair = (self._value, self._score)
+ self._lock.release()
+ return pair
+
+ def _get_all(self):
+ if self._break_ties:
+ self._lock.acquire()
+ q = (self._value, self._score, self._tiebreaker)
+ self._lock.release()
+ return q
+ else:
+ return self.get_pair()
+
+ def get_history(self):
+ p = self._get_all()
+ if self._break_ties:
+ return (self._val_trans(p[0], True), self._score_trans(p[1], True), float_translator(p[2], True))
+ else:
+ return (self._val_trans(p[0], True), self._score_trans(p[1], True))
+
+ def register_listener(self, L):
+ """Register a function L that will be called whenever another user sets
+ a new record. L must have the form L(value, score)."""
+ self._lock.acquire()
+ self._listeners.append(L)
+ self._lock.release()
+ (v,s) = self.get_pair()
+ L(v,s)
+
+ def _trigger(self):
+ (v,s) = self.get_pair()
+ for L in self._listeners:
+ L(v,s)
+
+def float_translator(f, pack):
+ """This translator packs and unpacks floats for dbus serialization"""
+ if pack:
+ return dbus.Double(f)
+ else:
+ return float(f)
+
+def uint_translator(f, pack):
+ """This translator packs and unpacks 64-bit uints for dbus serialization"""
+ if pack:
+ return dbus.UInt64(f)
+ else:
+ return int(f)
+
+def int_translator(f, pack):
+ """This translator packs and unpacks 32-bit ints for dbus serialization"""
+ if pack:
+ return dbus.Int32(f)
+ else:
+ return int(f)
+
+def string_translator(s, pack):
+ """This translator packs and unpacks unicode strings for dbus serialization"""
+ if pack:
+ return dbus.String(s)
+ else:
+ return str(s)
+
+class Latest(HandlerAcceptor):
+ """ Latest is a variation on HighScore, in which the score is the current
+ timestamp. Latest uses TimeHandler to provide a groupwide coherent clock.
+ Because TimeHandler's guarantees about synchronization and resilience are
+ weak, Latest is not as resilient to failures as a true DObject.
+
+ The creator must provide the initial value. One may
+ optionally indicate the initial time (as a float in epoch-time), a
+ TimeHandler (otherwise a new one will be created), and a translator for
+ serialization of the values.
+
+ Note that if time_handler is not provided, the object will not be functional
+ until set_handler is called.
+ """
+ def __init__(self, initval, inittime=float('-inf'), time_handler=None, translator=empty_translator):
+ self._time_handler = time_handler
+
+ self._listeners = []
+ self._lock = threading.Lock()
+
+ self._highscore = HighScore(initval, inittime, translator, float_translator)
+ self._highscore.register_listener(self._highscore_cb)
+
+ def set_handler(self, handler):
+ if self.handler:
+ raise
+ else:
+ if self._time_handler is None:
+ self._time_handler = TimeHandler(handler.get_path(), handler.get_tube())
+ self._highscore.set_handler(handler)
+
+ def get_value(self):
+ """ Returns the latest value """
+ return self._highscore.get_value()
+
+ def set_value(self, val):
+ """ Suggest a new value """
+ if self._time_handler:
+ self._highscore.set_value(val, self._time_handler.time())
+ else:
+ raise #missing _time_handler
+
+ def register_listener(self, L):
+ """ Register a listener L(value), to be called whenever another user
+ adds a new latest value."""
+ self._lock.acquire()
+ self._listeners.append(L)
+ self._lock.release()
+ L(self.get_value())
+
+ def _highscore_cb(self, val, score):
+ for L in self._listeners:
+ L(val)
+
+class Recentest(HandlerAcceptor):
+ """ Recentest is like Latest, but without using a clock or TimeHandler.
+ As a result, it can only guarantee causality, not synchrony.
+ """
+ def __init__(self, initval, translator=empty_translator):
+ self._listeners = []
+ self._lock = threading.Lock()
+
+ self._highscore = HighScore(initval, 0, translator, uint_translator, break_ties=True)
+ self._highscore.register_listener(self._highscore_cb)
+
+ def set_handler(self, handler):
+ self._highscore.set_handler(handler)
+
+ def get_value(self):
+ """ Returns the current value """
+ return self._highscore.get_value()
+
+ def set_value(self, val):
+ """ Set a new value """
+ self._highscore.set_value(val, self._highscore.get_score() + 1)
+
+ def register_listener(self, L):
+ """ Register a listener L(value), to be called whenever another user
+ adds a new latest value."""
+ self._lock.acquire()
+ self._listeners.append(L)
+ self._lock.release()
+ L(self.get_value())
+
+ def _highscore_cb(self, val, score):
+ for L in self._listeners:
+ L(val)
+
+class AddOnlySet(UnorderedObject):
+ """The AddOnlySet is the archetypal UnorderedObject. It consists of a set,
+ supporting all the normal Python set operations except those that cause an
+ item to be removed from the set. Thanks to this restriction, a AddOnlySet
+ is perfectly coherent, since the order in which elements are added is not
+ important.
+ """
+ def __init__(self, initset = (), translator=empty_translator):
+ self._logger = logging.getLogger('dobject.AddOnlySet')
+ self._set = set(initset)
+
+ self._lock = threading.Lock()
+
+ self._trans = translator
+ self._listeners = []
+
+ self.__and__ = self._set.__and__
+ self.__cmp__ = self._set.__cmp__
+ self.__contains__ = self._set.__contains__
+ self.__eq__ = self._set.__eq__
+ self.__ge__ = self._set.__ge__
+ # Not implementing getattribute
+ self.__gt__ = self._set.__gt__
+ self.__hash__ = self._set.__hash__
+ # Not implementing iand (it can remove items)
+ # Special wrapper for ior to trigger events
+ # Not implementing isub (it can remove items)
+ self.__iter__ = self._set.__iter__
+ # Not implementing ixor (it can remove items)
+ self.__le__ = self._set.__le__
+ self.__len__ = self._set.__len__
+ self.__lt__ = self._set.__lt__
+ self.__ne__ = self._set.__ne__
+ self.__or__ = self._set.__or__
+ self.__rand__ = self._set.__rand__
+ # Special implementation of repr
+ self.__ror__ = self._set.__ror__
+ self.__rsub__ = self._set.__rsub__
+ self.__rxor__ = self._set.__rxor__
+ self.__sub__ = self._set.__sub__
+ self.__xor__ = self._set.__xor__
+
+ # Special implementation of add to trigger events
+ # Not implementing clear
+ self.copy = self._set.copy
+ self.difference = self._set.difference
+ # Not implementing difference_update (it removes items)
+ # Not implementing discard (it removes items)
+ self.intersection = self._set.intersection
+ # Not implementing intersection_update (it removes items)
+ self.issubset = self._set.issubset
+ self.issuperset = self._set.issuperset
+ # Not implementing pop
+ # Not implementing remove
+ self.symmetric_difference = self._set.symmetric_difference
+ # Not implementing symmetric_difference_update
+ self.union = self._set.union
+ # Special implementation of update to trigger events
+
+ def update(self, y):
+ """Add all the elements of an iterable y to the current set. If any of
+ these elements were not already present, they will be broadcast to all
+ other users."""
+ s = set(y)
+ d = s - self._set
+ if len(d) > 0:
+ self._set.update(d)
+ self._send(d)
+
+ __ior__ = update
+
+ def add(self, y):
+ """ Add the single element y to the current set. If y is not already
+ present, it will be broadcast to all other users."""
+ if y not in self._set:
+ self._set.add(y)
+ self._send((y,))
+
+ def _send(self, els):
+ if len(els) > 0 and self.handler is not None:
+ self.handler.send(dbus.Array([self._trans(el, True) for el in els]))
+
+ def _net_update(self, y):
+ s = set(y)
+ d = s - self._set
+ if len(d) > 0:
+ self._set.update(d)
+ self._trigger(d)
+
+ def receive_message(self, msg):
+ self._net_update((self._trans(el, False) for el in msg))
+
+ def get_history(self):
+ if len(self._set) > 0:
+ return dbus.Array([self._trans(el, True) for el in self._set])
+ else:
+ return dbus.Array([], type=dbus.Boolean) #Prevent introspection of empty list, which fails
+
+ add_history = receive_message
+
+ def register_listener(self, L):
+ """Register a listener L(diffset). Every time another user adds items
+ to the set, L will be called with the set of new items."""
+ self._listeners.append(L)
+ L(self._set.copy())
+
+ def _trigger(self, s):
+ for L in self._listeners:
+ L(s)
+
+ def __repr__(self):
+ return 'AddOnlySet(' + repr(self.handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')'
+
+class AddOnlySortedSet(UnorderedObject):
+ """ AddOnlySortedSet is much like AddOnlySet, only backed by a ListSet, which
+ provides a set for objects that are ordered under cmp(). Items are maintained
+ in order. This approach is most useful in cases where each item is a message,
+ and the messages are subject to a time-like ordering. Messages may still
+ arrive out of order, but they will be stored in the same order on each
+ computer.
+ """
+ def __init__(self, initset = (), translatohr=empty_translator):
+ self._logger = logging.getLogger('dobject.AddOnlySortedSet')
+ self._set = ListSet(initset)
+
+ self._lock = threading.Lock()
+
+ self._trans = translator
+ self._listeners = []
+
+ self.__and__ = self._set.__and__
+ self.__contains__ = self._set.__contains__
+ # No self.__delitem__
+ self.__eq__ = self._set.__eq__
+ self.__ge__ = self._set.__ge__
+ # Not implementing getattribute
+ self.__getitem__ = self._set.__getitem__
+ self.__gt__ = self._set.__gt__
+ # Not implementing iand (it can remove items)
+ # Special wrapper for ior to trigger events
+ # Not implementing isub (it can remove items)
+ self.__iter__ = self._set.__iter__
+ # Not implementing ixor (it can remove items)
+ self.__le__ = self._set.__le__
+ self.__len__ = self._set.__len__
+ self.__lt__ = self._set.__lt__
+ self.__ne__ = self._set.__ne__
+ self.__or__ = self._set.__or__
+ self.__rand__ = self._set.__rand__
+ # Special implementation of repr
+ self.__ror__ = self._set.__ror__
+ self.__rsub__ = self._set.__rsub__
+ self.__rxor__ = self._set.__rxor__
+ self.__sub__ = self._set.__sub__
+ self.__xor__ = self._set.__xor__
+
+ # Special implementation of add to trigger events
+ # Not implementing clear
+ self.copy = self._set.copy
+ self.difference = self._set.difference
+ # Not implementing difference_update (it removes items)
+ # Not implementing discard (it removes items)
+ self.first = self._set.first
+ self.headset = self._set.headset
+ self.index = self._set.index
+ self.intersection = self._set.intersection
+ # Not implementing intersection_update (it removes items)
+ self.issubset = self._set.issubset
+ self.issuperset = self._set.issuperset
+ self.last = self._set.last
+ # Not implementing pop
+ self.position = self._set.position
+ # Not implementing remove
+ self.subset = self._set.subset
+ self.symmetric_difference = self._set.symmetric_difference
+ # Not implementing symmetric_difference_update
+ self.tailset = self._set.tailset
+ self.union = self._set.union
+ # Special implementation of update to trigger events
+
+ def update(self, y):
+ """Add all the elements of an iterable y to the current set. If any of
+ these elements were not already present, they will be broadcast to all
+ other users."""
+ d = ListSet(y)
+ d -= self._set
+ if len(d) > 0:
+ self._set.update(d)
+ self._send(d)
+
+ __ior__ = update
+
+ def add(self, y):
+ """ Add the single element y to the current set. If y is not already
+ present, it will be broadcast to all other users."""
+ if y not in self._set:
+ self._set.add(y)
+ self._send((y,))
+
+ def _send(self, els):
+ if len(els) > 0 and self.handler is not None:
+ self.handler.send(dbus.Array([self._trans(el, True) for el in els]))
+
+ def _net_update(self, y):
+ d = ListSet()
+ d._list = y
+ d -= self._set
+ if len(d) > 0:
+ self._set |= d
+ self._trigger(d)
+
+ def receive_message(self, msg):
+ self._net_update([self._trans(el, False) for el in msg])
+
+ def get_history(self):
+ if len(self._set._list) > 0:
+ return dbus.Array([self._trans(el, True) for el in self._set._list])
+ else:
+ return dbus.Array([], type=dbus.Boolean) #prevent introspection of empty list, which fails
+
+ add_history = receive_message
+
+ def register_listener(self, L):
+ """Register a listener L(diffset). Every time another user adds items
+ to the set, L will be called with the set of new items as a SortedSet."""
+ self._listeners.append(L)
+ L(self._set.copy())
+
+ def _trigger(self, s):
+ for L in self._listeners:
+ L(s)
+
+ def __repr__(self):
+ return 'AddOnlySortedSet(' + repr(self.handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')'
+
+class CausalHandler:
+ """The CausalHandler is analogous to the UnorderedHandler, in that it
+ presents an interface with which to build a wide variety of objects with
+ distributed state. The CausalHandler is different from the Unordered in two
+ ways:
+
+ 1. The send() method of an CausalHandler returns an index, which must be
+ stored by the CausalObject in connection with the information that was sent.
+ This index is a universal, fully-ordered, strictly causal identifier
+ for each message.
+
+ 2. A CausalObject's receive_message method takes two arguments: the message
+ and its index.
+
+ As a convenience, there is also
+
+ 3. A get_index() method, which provides a new index on each call, always
+ higher than all previous indexes.
+
+ CausalObjects are responsible for including index information in the
+ return value of get_history, and processing index information in add_history.
+
+ It is noteworthy that CausalHandler is in fact implemented on _top_ of
+ UnorderedHandler. The imposition of ordering does not require lower-level
+ access to the network. This fact of implementation may change in the
+ future, but CausalObjects will not be able to tell the difference.
+ """
+
+ ZERO_INDEX = (0,0)
+
+ def __init__(self, name, tube_box):
+ self._myname = name
+ self._tube_box = tube_box
+ self._unordered = UnorderedHandler(name, tube_box)
+ self._counter = 0
+ self._copies = []
+
+ self.object = None
+
+ def register(self, obj):
+ self.object = obj
+ self._unordered.register(self)
+
+ def get_index(self):
+ """get_index returns a new index, higher than all previous indexes.
+ The primary reason to use get_index is if you wish two know the index
+ of an item _before_ calling send()"""
+ self._counter += 1
+ return (self._counter, random.getrandbits(64))
+
+ def index_trans(self, index, pack):
+ """index_trans is a standard serialization translator for the index
+ format. Thanks to this translator, a CausalObject can and should treat
+ each index as an opaque, comparable object."""
+ if pack:
+ return dbus.Struct((dbus.UInt64(index[0]), dbus.UInt64(index[1])), signature='tt')
+ else:
+ return (int(index[0]), int(index[1]))
+
+ def send(self, msg, index=None):
+ """send() broadcasts a message to all other participants. If called
+ with one argument, send() broadcasts that message, along with a new
+ index, and returns the index. If called with two arguments, the second
+ may be an index, which will be used for this message. The index must
+ have been acquired using get_index(). In this case, the index must be
+ acquired immediately prior to calling send(). Otherwise, another
+ message may arrive in the interim, causing a violation of causality."""
+ if index is None:
+ index = self.get_index()
+ self._unordered.send(dbus.Struct((msg, self.index_trans(index, True))))
+ return index
+
+ def receive_message(self, msg):
+ m = msg[0]
+ index = self.index_trans(msg[1], False)
+ self._counter = max(self._counter, index[0])
+ self.object.receive_message(m, index)
+
+ def add_history(self, hist):
+ h = hist[0]
+ index = self.index_trans(hist[1], False)
+ self._counter = max(self._counter, index[0])
+ self.object.add_history(h)
+
+ def get_history(self):
+ h = self.object.get_history()
+ hist = dbus.Struct((h, self.index_trans(self.get_index(), True)))
+ return
+
+ def copy(self, name):
+ """A convenience function for returning a new CausalHandler derived
+ from this one, with a new name. This is safe as long as copy() is called
+ with a different name every time."""
+ h = CausalHandler(self._myname + "/" + name, self._tube_box)
+ self._copies.append(h)
+ return h
+
+ def get_copies(self):
+ return self._copies
+
+ def get_name(self):
+ return self._myname
+
+
+class CausalHandlerAcceptor(HandlerAcceptor):
+ HANDLER_TYPE = CausalHandler
+ def set_handler(self, handler):
+ raise NotImplementedError
+
+class CausalObject(CausalHandlerAcceptor):
+ """A CausalObject is almost precisely like an UnorderedObject, except
+ that whereas an UnorderedObject is completely specified by a set of messages,
+ a CausalObject is completely specified by an ordered list of messages,
+ sorted according to an opaque index associated with each message.
+ This index must be monotonically increasing in time for new messages as they
+ are created, but old messages may arrive long after they were created, and
+ are then inserted into the middle of the timestream.
+
+ The following code is an abstract class for CausalObject, serving
+ primarily as documentation for the concept.
+ """
+
+ handler = None
+
+ def set_handler(self, handler):
+ """Each CO must accept a CausalHandler via set_handler.
+
+ Subclasses may override this method if they wish to perform more actions
+ when a handler is set."""
+ if self.handler:
+ raise
+ else:
+ self.handler = handler
+ self.handler.register(self)
+
+ def receive_message(self, msg, index):
+ """This method accepts and processes a message sent via handler.send().
+ Because objects are sent over DBus, it is advisable to DBus-ify the message
+ before calling send, and de-DBus-ify it inside receive_message.
+
+ The index argument is an opaque index used for determining the ordering."""
+ raise NotImplementedError
+
+ def get_history(self):
+ """This method returns an encoded copy of all non-obsolete state, ready to be
+ sent over DBus."""
+ raise NotImplementedError
+
+ def add_history(self, state):
+ """This method accepts and processes the state object returned by get_history()"""
+ raise NotImplementedError
+
+class CausalDict(CausalObject):
+ """NOTE: CausalDict is UNTESTED. Other things may be buggy, but CausalDict
+ PROBABLY DOES NOT WORK. A CausalDict WILL NOT WORK UNTIL set_handler IS CALLED.
+
+ CausalDict is a distributed version of a Dict (hash table). All users keep
+ a copy of the entire table, so this is not a "Distributed Hash Table"
+ according to the terminology of the field.
+
+ CausalDict permits all Dict operations, including removing keys and
+ modifying the value of existing keys. This would not be possible using an
+ Unordered approach, because two value assignments to the same key could
+ arrive in different orders for different users, leaving them in different
+ states at quiescence.
+
+ To solve this problem, every assignment and removal is given a monotonically
+ increasing unique index, and whenever there is a conflict, the higher-index
+ operation wins.
+
+ One side effect of this design is that deleted keys cannot be forgotten. If
+ an assignment operation is received whose index is lower than
+ the deletion's, then that assignment is considered obsolete and must not be
+ executed.
+
+ To provide a mechanism for reducing memory usage, the clear() method has
+ been interpreted to remove not only all entries received so far, but also
+ all entries that will ever be received with index less than the current
+ index.
+ """
+ ADD = 0
+ DELETE = 1
+ CLEAR = 2
+
+ def __init__(self, initdict=(), key_translator=empty_translator, value_translator=empty_translator):
+ self._dict = dict(initdict)
+ self._listeners = []
+
+ self._key_trans = key_translator
+ self._val_trans = value_translator
+
+ self.__contains__ = self._dict.__contains__
+ #Special __delitem__
+ self.__eq__ = self._dict.__eq__
+ self.__ge__ = self._dict.__ge__
+ self.__getitem__ = self._dict.__getitem__
+ self.__gt__ = self._dict.__gt__
+ self.__le__ = self._dict.__le__
+ self.__len__ = self._dict.__len__
+ self.__lt__ = self._dict.__lt__
+ self.__ne__ = self._dict.__ne__
+ # special __setitem__
+
+ #Special clear
+ self.copy = self._dict.copy
+ self.get = self._dict.get
+ self.has_key = self._dict.has_key
+ self.items = self._dict.items
+ self.iteritems = self._dict.iteritems
+ self.iterkeys = self._dict.iterkeys
+ self.itervalues = self._dict.itervalues
+ self.keys = self._dict.keys
+ #Special pop
+ #Special popitem
+ #special setdefault
+ #special update
+ self.values = self._dict.values
+
+ def set_handler(self, handler):
+ if self.handler is not None:
+ raise
+ else:
+ self.handler = handler
+ self._clear = self.handler.get_index() #this must happen before index_dict initialization, so that self._clear is less than any index in index_dict
+ self._index_dict = dict(((k, self.handler.get_index()) for k in self._dict))
+
+ self.handler.register(self)
+
+ def __delitem__(self, key):
+ """Same as for dict"""
+ del self._dict[key]
+ n = self.handler.send(((dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))))
+ self._index_dict[key] = n
+
+ def __setitem__(self, key, value):
+ """Same as for dict"""
+ self._dict[key] = value
+ n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))]))
+ self._index_dict[key] = n
+
+ def clear(self):
+ """Same as for dict"""
+ self._dict.clear()
+ self._index_dict.clear()
+ n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.CLEAR))]))
+ self._clear = n
+
+ def pop(self, key, x=None):
+ """Same as for dict"""
+ t = (key in self._dict)
+ if x is None:
+ r = self._dict.pop(key)
+ else:
+ r = self._dict.pop(key, x)
+
+ if t:
+ n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))]))
+ self._index_dict[key] = n
+
+ return r
+
+ def popitem(self):
+ """Same as for dict"""
+ p = self._dict.popitem()
+ key = p[0]
+ n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))]))
+ self._index_dict[key] = n
+ return p
+
+ def setdefault(self, key, x):
+ """Same as for dict"""
+ if key not in self._dict:
+ self._dict[key] = x
+ n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))]))
+ self._index_dict[key] = n
+
+ def update(*args,**kargs):
+ """Same as for dict"""
+ d = dict()
+ d.update(*args,**kargs)
+ newpairs = []
+ for p in d.items():
+ if (p[0] not in self._dict) or (self._dict[p[0]] != p[1]):
+ newpairs.append(p)
+ self._dict[p[0]] = p[1]
+ n = self.handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in newpairs]))
+
+ for p in newpairs:
+ self._index_dict[p[0]] = n
+
+ def receive_message(self, msg, n):
+ if n > self._clear:
+ a = dict()
+ r = dict()
+ for m in msg:
+ flag = int(m[0]) #don't know length of m without checking flag
+ if flag == CausalDict.ADD:
+ key = self._key_trans(m[1], False)
+ if (key not in self._index_dict) or (self._index_dict[key] < n):
+ val = self._val_trans(m[2], False)
+ if key in self._dict:
+ r[key] = self._dict[key]
+ self._dict[key] = val
+ a[key] = val
+ self._index_dict[key] = n
+ elif flag == CausalDict.DELETE:
+ key = self._key_trans(m[1], False)
+ if key not in self._index_dict:
+ self._index_dict[key] = n
+ elif (self._index_dict[key] < n):
+ self._index_dict[key] = n
+ if key in self._dict:
+ r[key] = self._dict[key]
+ del self._dict[key]
+ elif flag == CausalDict.CLEAR:
+ self._clear = n
+ for (k, ind) in self._index_dict.items():
+ if ind < self._clear:
+ del self._index_dict[k]
+ if k in self._dict:
+ r[k] = self._dict[k]
+ del self._dict[k]
+ if (len(a) > 0) or (len(r) > 0):
+ self._trigger(a,r)
+
+ def get_history(self):
+ c = self.handler.index_trans(self._clear, True)
+ d = dbus.Array([(self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in self._dict.items()])
+ i = dbus.Array([(self._key_trans(p[0], True), self.handler.index_trans(p[1], True)) for p in self._index_dict.items()])
+ return dbus.Struct((c,d,i),signature='itt')
+
+ def add_history(self, hist):
+ c = self.handler.index_trans(hist[0], False)
+ d = dict(((self._key_trans(p[0], False), self._val_trans(p[1], False)) for p in hist[1]))
+ i = [(self._key_trans(p[0], False), self.handler.index_trans(p[1], False)) for p in hist[2]]
+
+ a = dict()
+ r = dict()
+
+ if c > self._clear:
+ self._clear = c
+ for (k, n) in self._index_dict.items():
+ if n < self._clear:
+ del self._index_dict[k]
+ if k in self._dict:
+ r[k] = self._dict[k]
+ del self._dict[k]
+
+ k_changed = []
+ for (k, n) in i:
+ if (((k not in self._index_dict) and (n > self._clear)) or
+ ((k in self._index_dict) and (n > self._index_dict[k]))):
+ k_changed.append(k)
+ self._index_dict[k] = n
+
+ for k in k_changed:
+ if k in d:
+ if (k in self._dict) and (self._dict[k] != d[k]):
+ r[k] = self._dict[k]
+ a[k] = d[k]
+ elif k not in self._dict:
+ a[k] = d[k]
+ self._dict[k] = d[k]
+ else:
+ if k in self._dict:
+ r[k] = self._dict[k]
+ del self._dict[k]
+
+ if (len(a) > 0) or (len(r) > 0):
+ self._trigger(a,r)
+
+ def register_listener(self, L):
+ """Register a change-listener L. Whenever another user makes a change
+ to this dict, L will be called with L(dict_added, dict_removed). The
+ two arguments are the dict of new entries, and the dict of entries that
+ have been deleted or overwritten."""
+ self._listeners.append(L)
+ L(self._dict.copy(), dict())
+
+ def _trigger(self, added, removed):
+ for L in self._listeners:
+ L(added, removed)
+
+class UserDict(dbus.gobject_service.ExportedGObject):
+ IFACE = "org.dobject.UserDict"
+ BASEPATH = "/org/dobject/UserDict/"
+
+ def __init__(self, name, tubebox, myval, translator = empty_translator):
+ self._myname = name
+ self.PATH = UserDict.BASEPATH + name
+ dbus.gobject_service.ExportedGObject.__init__(self)
+ self._logger = logging.getLogger(self.PATH)
+ self._tube_box = tube_box
+ self.tube = None
+
+ self._dict = dict()
+ self._myval = myval
+ self._trans = translator
+
+ self._tube_box.register_listener(self.set_tube)
+
+ self.__contains__ = self._dict.__contains__
+ #No __delitem__
+ self.__eq__ = self._dict.__eq__
+ self.__ge__ = self._dict.__ge__
+ self.__getitem__ = self._dict.__getitem__
+ self.__gt__ = self._dict.__gt__
+ self.__le__ = self._dict.__le__
+ self.__len__ = self._dict.__len__
+ self.__lt__ = self._dict.__lt__
+ self.__ne__ = self._dict.__ne__
+ #No __setitem__
+
+ #No clear
+ self.copy = self._dict.copy
+ self.get = self._dict.get
+ self.has_key = self._dict.has_key
+ self.items = self._dict.items
+ self.iteritems = self._dict.iteritems
+ self.iterkeys = self._dict.iterkeys
+ self.itervalues = self._dict.itervalues
+ self.keys = self._dict.keys
+ #No pop
+ #No popitem
+ #No setdefault
+ #No update
+ self.values = self._dict.values
+
+ def set_tube(self, tube, is_initiator):
+ """Callback for the TubeBox"""
+ self.tube = tube
+ self.add_to_connection(self.tube, self.PATH)
+
+ self.tube.add_signal_receiver(self.receive_value, signal_name='send_value', dbus_interface=UserDict.IFACE, sender_keyword='sender', path=self.PATH)
+ self.tube.add_signal_receiver(self.tell_value, signal_name='ask_values', dbus_interface=UserDict.IFACE, sender_keyword='sender', path=self.PATH)
+ self.tube.watch_participants(self.members_changed)
+
+ #Alternative implementation of members_changed (not yet working)
+ #self.tube.add_signal_receiver(self.members_changed, signal_name="MembersChanged", dbus_interface="org.freedesktop.Telepathy.Channel.Interface.Group")
+
+ self.ask_values()
+
+ def get_path(self):
+ """Returns the DBus path of this handler. The path is the closest thing
+ to a unique identifier for each abstract DObject."""
+ return self.PATH
+
+ def get_tube(self):
+ """Returns the TubeBox used to create this handler. This method is
+ necessary if one DObject wishes to create another."""
+ return self._tube_box
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='v')
+ def send_value(self, value):
+ """This method broadcasts message to all other handlers for this UO"""
+ return
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='')
+ def ask_values(self):
+ return
+
+ def tell_value(self, sender=None):
+ self._logger.debug("tell_history to " + str(sender))
+ try:
+ if sender == self.tube.get_unique_name():
+ return
+ remote = self.tube.get_object(sender, self.PATH)
+ remote.receive_value(self._myval, sender_keyword='sender', reply_handler=PassFunction, error_handler=PassFunction)
+ finally:
+ return
+
+ @dbus.service.method(dbus_interface=IFACE, in_signature = 'v', out_signature='', sender_keyword = 'sender')
+ def receive_value(self, value, sender=None):
+ self._dict[sender] = self._trans(value, False)
+
+ #Alternative implementation of a members_changed (not yet working)
+ """
+ def members_changed(self, message, added, removed, local_pending, remote_pending, actor, reason):
+ added_names = self.tube.InspectHandles(telepathy.CONNECTION_HANDLE_TYPE_LIST, added)
+ for name in added_names:
+ self.tell_history(name)
+ """
+ def members_changed(self, added, removed):
+ self._logger.debug("members_changed")
+ for (handle, name) in removed:
+ if name in self._dict:
+ del self._dict[name]
+ for (handle, name) in added:
+ self.tell_value(sender=name)
+
+class UnorderedString(UnorderedObject):
+
+ def __init__(self,initstring=''):
+ self._tree = stringtree.SimpleStringTree()
+ self._listeners = []
+ self._newbuffer = []
+ if initstring:
+ self.insert(initstring, 0)
+
+ def insert(self, text, pos):
+ x = self._tree.insert(text,pos)
+ if self.handler is not None:
+ self.handler.send(dbus.Array(stringtree.translator(i,True) for i in x))
+
+ def delete(self, k, n):
+ x = self._tree.delete(k,n)
+ if self.handler is not None:
+ self.handler.send(dbus.Array(stringtree.translator(i,True) for i in x))
+
+ def _net_update(self, L):
+ transformed_list = []
+ self._newbuffer.append(L)
+ for li in self._newbuffer[::-1]:
+ if self._tree.is_ready(li[0]): #each update from the net is required to
+ #obey the rule that if the tree is ready for the first Change,
+ #then it is ready for all the changes. This may be a sort of
+ #violation of the Unordered abstraction...
+ for c in li:
+ transformed_list.extend(self._tree.add_change(c))
+ self._newbuffer.pop() #Having handled the contents of li, we
+ #should make sure it doesn't come up for consideration again
+ self._trigger(transformed_list)
+
+ def get_history(self):
+ return dbus.Array((stringtree.translator(c, True)
+ for c in self._tree.get_changes()),
+ signature = 'v')
+
+ def add_history(self, msg):
+ L = []
+ for el in msg:
+ change = stringtree.translator(el, False)
+ if change.unique_id not in self._tree._id2rec:
+ L.append(change)
+ if L:
+ self._net_update(L)
+
+ receive_message = add_history
+
+ def register_listener(self, L):
+ """Register a listener L(editlist). Every time another user modifies
+ the string, L will be called with a set of edits that represent those
+ changes on the local version of the string. Note that the edits must
+ be performed in order."""
+ self._listeners.append(L)
+
+ def _trigger(self, editlist):
+ for L in self._listeners:
+ L(editlist)
+
+class CausalTree(CausalObject):
+ #SET_PARENT and DELETE_NODE are opcodes to be sent over the wire, and also
+ #to the trigger. MAJOR_CHANGE is sent only to the trigger, and it is not
+ #an opcode. It represents a significant but undefined changed in the tree.
+ SET_PARENT = 0
+ DELETE_NODE = 1
+ CLEAR = 2
+ MAJOR_CHANGE = -1
+
+ ROOT = 0
+
+ def __init__(self):
+ self._timeline = ListSet()
+ self._reverse = {}
+ self._listeners = []
+ self._reset()
+
+ def _reset(self):
+ self._parent = {}
+ self._children = {self.ROOT:set()}
+
+ def __contains__(self, node):
+ return node in self._children
+
+ def get_parent(self,node):
+ if node == self.ROOT:
+ return self.ROOT
+ else:
+ return self._parent[node]
+
+ def get_children(self, node):
+ return frozenset(self._children[node])
+
+ def _process_local_cmd(self,cmd):
+ i = self.handler.get_index()
+ self._timeline.add((i,cmd))
+ rev = self._step(cmd)
+ self._reverse[(i,cmd)] = rev
+ self.handler.send(self._cmd_trans(cmd,True),i)
+
+ def change_parent(self,node,newparent):
+ if (node in self._parent) and (newparent in self._children):
+ if self._parent[node] != newparent:
+ cmd = (self.SET_PARENT, node, newparent)
+ self._process_local_cmd(cmd)
+ else:
+ raise KeyError("One or both nodes is not present")
+
+ def new_child(self,parent):
+ node = random.getrandbits(64)
+ cmd = (self.SET_PARENT, node, parent)
+ self._process_local_cmd(cmd)
+ return node
+
+ def delete(self,node):
+ if node == self.ROOT:
+ raise KeyError("You cannot delete the root node.")
+ if node not in self._children:
+ raise KeyError("No such node.")
+ cmd = (self.DELETE_NODE, node)
+ self._process_local_cmd(cmd)
+
+ def clear(self):
+ cmd = (self.CLEAR,)
+ self._process_local_cmd(cmd)
+
+ def _step(self, cmd):
+ # Returns () if the command failed or had no effect
+ # If the command succeeded, returns an iterable of the commands necessary
+ # to undo this command
+ if cmd[0] == self.SET_PARENT:
+ if cmd[2] in self._children: #if newparent is known
+ if cmd[1] in self._parent: #if node is known
+ if self._parent[cmd[1]] == cmd[2]:
+ return () #No change necessary. This SET_PARENT is redundant
+ if cmd[1] in self._allparents(cmd[2]): #if node is above newparent
+ #This command would create a loop. It is therefore illegal
+ #and should be ignored
+ return ()
+ else:
+ #remove node from under its current parent
+ oldp = self._parent[cmd[1]]
+ self._children[oldp].remove(cmd[1])
+ self._children[cmd[2]].add(cmd[1])
+ self._parent[cmd[1]] = cmd[2]
+ return ((self.SET_PARENT, cmd[1], oldp),)
+ else:
+ #Node is unknown, so it must be added
+ self._children[cmd[1]] = set()
+ self._children[cmd[2]].add(cmd[1])
+ self._parent[cmd[1]] = cmd[2]
+ return ((self.DELETE_NODE, cmd[1]),) #the command executed successfully
+ else:
+ #The new parent is unknown, so the command is illegal and should
+ #be ignored.
+ return ()
+ elif cmd[0] == self.DELETE_NODE:
+ if cmd[1] == self.ROOT:
+ #Deleting the root node is not allowed, so this command is illegal and should be ignored
+ return ()
+ if cmd[1] in self._children:
+ p = self._parent[cmd[1]]
+ self._children[p].remove(cmd[1])
+ cmds = [(self.SET_PARENT, cmd[1], p)]
+ for c in self._children[cmd[1]]:
+ self._children[p].add(c)
+ self._parent[c] = p
+ cmds.append((self.SET_PARENT,c,cmd[1]))
+ del self._children[cmd[1]]
+ del self._parent[cmd[1]]
+ return cmds #The command completed successfully
+ else:
+ #cmd[1] is an unknown node, so this command should be ignored
+ return ()
+ elif cmd[0] == self.CLEAR:
+ deleted = self._parent.keys() #relies on self.ROOT not being in _parent
+ cmds = []
+ stack = [self.ROOT]
+ while len(stack) > 0:
+ n = stack.pop()
+ for c in self._children[n]:
+ cmds.append((self.SET_PARENT, c, n))
+ stack.append(c)
+ self._reset()
+ return cmds
+
+ def _allparents(self, node):
+ s = set()
+ while node != self.ROOT:
+ s.add(node)
+ node = self._parent[node]
+ s.add(self.ROOT)
+ return s
+
+ def _cmd_trans(self,cmd,pack):
+ #This code does not completely specify the dbus typing because it avoids
+ #calling dbus.Struct. The tuple will be introspected.
+ if len(cmd) == 1: #CLEAR
+ return (self._instruction_trans(cmd[0],pack),)
+ if len(cmd) == 2: #DELETE_NODE
+ return (self._instruction_trans(cmd[0],pack), self.node_trans(cmd[1],pack))
+ elif len(cmd) == 3: #SET_PARENT
+ return (self._instruction_trans(cmd[0],pack),
+ self.node_trans(cmd[1],pack),
+ self.node_trans(cmd[2],pack))
+
+ def _instruction_trans(self,ins,pack):
+ return int_translator(ins,pack)
+
+ def node_trans(self,node,pack):
+ return uint_translator(node,pack)
+
+ def register_listener(self, L):
+ self._listeners.append(L)
+
+ def receive_message(self, cmd, i):
+ cmd = self._cmd_trans(cmd,False)
+ elt = (i, cmd)
+ if elt > self._timeline.last():
+ self._timeline.add(elt)
+ s = self._step(cmd)
+ self._reverse[elt] = s
+ if s:
+ self._trigger((cmd,),s)
+ else:
+ (forward, reverse) = self._antestep((elt,))
+ if forward:
+ self._trigger(forward, reverse)
+
+ def _antestep(self, elts):
+ #_antestep accepts an iterable of (i, cmd)s that may have
+ # occurred at previous times. It incorporates these changes into the
+ # timeline and state. It also returns a two-element tuple:
+ # a list of cmds that would have the same effect as the inclusion of elts, and a
+ # list of cmds that would reverse this effect.
+ newelts = [e for e in elts if e not in self._timeline]
+ if len(newelts) == 0:
+ return (False, False)
+ affected = [e for e in self._timeline.tailset(newelts[0]) if self._reverse[e]]
+ rollback = []
+ for l in affected[::-1]:
+ rollback.extend(self._reverse[l])
+ for cmd in rollback:
+ self._step(cmd)
+ # We have now rolled back to the point where newelts[0] is inserted
+ self._timeline.update(newelts)
+ new_effective = []
+ reversers = []
+ for (i,cmd) in self._timeline.tailset(newelts[0]):
+ rev = self._step(cmd)
+ self._reverse[(i,cmd)] = rev
+ if rev: #If the command had any effect
+ reversers.append(rev)
+ new_effective.append(cmd)
+ reversers.reverse()
+ reversenew = []
+ for l in reversers:
+ reversenew.extend(l)
+ forward = rollback
+ forward.extend(new_effective)
+ reverse = reversenew
+ reverse.extend(affected)
+ return (forward, reverse)
+ #This implementation is extremely suboptimal. An ideal implementation
+ #would use some knowledge about the commutativity of different commands
+ #to shorten forward and reverse substantially. As is, they will likely
+ #contain mostly redundant undo-and-then-redo.
+
+ def get_history(self):
+ return dbus.Array(
+ (self.handler.index_trans(i,True), self._cmd_trans(cmd,True))
+ for (i,cmd) in self._timeline)
+
+ def add_history(self,h):
+ elts = ((self.handler.index_trans(i,False), self._cmd_trans(cmd,False))
+ for (i,cmd) in h)
+ (forward, reverse) = self._antestep(elts)
+ if forward:
+ self._trigger(forward, reverse)
+
+ def _trigger(self, info):
+ # info is either (added, removed, affected) if that info is available,
+ # or False if there has been a change but no info is available
+ for L in self._listeners:
+ L(info)