Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBenjamin Schwartz <bens@alum.mit.edu>2008-02-16 16:41:18 (GMT)
committer Benjamin Schwartz <bens@alum.mit.edu>2008-02-16 16:41:18 (GMT)
commit79a3abe8234c496415dac8754a948b80dfd6f7da (patch)
tree8f946adb4b026c9c53f7ad251d0e748c11658ddb
parent36d533def2139ccb93e46a97c70fa2865d34b44c (diff)
Update dobject
-rw-r--r--dobject.py459
1 files changed, 448 insertions, 11 deletions
diff --git a/dobject.py b/dobject.py
index b3d5ce0..cc76f9d 100644
--- a/dobject.py
+++ b/dobject.py
@@ -24,6 +24,7 @@ import logging
import threading
import thread
import random
+from dobject_helpers import *
"""
DObject is a library of components useful for constructing distributed
@@ -214,7 +215,7 @@ class UnorderedHandler(dbus.gobject_service.ExportedGObject):
The name is used to identify the UO; all UO with the same name on the
same Tube should be considered views into the same abstract distributed
object."""
- self._name = name
+ self._myname = name
self.PATH = UnorderedHandler.BASEPATH + name
dbus.gobject_service.ExportedGObject.__init__(self)
self._logger = logging.getLogger(self.PATH)
@@ -305,12 +306,18 @@ class UnorderedHandler(dbus.gobject_service.ExportedGObject):
self.tell_history(sender=name)
def __repr__(self):
- return 'UnorderedHandler(' + self._name + ', ' + repr(self._tube_box) + ')'
+ return 'UnorderedHandler(' + self._myname + ', ' + repr(self._tube_box) + ')'
+
+ def copy(self, name):
+ """A convenience function for returning a new UnorderedHandler derived
+ from this one, with a new name. This is safe as long as copy() is called
+ with a different name every time."""
+ return UnorderedHandler(self._myname + "/" + name, self._tube_box)
def empty_translator(x, pack):
return x
-class HighScore():
+class HighScore:
""" A HighScore is the simplest nontrivial DObject. A HighScore's state consists
of a value and a score. The user may suggest a new value and score. If the new
score is higher than the current score, then the value and score are updated.
@@ -364,8 +371,8 @@ class HighScore():
def set_value(self, val, score):
"""This method suggests a value and score for this HighScore. If the
- score is higher, then both value and score will be broadcast to all
- other participants.
+ suggested score is higher than the current score, then both value and
+ score will be broadcast to all other participants.
"""
self._logger.debug("set_value " + str(val) + " " + str(score))
if self._actually_set_value(val, score, None):
@@ -505,12 +512,9 @@ class AddOnlySet:
is perfectly coherent, since the order in which elements are added is not
important.
"""
- def __init__(self, handler, initset = None, translator=empty_translator):
+ def __init__(self, handler, initset = (), translator=empty_translator):
self._logger = logging.getLogger('dobject.AddOnlySet')
- if initset is None:
- self._set = set()
- else:
- self._set = initset
+ self._set = set(initset)
self._lock = threading.Lock()
@@ -543,7 +547,6 @@ class AddOnlySet:
self.__ror__ = self._set.__ror__
self.__rsub__ = self._set.__rsub__
self.__rxor__ = self._set.__rxor__
- self.__rsub__ = self._set.__rsub__
self.__sub__ = self._set.__sub__
self.__xor__ = self._set.__xor__
@@ -613,3 +616,437 @@ class AddOnlySet:
def __repr__(self):
return 'AddOnlySet(' + repr(self._handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')'
+
+class AddOnlySortedSet:
+ """ AddOnlySortedSet is much like AddOnlySet, only backed by a ListSet, which
+ provides a set for objects that are ordered under cmp(). Items are maintained
+ in order. This approach is most useful in cases where each item is a message,
+ and the messages are subject to a time-like ordering. Messages may still
+ arrive out of order, but they will be stored in the same order on each
+ computer.
+ """
+ def __init__(self, handler, initset = (), translator=empty_translator):
+ self._logger = logging.getLogger('dobject.AddOnlySortedSet')
+ self._set = ListSet(initset)
+
+ self._lock = threading.Lock()
+
+ self._trans = translator
+ self._listeners = [] #This must be done before registering with the handler
+
+ self._handler = handler
+ self._handler.register(self)
+
+ self.__and__ = self._set.__and__
+ self.__contains__ = self._set.__contains__
+ # No self.__delitem__
+ self.__eq__ = self._set.__eq__
+ self.__ge__ = self._set.__ge__
+ # Not implementing getattribute
+ self.__getitem__ = self._set.__getitem__
+ self.__gt__ = self._set.__gt__
+ # Not implementing iand (it can remove items)
+ # Special wrapper for ior to trigger events
+ # Not implementing isub (it can remove items)
+ self.__iter__ = self._set.__iter__
+ # Not implementing ixor (it can remove items)
+ self.__le__ = self._set.__le__
+ self.__len__ = self._set.__len__
+ self.__lt__ = self._set.__lt__
+ self.__ne__ = self._set.__ne__
+ self.__or__ = self._set.__or__
+ self.__rand__ = self._set.__rand__
+ # Special implementation of repr
+ self.__ror__ = self._set.__ror__
+ self.__rsub__ = self._set.__rsub__
+ self.__rxor__ = self._set.__rxor__
+ self.__sub__ = self._set.__sub__
+ self.__xor__ = self._set.__xor__
+
+ # Special implementation of add to trigger events
+ # Not implementing clear
+ self.copy = self._set.copy
+ self.difference = self._set.difference
+ # Not implementing difference_update (it removes items)
+ # Not implementing discard (it removes items)
+ self.first = self._set.first
+ self.headset = self._set.headset
+ self.index = self._set.index
+ self.intersection = self._set.intersection
+ # Not implementing intersection_update (it removes items)
+ self.issubset = self._set.issubset
+ self.issuperset = self._set.issuperset
+ self.last = self._set.last
+ # Not implementing pop
+ self.position = self._set.position
+ # Not implementing remove
+ self.subset = self._set.subset
+ self.symmetric_difference = self._set.symmetric_difference
+ # Not implementing symmetric_difference_update
+ self.tailset = self._set.tailset
+ self.union = self._set.union
+ # Special implementation of update to trigger events
+
+ def update(self, y):
+ """Add all the elements of an iterable y to the current set. If any of
+ these elements were not already present, they will be broadcast to all
+ other users."""
+ d = ListSet(y)
+ d -= self._set
+ if len(d) > 0:
+ self._set.update(d)
+ self._send(d)
+
+ __ior__ = update
+
+ def add(self, y):
+ """ Add the single element y to the current set. If y is not already
+ present, it will be broadcast to all other users."""
+ if y not in self._set:
+ self._set.add(y)
+ self._send((y,))
+
+ def _send(self, els):
+ self._handler.send(dbus.Array([self._trans(el, True) for el in els]))
+
+ def _net_update(self, y):
+ d = ListSet()
+ d._list = y
+ d -= self._set
+ if len(d) > 0:
+ self._set |= d
+ self._trigger(d)
+
+ def receive_message(self, msg):
+ self._net_update([self._trans(el, False) for el in msg])
+
+ def get_history(self):
+ return dbus.Array([self._trans(el, True) for el in self._set._list])
+
+ add_history = receive_message
+
+ def register_listener(self, L):
+ """Register a listener L(diffset). Every time another user adds items
+ to the set, L will be called with the set of new items as a SortedSet."""
+ self._listeners.append(L)
+ L(self._set.copy())
+
+ def _trigger(self, s):
+ for L in self._listeners:
+ L(s)
+
+ def __repr__(self):
+ return 'AddOnlySortedSet(' + repr(self._handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')'
+
+
+def CausalHandler():
+ """The CausalHandler is analogous to the UnorderedHandler, in that it
+ presents an interface with which to build a wide variety of objects with
+ distributed state. The CausalHandler is different from the Unordered in two
+ ways:
+
+ 1. The send() method of an CausalHandler returns an index, which must be
+ stored by the CausalObject in connection with the information that was sent.
+ This index is a universal, fully-ordered, strictly causal identifier
+ for each message.
+
+ 2. A CausalObject's receive_message method takes two arguments: the message
+ and its index.
+
+ As a convenience, there is also
+
+ 3. A get_index() method, which provides a new index on each call, always
+ higher than all previous indexes.
+
+ CausalObjects are responsible for including index information in the
+ return value of get_history, and processing index information in add_history.
+
+ It is noteworthy that CausalHandler is in fact implemented on _top_ of
+ UnorderedHandler. The imposition of ordering does not require lower-level
+ access to the network. This fact of implementation may change in the
+ future, but CausalObjects will not be able to tell the difference.
+ """
+ _max64 = 2**64
+
+ def __init__(self, name, tube_box):
+ self._unordered = UnorderedObject(name, tube_box)
+ self._counter = 0
+
+ self._object = None
+
+ def register(self, obj):
+ self._object = obj
+ self._unordered.register(self)
+
+ def get_index(self):
+ """get_index returns a new index, higher than all previous indexes.
+ The primary reason to use get_index is if you wish two know the index
+ of an item _before_ calling send()"""
+ self._counter += 1
+ return (self._counter, random.randrange(0, CausalHandler._max64))
+
+ def index_trans(self, index, pack):
+ """index_trans is a standard serialization translator for the index
+ format. Thanks to this translator, a CausalObject can and should treat
+ each index as an opaque, comparable object."""
+ if pack:
+ return dbus.Tuple((dbus.UInt64(index[0]), dbus.UInt64(index[1])), 'tt')
+ else:
+ return (int(index[0]), int(index[1]))
+
+ def send(self, msg, index=None):
+ """send() broadcasts a message to all other participants. If called
+ with one argument, send() broadcasts that message, along with a new
+ index, and returns the index. If called with two arguments, the second
+ may be an index, which will be used for this message. The index must
+ have been acquired using get_index(). In this case, the index must be
+ acquired immediately prior to calling send(). Otherwise, another
+ message may arrive in the interim, causing a violation of causality."""
+ if index is None:
+ index = self.get_index()
+ self._unordered.send(dbus.Tuple((msg, self.index_trans(index, True))))
+ return index
+
+ def receive_message(self, msg):
+ m = msg[0]
+ index = self.index_trans(msg[1], False)
+ self._counter = max(self._counter, index[0])
+ self._object.receive_message(m, index)
+
+ def add_history(self, hist):
+ h = hist[0]
+ index = self.index_trans(hist[1], False)
+ self._counter = max(self._counter, index[0])
+ self._object.add_history(h)
+
+ def get_history(self):
+ h = self._object.get_history()
+ hist = dbus.Tuple((h, self.index_trans(self.get_index(), True)))
+ return
+
+class CausalDict:
+ """NOTE: CausalDict is UNTESTED. Other things may be buggy, but CausalDict
+ PROBABLY DOES NOT WORK.
+
+ CausalDict is a distributed version of a Dict (hash table). All users keep
+ a copy of the entire table, so this is not a "Distributed Hash Table"
+ according to the terminology of the field.
+
+ CausalDict permits all Dict operations, including removing keys and
+ modifying the value of existing keys. This would not be possible using an
+ Unordered approach, because two value assignments to the same key could
+ arrive in different orders for different users, leaving them in different
+ states at quiescence.
+
+ To solve this problem, every assignment and removal is given a monotonically
+ increasing unique index, and whenever there is a conflict, the higher-index
+ operation wins.
+
+ One side effect of this design is that deleted keys cannot be forgotten. If
+ an assignment operation is received whose index is lower than
+ the deletion's, then that assignment is considered obsolete and must not be
+ executed.
+
+ To provide a mechanism for reducing memory usage, the clear() method has
+ been interpreted to remove not only all entries received so far, but also
+ all entries that will ever be received with index less than the current
+ index.
+ """
+ ADD = 0
+ DELETE = 1
+ CLEAR = 2
+
+ def __init__(self, handler, initdict=(), key_translator=empty_translator, value_translator=empty_translator):
+ self._handler = handler
+ self._dict = dict(initdict)
+ self._clear = self._handler.get_index() #this must happen before index_dict initialization, so that self._clear is less than any index in index_dict
+ self._index_dict = dict(((k, self._handler.get_index()) for k in initdict))
+
+ self._listeners = []
+
+ self._key_trans = key_translator
+ self._val_trans = value_translator
+
+ self.__contains__ = self._dict.__contains__
+ #Special __delitem__
+ self.__eq__ = self._dict.__eq__
+ self.__ge__ = self._dict.__ge__
+ self.__getitem__ = self._dict.__getitem__
+ self.__gt__ = self._dict.__gt__
+ self.__le__ = self._dict.__le__
+ self.__len__ = self._dict.__len__
+ self.__lt__ = self._dict.__lt__
+ self.__ne__ = self._dict.__ne__
+ # special __setitem__
+
+ #Special clear
+ self.copy = self._dict.copy
+ self.get = self._dict.get
+ self.has_key = self._dict.has_key
+ self.items = self._dict.items
+ self.iteritems = self._dict.iteritems
+ self.iterkeys = self._dict.iterkeys
+ self.itervalues = self._dict.itervalues
+ self.keys = self._dict.keys
+ #Special pop
+ #Special popitem
+ #special setdefault
+ #special update
+ self.values = self._dict.values
+
+ self._handler.register(self)
+
+ def __delitem__(self, key):
+ """Same as for dict"""
+ del self._dict[key]
+ n = self._handler.send(((dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))))
+ self._index_dict[key] = n
+
+ def __setitem__(self, key, value):
+ """Same as for dict"""
+ self._dict[key] = value
+ n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))]))
+ self._index_dict[key] = n
+
+ def clear(self):
+ """Same as for dict"""
+ self._dict.clear()
+ self._index_dict.clear()
+ n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.CLEAR))]))
+ self._clear = n
+
+ def pop(self, key, x=None):
+ """Same as for dict"""
+ t = (key in self._dict)
+ if x is None:
+ r = self._dict.pop(key)
+ else:
+ r = self._dict.pop(key, x)
+
+ if t:
+ n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))]))
+ self._index_dict[key] = n
+
+ return r
+
+ def popitem(self):
+ """Same as for dict"""
+ p = self._dict.popitem()
+ key = p[0]
+ n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))]))
+ self._index_dict[key] = n
+ return p
+
+ def setdefault(self, key, x):
+ """Same as for dict"""
+ if key not in self._dict:
+ self._dict[key] = x
+ n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))]))
+ self._index_dict[key] = n
+
+ def update(*args,**kargs):
+ """Same as for dict"""
+ d = dict()
+ d.update(*args,**kargs)
+ newpairs = []
+ for p in d.items():
+ if (p[0] not in self._dict) or (self._dict[p[0]] != p[1]):
+ newpairs.append(p)
+ self._dict[p[0]] = p[1]
+ n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in newpairs]))
+
+ for p in newpairs:
+ self._index_dict[p[0]] = n
+
+ def receive_message(self, msg, n):
+ if n > self._clear:
+ a = dict()
+ r = dict()
+ for m in msg:
+ flag = int(m[0]) #don't know length of m without checking flag
+ if flag == CausalDict.ADD:
+ key = self._key_trans(m[1], False)
+ if (key not in self._index_dict) or (self._index_dict[key] < n):
+ val = self._val_trans(m[2], False)
+ if key in self._dict:
+ r[key] = self._dict[key]
+ self._dict[key] = val
+ a[key] = val
+ self._index_dict[key] = n
+ elif flag == CausalDict.DELETE:
+ key = self._key_trans(m[1], False)
+ if key not in self._index_dict:
+ self._index_dict[key] = n
+ elif (self._index_dict[key] < n):
+ self._index_dict[key] = n
+ if key in self._dict:
+ r[key] = self._dict[key]
+ del self._dict[key]
+ elif flag == CausalDict.CLEAR:
+ self._clear = n
+ for (k, ind) in self._index_dict.items():
+ if ind < self._clear:
+ del self._index_dict[k]
+ if k in self._dict:
+ r[k] = self._dict[k]
+ del self._dict[k]
+ if (len(a) > 0) or (len(r) > 0):
+ self._trigger(a,r)
+
+ def get_history(self):
+ c = self._handler.index_trans(self._clear, True)
+ d = dbus.Array([(self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in self._dict.items()])
+ i = dbus.Array([(self._key_trans(p[0], True), self._handler.index_trans(p[1], True)) for p in self._index_dict.items()])
+ return dbus.Tuple((c,d,i))
+
+ def add_history(self, hist):
+ c = self._handler.index_trans(hist[0], False)
+ d = dict(((self._key_trans(p[0], False), self._val_trans(p[1], False)) for p in hist[1]))
+ i = [(self._key_trans(p[0], False), self._handler.index_trans(p[1], False)) for p in hist[2]]
+
+ a = dict()
+ r = dict()
+
+ if c > self._clear:
+ self._clear = c
+ for (k, n) in self._index_dict.items():
+ if n < self._clear:
+ del self._index_dict[k]
+ if k in self._dict:
+ r[k] = self._dict[k]
+ del self._dict[k]
+
+ k_changed = []
+ for (k, n) in i:
+ if (((k not in self._index_dict) and (n > self._clear)) or
+ ((k in self._index_dict) and (n > self._index_dict[k]))):
+ k_changed.append(k)
+ self._index_dict[k] = n
+
+ for k in k_changed:
+ if k in d:
+ if (k in self._dict) and (self._dict[k] != d[k]):
+ r[k] = self._dict[k]
+ a[k] = d[k]
+ elif k not in self._dict:
+ a[k] = d[k]
+ self._dict[k] = d[k]
+ else:
+ if k in self._dict:
+ r[k] = self._dict[k]
+ del self._dict[k]
+
+ if (len(a) > 0) or (len(r) > 0):
+ self._trigger(a,r)
+
+ def register_listener(self, L):
+ """Register a change-listener L. Whenever another user makes a change
+ to this dict, L will be called with L(dict_added, dict_removed). The
+ two arguments are the dict of new entries, and the dict of entries that
+ have been deleted or overwritten."""
+ self._listeners.append(L)
+ L(self._dict.copy(), dict())
+
+ def _trigger(self, added, removed):
+ for L in self._listeners:
+ L(added, removed)