diff options
author | Benjamin Schwartz <bens@alum.mit.edu> | 2008-02-16 16:41:18 (GMT) |
---|---|---|
committer | Benjamin Schwartz <bens@alum.mit.edu> | 2008-02-16 16:41:18 (GMT) |
commit | 79a3abe8234c496415dac8754a948b80dfd6f7da (patch) | |
tree | 8f946adb4b026c9c53f7ad251d0e748c11658ddb | |
parent | 36d533def2139ccb93e46a97c70fa2865d34b44c (diff) |
Update dobject
-rw-r--r-- | dobject.py | 459 |
1 files changed, 448 insertions, 11 deletions
@@ -24,6 +24,7 @@ import logging import threading import thread import random +from dobject_helpers import * """ DObject is a library of components useful for constructing distributed @@ -214,7 +215,7 @@ class UnorderedHandler(dbus.gobject_service.ExportedGObject): The name is used to identify the UO; all UO with the same name on the same Tube should be considered views into the same abstract distributed object.""" - self._name = name + self._myname = name self.PATH = UnorderedHandler.BASEPATH + name dbus.gobject_service.ExportedGObject.__init__(self) self._logger = logging.getLogger(self.PATH) @@ -305,12 +306,18 @@ class UnorderedHandler(dbus.gobject_service.ExportedGObject): self.tell_history(sender=name) def __repr__(self): - return 'UnorderedHandler(' + self._name + ', ' + repr(self._tube_box) + ')' + return 'UnorderedHandler(' + self._myname + ', ' + repr(self._tube_box) + ')' + + def copy(self, name): + """A convenience function for returning a new UnorderedHandler derived + from this one, with a new name. This is safe as long as copy() is called + with a different name every time.""" + return UnorderedHandler(self._myname + "/" + name, self._tube_box) def empty_translator(x, pack): return x -class HighScore(): +class HighScore: """ A HighScore is the simplest nontrivial DObject. A HighScore's state consists of a value and a score. The user may suggest a new value and score. If the new score is higher than the current score, then the value and score are updated. @@ -364,8 +371,8 @@ class HighScore(): def set_value(self, val, score): """This method suggests a value and score for this HighScore. If the - score is higher, then both value and score will be broadcast to all - other participants. + suggested score is higher than the current score, then both value and + score will be broadcast to all other participants. """ self._logger.debug("set_value " + str(val) + " " + str(score)) if self._actually_set_value(val, score, None): @@ -505,12 +512,9 @@ class AddOnlySet: is perfectly coherent, since the order in which elements are added is not important. """ - def __init__(self, handler, initset = None, translator=empty_translator): + def __init__(self, handler, initset = (), translator=empty_translator): self._logger = logging.getLogger('dobject.AddOnlySet') - if initset is None: - self._set = set() - else: - self._set = initset + self._set = set(initset) self._lock = threading.Lock() @@ -543,7 +547,6 @@ class AddOnlySet: self.__ror__ = self._set.__ror__ self.__rsub__ = self._set.__rsub__ self.__rxor__ = self._set.__rxor__ - self.__rsub__ = self._set.__rsub__ self.__sub__ = self._set.__sub__ self.__xor__ = self._set.__xor__ @@ -613,3 +616,437 @@ class AddOnlySet: def __repr__(self): return 'AddOnlySet(' + repr(self._handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')' + +class AddOnlySortedSet: + """ AddOnlySortedSet is much like AddOnlySet, only backed by a ListSet, which + provides a set for objects that are ordered under cmp(). Items are maintained + in order. This approach is most useful in cases where each item is a message, + and the messages are subject to a time-like ordering. Messages may still + arrive out of order, but they will be stored in the same order on each + computer. + """ + def __init__(self, handler, initset = (), translator=empty_translator): + self._logger = logging.getLogger('dobject.AddOnlySortedSet') + self._set = ListSet(initset) + + self._lock = threading.Lock() + + self._trans = translator + self._listeners = [] #This must be done before registering with the handler + + self._handler = handler + self._handler.register(self) + + self.__and__ = self._set.__and__ + self.__contains__ = self._set.__contains__ + # No self.__delitem__ + self.__eq__ = self._set.__eq__ + self.__ge__ = self._set.__ge__ + # Not implementing getattribute + self.__getitem__ = self._set.__getitem__ + self.__gt__ = self._set.__gt__ + # Not implementing iand (it can remove items) + # Special wrapper for ior to trigger events + # Not implementing isub (it can remove items) + self.__iter__ = self._set.__iter__ + # Not implementing ixor (it can remove items) + self.__le__ = self._set.__le__ + self.__len__ = self._set.__len__ + self.__lt__ = self._set.__lt__ + self.__ne__ = self._set.__ne__ + self.__or__ = self._set.__or__ + self.__rand__ = self._set.__rand__ + # Special implementation of repr + self.__ror__ = self._set.__ror__ + self.__rsub__ = self._set.__rsub__ + self.__rxor__ = self._set.__rxor__ + self.__sub__ = self._set.__sub__ + self.__xor__ = self._set.__xor__ + + # Special implementation of add to trigger events + # Not implementing clear + self.copy = self._set.copy + self.difference = self._set.difference + # Not implementing difference_update (it removes items) + # Not implementing discard (it removes items) + self.first = self._set.first + self.headset = self._set.headset + self.index = self._set.index + self.intersection = self._set.intersection + # Not implementing intersection_update (it removes items) + self.issubset = self._set.issubset + self.issuperset = self._set.issuperset + self.last = self._set.last + # Not implementing pop + self.position = self._set.position + # Not implementing remove + self.subset = self._set.subset + self.symmetric_difference = self._set.symmetric_difference + # Not implementing symmetric_difference_update + self.tailset = self._set.tailset + self.union = self._set.union + # Special implementation of update to trigger events + + def update(self, y): + """Add all the elements of an iterable y to the current set. If any of + these elements were not already present, they will be broadcast to all + other users.""" + d = ListSet(y) + d -= self._set + if len(d) > 0: + self._set.update(d) + self._send(d) + + __ior__ = update + + def add(self, y): + """ Add the single element y to the current set. If y is not already + present, it will be broadcast to all other users.""" + if y not in self._set: + self._set.add(y) + self._send((y,)) + + def _send(self, els): + self._handler.send(dbus.Array([self._trans(el, True) for el in els])) + + def _net_update(self, y): + d = ListSet() + d._list = y + d -= self._set + if len(d) > 0: + self._set |= d + self._trigger(d) + + def receive_message(self, msg): + self._net_update([self._trans(el, False) for el in msg]) + + def get_history(self): + return dbus.Array([self._trans(el, True) for el in self._set._list]) + + add_history = receive_message + + def register_listener(self, L): + """Register a listener L(diffset). Every time another user adds items + to the set, L will be called with the set of new items as a SortedSet.""" + self._listeners.append(L) + L(self._set.copy()) + + def _trigger(self, s): + for L in self._listeners: + L(s) + + def __repr__(self): + return 'AddOnlySortedSet(' + repr(self._handler) + ', ' + repr(self._set) + ', ' + repr(self._trans) + ')' + + +def CausalHandler(): + """The CausalHandler is analogous to the UnorderedHandler, in that it + presents an interface with which to build a wide variety of objects with + distributed state. The CausalHandler is different from the Unordered in two + ways: + + 1. The send() method of an CausalHandler returns an index, which must be + stored by the CausalObject in connection with the information that was sent. + This index is a universal, fully-ordered, strictly causal identifier + for each message. + + 2. A CausalObject's receive_message method takes two arguments: the message + and its index. + + As a convenience, there is also + + 3. A get_index() method, which provides a new index on each call, always + higher than all previous indexes. + + CausalObjects are responsible for including index information in the + return value of get_history, and processing index information in add_history. + + It is noteworthy that CausalHandler is in fact implemented on _top_ of + UnorderedHandler. The imposition of ordering does not require lower-level + access to the network. This fact of implementation may change in the + future, but CausalObjects will not be able to tell the difference. + """ + _max64 = 2**64 + + def __init__(self, name, tube_box): + self._unordered = UnorderedObject(name, tube_box) + self._counter = 0 + + self._object = None + + def register(self, obj): + self._object = obj + self._unordered.register(self) + + def get_index(self): + """get_index returns a new index, higher than all previous indexes. + The primary reason to use get_index is if you wish two know the index + of an item _before_ calling send()""" + self._counter += 1 + return (self._counter, random.randrange(0, CausalHandler._max64)) + + def index_trans(self, index, pack): + """index_trans is a standard serialization translator for the index + format. Thanks to this translator, a CausalObject can and should treat + each index as an opaque, comparable object.""" + if pack: + return dbus.Tuple((dbus.UInt64(index[0]), dbus.UInt64(index[1])), 'tt') + else: + return (int(index[0]), int(index[1])) + + def send(self, msg, index=None): + """send() broadcasts a message to all other participants. If called + with one argument, send() broadcasts that message, along with a new + index, and returns the index. If called with two arguments, the second + may be an index, which will be used for this message. The index must + have been acquired using get_index(). In this case, the index must be + acquired immediately prior to calling send(). Otherwise, another + message may arrive in the interim, causing a violation of causality.""" + if index is None: + index = self.get_index() + self._unordered.send(dbus.Tuple((msg, self.index_trans(index, True)))) + return index + + def receive_message(self, msg): + m = msg[0] + index = self.index_trans(msg[1], False) + self._counter = max(self._counter, index[0]) + self._object.receive_message(m, index) + + def add_history(self, hist): + h = hist[0] + index = self.index_trans(hist[1], False) + self._counter = max(self._counter, index[0]) + self._object.add_history(h) + + def get_history(self): + h = self._object.get_history() + hist = dbus.Tuple((h, self.index_trans(self.get_index(), True))) + return + +class CausalDict: + """NOTE: CausalDict is UNTESTED. Other things may be buggy, but CausalDict + PROBABLY DOES NOT WORK. + + CausalDict is a distributed version of a Dict (hash table). All users keep + a copy of the entire table, so this is not a "Distributed Hash Table" + according to the terminology of the field. + + CausalDict permits all Dict operations, including removing keys and + modifying the value of existing keys. This would not be possible using an + Unordered approach, because two value assignments to the same key could + arrive in different orders for different users, leaving them in different + states at quiescence. + + To solve this problem, every assignment and removal is given a monotonically + increasing unique index, and whenever there is a conflict, the higher-index + operation wins. + + One side effect of this design is that deleted keys cannot be forgotten. If + an assignment operation is received whose index is lower than + the deletion's, then that assignment is considered obsolete and must not be + executed. + + To provide a mechanism for reducing memory usage, the clear() method has + been interpreted to remove not only all entries received so far, but also + all entries that will ever be received with index less than the current + index. + """ + ADD = 0 + DELETE = 1 + CLEAR = 2 + + def __init__(self, handler, initdict=(), key_translator=empty_translator, value_translator=empty_translator): + self._handler = handler + self._dict = dict(initdict) + self._clear = self._handler.get_index() #this must happen before index_dict initialization, so that self._clear is less than any index in index_dict + self._index_dict = dict(((k, self._handler.get_index()) for k in initdict)) + + self._listeners = [] + + self._key_trans = key_translator + self._val_trans = value_translator + + self.__contains__ = self._dict.__contains__ + #Special __delitem__ + self.__eq__ = self._dict.__eq__ + self.__ge__ = self._dict.__ge__ + self.__getitem__ = self._dict.__getitem__ + self.__gt__ = self._dict.__gt__ + self.__le__ = self._dict.__le__ + self.__len__ = self._dict.__len__ + self.__lt__ = self._dict.__lt__ + self.__ne__ = self._dict.__ne__ + # special __setitem__ + + #Special clear + self.copy = self._dict.copy + self.get = self._dict.get + self.has_key = self._dict.has_key + self.items = self._dict.items + self.iteritems = self._dict.iteritems + self.iterkeys = self._dict.iterkeys + self.itervalues = self._dict.itervalues + self.keys = self._dict.keys + #Special pop + #Special popitem + #special setdefault + #special update + self.values = self._dict.values + + self._handler.register(self) + + def __delitem__(self, key): + """Same as for dict""" + del self._dict[key] + n = self._handler.send(((dbus.Int32(CausalDict.DELETE), self._key_trans(key, True)))) + self._index_dict[key] = n + + def __setitem__(self, key, value): + """Same as for dict""" + self._dict[key] = value + n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))])) + self._index_dict[key] = n + + def clear(self): + """Same as for dict""" + self._dict.clear() + self._index_dict.clear() + n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.CLEAR))])) + self._clear = n + + def pop(self, key, x=None): + """Same as for dict""" + t = (key in self._dict) + if x is None: + r = self._dict.pop(key) + else: + r = self._dict.pop(key, x) + + if t: + n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))])) + self._index_dict[key] = n + + return r + + def popitem(self): + """Same as for dict""" + p = self._dict.popitem() + key = p[0] + n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.DELETE), self._key_trans(key, True))])) + self._index_dict[key] = n + return p + + def setdefault(self, key, x): + """Same as for dict""" + if key not in self._dict: + self._dict[key] = x + n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(key, True), self._val_trans(value, True))])) + self._index_dict[key] = n + + def update(*args,**kargs): + """Same as for dict""" + d = dict() + d.update(*args,**kargs) + newpairs = [] + for p in d.items(): + if (p[0] not in self._dict) or (self._dict[p[0]] != p[1]): + newpairs.append(p) + self._dict[p[0]] = p[1] + n = self._handler.send(dbus.Array([(dbus.Int32(CausalDict.ADD), self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in newpairs])) + + for p in newpairs: + self._index_dict[p[0]] = n + + def receive_message(self, msg, n): + if n > self._clear: + a = dict() + r = dict() + for m in msg: + flag = int(m[0]) #don't know length of m without checking flag + if flag == CausalDict.ADD: + key = self._key_trans(m[1], False) + if (key not in self._index_dict) or (self._index_dict[key] < n): + val = self._val_trans(m[2], False) + if key in self._dict: + r[key] = self._dict[key] + self._dict[key] = val + a[key] = val + self._index_dict[key] = n + elif flag == CausalDict.DELETE: + key = self._key_trans(m[1], False) + if key not in self._index_dict: + self._index_dict[key] = n + elif (self._index_dict[key] < n): + self._index_dict[key] = n + if key in self._dict: + r[key] = self._dict[key] + del self._dict[key] + elif flag == CausalDict.CLEAR: + self._clear = n + for (k, ind) in self._index_dict.items(): + if ind < self._clear: + del self._index_dict[k] + if k in self._dict: + r[k] = self._dict[k] + del self._dict[k] + if (len(a) > 0) or (len(r) > 0): + self._trigger(a,r) + + def get_history(self): + c = self._handler.index_trans(self._clear, True) + d = dbus.Array([(self._key_trans(p[0], True), self._val_trans(p[1], True)) for p in self._dict.items()]) + i = dbus.Array([(self._key_trans(p[0], True), self._handler.index_trans(p[1], True)) for p in self._index_dict.items()]) + return dbus.Tuple((c,d,i)) + + def add_history(self, hist): + c = self._handler.index_trans(hist[0], False) + d = dict(((self._key_trans(p[0], False), self._val_trans(p[1], False)) for p in hist[1])) + i = [(self._key_trans(p[0], False), self._handler.index_trans(p[1], False)) for p in hist[2]] + + a = dict() + r = dict() + + if c > self._clear: + self._clear = c + for (k, n) in self._index_dict.items(): + if n < self._clear: + del self._index_dict[k] + if k in self._dict: + r[k] = self._dict[k] + del self._dict[k] + + k_changed = [] + for (k, n) in i: + if (((k not in self._index_dict) and (n > self._clear)) or + ((k in self._index_dict) and (n > self._index_dict[k]))): + k_changed.append(k) + self._index_dict[k] = n + + for k in k_changed: + if k in d: + if (k in self._dict) and (self._dict[k] != d[k]): + r[k] = self._dict[k] + a[k] = d[k] + elif k not in self._dict: + a[k] = d[k] + self._dict[k] = d[k] + else: + if k in self._dict: + r[k] = self._dict[k] + del self._dict[k] + + if (len(a) > 0) or (len(r) > 0): + self._trigger(a,r) + + def register_listener(self, L): + """Register a change-listener L. Whenever another user makes a change + to this dict, L will be called with L(dict_added, dict_removed). The + two arguments are the dict of new entries, and the dict of entries that + have been deleted or overwritten.""" + self._listeners.append(L) + L(self._dict.copy(), dict()) + + def _trigger(self, added, removed): + for L in self._listeners: + L(added, removed) |