Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Schwartz <bens@alum.mit.edu>2008-02-10 02:30:39 (GMT)
committer Benjamin Schwartz <bens@alum.mit.edu>2008-02-10 02:30:39 (GMT)
commitf078517c4128eed2093cb88e54cb1e45a0e54162 (patch)
treee004e70a18b08d6c3543f37ee71370d00d03efe0
parentf877814d55d1469069fccfe0bc59bcf492d7b381 (diff)
Initial implementation of AddOnlySet
-rw-r--r--dobject.py165
1 files changed, 164 insertions, 1 deletions
diff --git a/dobject.py b/dobject.py
index a89c6cf..dbf8a1d 100644
--- a/dobject.py
+++ b/dobject.py
@@ -6,6 +6,14 @@ import logging
import threading
import thread
+"""
+DObject is a library of components useful for constructing distributed
+applications that need to maintain coherent state while communicating over
+Telepathy. The DObject tools are design to handle unexpected joins, leaves,
+splits, and merges automatically, and always to leave each connected component
+of users in a coherent state at quiescence.
+"""
+
def PassFunction(*args):
pass
@@ -54,7 +62,8 @@ class TimeHandler(dbus.gobject_service.ExportedGObject):
broadcast and accepting the first response, and assuming that both transfer
displays were equal. The initiator's offset is 0.0, but once another group
member has synchronized, the initiator can leave and new members will still
- be synchronized correctly.
+ be synchronized correctly. Errors at each synchronization are typically
+ between 0.1s and 2s.
TimeHandler is not perfectly resilient to disappearances. If the group
splits, and one of the daughter groups does not contain any members that
@@ -186,6 +195,7 @@ class UnorderedHandler(dbus.gobject_service.ExportedGObject):
The name is used to identify the UO; all UO with the same name on the
same Tube should be considered views into the same abstract distributed
object."""
+ self._name = name
self.PATH = UnorderedHandler.BASEPATH + name
dbus.gobject_service.ExportedGObject.__init__(self)
self._logger = logging.getLogger(self.PATH)
@@ -274,11 +284,27 @@ class UnorderedHandler(dbus.gobject_service.ExportedGObject):
self._logger.debug("members_changed")
for (handle, name) in added:
self.tell_history(sender=name)
+
+ def __repr__(self):
+ return 'UnorderedHandler(' + self._name + ', ' + repr(self._tube_box) + ')'
def empty_translator(x, pack):
return x
class HighScore():
+ """ A HighScore is the simplest nontrivial DObject. A HighScore's state consists
+ of a value and a score. The user may suggest a new value and score. If the new
+ score is higher than the current score, then the value and score are updated.
+ Otherwise, they are not.
+
+ The value can be any object, and the score can be any comparable object.
+
+ To ensure that serialization works correctly, the user may specify a
+ translator function that converts values or scores to and from a format that
+ can be serialized reliably by dbus-python.
+
+ Currently, coherence is only guaranteed in the absence of ties.
+ """
def __init__(self, handler, initval, initscore, value_translator=empty_translator, score_translator=empty_translator):
self._logger = logging.getLogger('stopwatch.HighScore')
self._lock = threading.Lock()
@@ -305,6 +331,10 @@ class HighScore():
add_history = receive_message
def set_value(self, val, score):
+ """This method suggests a value and score for this HighScore. If the
+ score is higher, then both value and score will be broadcast to all
+ other participants.
+ """
self._logger.debug("set_value " + str(val) + " " + str(score))
if self._actually_set_value(val, score):
self._handler.send(self.get_history())
@@ -323,12 +353,15 @@ class HighScore():
return False
def get_value(self):
+ """ Get the current winning value."""
return self._value
def get_score(self):
+ """ Get the current winning score."""
return self._score
def get_pair(self):
+ """ Get the current value and score, returned as a tuple (value, score)"""
self._lock.acquire()
pair = (self._value, self._score)
self._lock.release()
@@ -339,6 +372,8 @@ class HighScore():
return (self._val_trans(p[0], True), self._score_trans(p[1], True))
def register_listener(self, L):
+ """Register a function L that will be called whenever another user sets
+ a new record. L must have the form L(value, score)."""
self._lock.acquire()
self._listeners.append(L)
self._lock.release()
@@ -351,18 +386,30 @@ class HighScore():
L(v,s)
def float_translator(f, pack):
+ """This translator packs and unpacks floats for dbus serialization"""
if pack:
return dbus.Double(f)
else:
return float(f)
def string_translator(s, pack):
+ """This translator packs and unpacks unicode strings for dbus serialization"""
if pack:
return dbus.String(s)
else:
return str(s)
class Latest:
+ """ Latest is a variation on HighScore, in which the score is the current
+ timestamp. Latest uses TimeHandler to provide a groupwide coherent clock.
+ Because TimeHandler's guarantees about synchronization and resilience are
+ weak, Latest is not as resilient to failures as a true DObject.
+
+ The creator must provide a UnorderedHandler and the initial value. One may
+ optionally indicate the initial time (as a float in epoch-time), a
+ TimeHandler (otherwise a new one will be created), and a translator for
+ serialization of the values.
+ """
def __init__(self, handler, initval, inittime=float('-inf'), time_handler=None, translator=empty_translator):
if time_handler is None:
self._time_handler = TimeHandler(handler.get_path(), handler.get_tube())
@@ -376,12 +423,16 @@ class Latest:
self._highscore.register_listener(self._highscore_cb)
def get_value(self):
+ """ Returns the latest value """
return self._highscore.get_value()
def set_value(self, val):
+ """ Suggest a new value """
self._highscore.set_value(val, self._time_handler.time())
def register_listener(self, L):
+ """ Register a listener L(value), to be called whenever another user
+ adds a new latest value."""
self._lock.acquire()
self._listeners.append(L)
self._lock.release()
@@ -390,3 +441,115 @@ class Latest:
def _highscore_cb(self, val, score):
for L in self._listeners:
L(val)
+
+class AddOnlySet:
+ """The AddOnlySet is the archetypal DObject. It consists of a set,
+ supporting all the normal Python set operations except those that cause an
+ item to be removed from the set. Thanks to this restriction, a AddOnlySet
+ is perfectly coherent, since the order in which elements are added is not
+ important.
+ """
+ def __init__(self, handler, initset = set(), translator=empty_translator):
+ self._logger = logging.getLogger('dobject.AddOnlySet')
+ self._set = initset
+ self._lock = threading.Lock()
+
+ self._trans = translator
+ self._listeners = [] #This must be done before registering with the handler
+
+ self._handler = handler
+ self._handler.register(self)
+
+ self.__and__ = self._set.__and__
+ self.__cmp__ = self._set.__cmp__
+ self.__contains__ = self._set.__contains__
+ self.__eq__ = self._set.__eq__
+ self.__ge__ = self._set.__ge__
+ # Not implementing getattribute
+ self.__gt__ = self._set.__gt__
+ self.__hash__ = self._set.__hash__
+ # Not implementing iand (it can remove items)
+ # Special wrapper for ior to trigger events
+ # Not implementing isub (it can remove items)
+ self.__iter__ = self._set.__iter__
+ # Not implementing ixor (it can remove items)
+ self.__le__ = self._set.__le__
+ self.__len__ = self._set.__len__
+ self.__lt__ = self._set.__lt__
+ self.__ne__ = self._set.__ne__
+ self.__or__ = self._set.__or__
+ self.__rand__ = self._set.__rand__
+ # Special implementation of repr
+ self.__ror__ = self._set.__ror__
+ self.__rsub__ = self._set.__rsub__
+ self.__rxor__ = self._set.__rxor__
+ self.__rsub__ = self._set.__rsub__
+ self.__sub__ = self._set.__sub__
+ self.__xor__ = self._set.__xor__
+
+ # Special implementation of add to trigger events
+ # Not implementing clear
+ self.copy = self._set.copy
+ self.difference = self._set.difference
+ # Not implementing difference_update (it removes items)
+ # Not implementing discard (it removes items)
+ self.intersection = self._set.intersection
+ # Not implementing intersection_update (it removes items)
+ self.issubset = self._set.issubset
+ self.issuperset = self._set.issuperset
+ # Not implementing pop
+ # Not implementing remove
+ self.symmetric_difference = self._set.symmetric_difference
+ # Not implementing symmetric_difference_update
+ self.union = self._set.union
+ # Special implementation of update to trigger events
+
+ def update(self, y):
+ """Add all the elements of an iterable y to the current set. If any of
+ these elements were not already present, they will be broadcast to all
+ other users."""
+ s = set(y)
+ d = s - self._set
+ if len(d) > 0:
+ self._set.update(d)
+ self._send(d)
+
+ __ior__ = update
+
+ def add(self, y):
+ """ Add the single element y to the current set. If y is not already
+ present, it will be broadcast to all other users."""
+ if y not in self._set:
+ self._set.add(y)
+ self._send((y))
+
+ def _send(self, els):
+ self._handler.send((self._trans(el, True) for el in els))
+
+ def _net_update(self, y):
+ s = set(y)
+ d = s - self._set
+ if len(d) > 0:
+ self._set.update(d)
+ self._trigger(d)
+
+ def receive_message(self, msg):
+ self._net_update((self._trans(el, False) for el in msg))
+
+ def get_history(self):
+ return (self._trans(el, True) for el in self._set)
+
+ add_history = receive_message
+
+ def register_listener(self, L):
+ """Register a listener L(diffset). Every time another user adds items
+ to the set, L will be called with the set of new items."""
+ self._listeners.append(L)
+ L(self._set.copy())
+
+ def _trigger(self, s):
+ for L in self._listeners:
+ L(s)
+
+ def __repr__(self):
+ return 'AddOnlySet(' + repr(self._handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')'