Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/messenger.py
diff options
context:
space:
mode:
Diffstat (limited to 'messenger.py')
-rw-r--r--messenger.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/messenger.py b/messenger.py
index 1bad33f..10701c5 100644
--- a/messenger.py
+++ b/messenger.py
@@ -12,6 +12,165 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+import dbus
+import pickle
+import cjson
+import logging
+from dbus.gobject_service import ExportedGObject
+from dbus.service import method, signal
+
+from sugar.presence import presenceservice
+
+import model
+
+logger = logging.getLogger('flipsticks')
+
SERVICE = 'org.freedesktop.Telepathy.Tube.Connect'
IFACE = SERVICE
PATH = '/org/freedesktop/Telepathy/Tube/Connect'
+
+class Slot:
+ def __init__(self, seqno=-1, owner=None):
+ self.seqno = seqno
+ self.owner = owner
+
+class Messenger(ExportedGObject):
+ def __init__(self, tube, initiator, view):
+ ExportedGObject.__init__(self, tube, PATH)
+
+ self.initiator = initiator
+ self._tube = tube
+ self._entered = False
+ self._slots = []
+ self._view = view
+
+ self._view.connect('frame-changed', self._frame_changed_cb)
+ self._tube.watch_participants(self._participant_change_cb)
+
+ def _participant_change_cb(self, added, removed):
+ if not self._entered and added:
+ self.me = self._tube.get_unique_name()
+
+ for i in range(len(model.keys)):
+ self._slots.append(Slot())
+
+ if self.initiator:
+ self._tube.add_signal_receiver(self._ping_cb, '_ping', IFACE,
+ path=PATH, sender_keyword='sender')
+ for i in self._slots:
+ i.seqno = 0
+ i.owner = self.me
+ else:
+ self._pong_handle = self._tube.add_signal_receiver(
+ self._pong_cb, '_pong', IFACE, path=PATH,
+ sender_keyword='sender')
+ self._ping()
+
+ self._tube.add_signal_receiver(self._notify_cb, '_notify', IFACE,
+ path=PATH, sender_keyword='sender')
+ self._entered = True
+
+ # incomers' signal to retrieve initial snapshot
+ @signal(IFACE, signature='')
+ def _ping(self):
+ logger.debug('send ping')
+ pass
+
+ # object is ready to post snapshot to incomer
+ @signal(IFACE, signature='')
+ def _pong(self):
+ logger.debug('send pong')
+ pass
+
+ # slot was changed
+ @signal(IFACE, signature='iiss')
+ def _notify(self, slot, seqno, sender, raw):
+ pass
+
+ # the whole list of slots for incomers
+ @method(dbus_interface=IFACE, in_signature='', out_signature='a(iss)',
+ sender_keyword='sender')
+ def _snapshot(self, sender=None):
+ logger.debug('_snapshot requested from %s' % sender)
+ out = []
+
+ for i, slot in enumerate(self._slots):
+ out.append((slot.seqno, slot.owner,
+ cjson.encode(model.keys[i].collect())))
+
+ return out
+
+ def _ping_cb(self, sender=None):
+ if sender == self.me:
+ return
+ logger.debug('_ping received from %s' % sender)
+ self._pong()
+
+ def _pong_cb(self, sender=None):
+ if sender == self.me:
+ return
+ logger.debug('_pong sent from %s' % sender)
+
+ # we've got source for _snapshot and don't need _pong anymore
+ self._tube.remove_signal_receiver(self._pong_handle)
+ self._pong_handle = None
+
+ remote = self._tube.get_object(sender, PATH)
+ rawlist = remote._snapshot()
+
+ logger.debug('snapshot received len=%d' % len(rawlist))
+
+ for i, (seqno, owner, raw) in enumerate(rawlist):
+ self._receive(i, seqno, owner, raw, sender)
+
+ # we are ready to receive _snapshot requests
+ self._tube.add_signal_receiver(self._ping_cb, '_ping', IFACE,
+ path=PATH, sender_keyword='sender')
+
+ def _notify_cb(self, slot, seqno, owner, raw, sender=None):
+ if sender == self.me:
+ return
+ logger.debug('_notify requested from %s' % sender)
+ self._receive(slot, seqno, owner, raw, sender)
+
+ def _receive(self, slot, seqno, owner, raw, sender):
+ cur = self._slots[slot]
+ new = Slot(seqno, owner)
+
+ logger.debug('object received slot=%s seqno=%d owner=%s from %s'
+ % (slot, new.seqno, new.owner, sender))
+
+ if cur.seqno > new.seqno:
+ logger.debug('trying to rewrite newer value by older one')
+ return
+ elif cur.seqno == new.seqno:
+ # arrived value was sent at the same time as current one
+ if cur.owner > new.owner:
+ logger.debug('current value is higher ranked then arrived')
+ return
+ if cur.owner == self.me:
+ # we sent current and arrived value rewrites it
+ logger.debug('resend current with higher seqno')
+ self._send(slot)
+ return
+ else:
+ logger.debug('just discard low rank')
+ return
+ else:
+ logger.debug('accept higher seqno')
+
+ self._view.props.keyframe = (slot, model.StoredFrame(cjson.decode(raw)))
+ self._slots[slot] = new
+
+ def _send(self, slot_num):
+ slot = self._slots[slot_num]
+ slot.seqno += 1
+ slot.owner = self.me
+
+ self._notify(slot_num, slot.seqno, slot.owner,
+ cjson.encode(model.keys[slot_num].collect()))
+
+ logger.debug('_send slot=%s seqno=%d' % (slot_num, slot.seqno))
+
+ def _frame_changed_cb(self, sender, slot):
+ self._send(slot)