Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sharedstate.git/sharedstate/sharedstate.py
diff options
context:
space:
mode:
Diffstat (limited to 'sharedstate.git/sharedstate/sharedstate.py')
-rw-r--r--sharedstate.git/sharedstate/sharedstate.py532
1 files changed, 532 insertions, 0 deletions
diff --git a/sharedstate.git/sharedstate/sharedstate.py b/sharedstate.git/sharedstate/sharedstate.py
new file mode 100644
index 0000000..c69ace0
--- /dev/null
+++ b/sharedstate.git/sharedstate/sharedstate.py
@@ -0,0 +1,532 @@
+# sharedstate.py, classes to aid activities in sharing a state
+# Reinier Heeres, reinier@heeres.eu
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Change log:
+# 2007-05-22: rwh, first version
+
+"""General imports"""
+import types
+import copy
+
+import logging
+_logger = logging.getLogger('sharinghelper')
+
+"""DBus imorts"""
+import dbus
+import dbus.service
+import dbus.glib
+import gobject
+
+"""Sugar imports"""
+from sugar.presence import presenceservice
+from sugar.presence import activity
+from sugar.activity.activity import Activity
+
+"""Telepathy imports"""
+import telepathy
+import telepathy.client
+from tubeconn import TubeConnection
+
+from sharedobjects import *
+
+IFACE = "org.laptop.SharingHelper"
+
+class SharingHelper(dbus.service.Object):
+ """Class to help activities with state sharing"""
+
+ def __init__(self, actparent, opt={}):
+ self._activity = actparent
+ self._options = opt
+ self._shared_types = {}
+ self._shared_objects = {}
+ self._buddy_list = {}
+ self._service_name = ''
+ self._object_path = ''
+ self._tube = None
+ self._own_bus_name = None
+
+ self.register_shared_type('int', SharedPython, inc=False, autotype=int)
+ self.register_shared_type('long', SharedPython, inc=False, autotype=long)
+ self.register_shared_type('float', SharedPython, inc=False, autotype=float)
+ self.register_shared_type('string', SharedText, inc=True, autotype=str)
+ self.register_shared_type('ustring', SharedText, inc=True, autotype=unicode)
+ self.register_shared_type('dict', SharedDict, inc=True, autotype=dict)
+ self.register_shared_type('python', SharedPython, inc=True)
+
+ self._tp_support = TubePresenceSupport(self)
+ self._tp_support.connect_to_ps()
+
+ return
+
+ def __getitem__(self, key):
+ if type(key) != types.StringType:
+ raise TypeError, "SharingHelper.__getitem()__ only accepts string keys"
+
+ if isinstance(self._shared_objects[key], SharedDict):
+ return self._shared_objects[key]
+ else:
+ return self._shared_objects[key].get_value()
+
+ def __setitem__(self, key, val):
+ if type(key) != types.StringType:
+ raise TypeError, "SharingHelper.__setitem()__ only accepts string keys"
+ self._shared_objects[key].set_value(val)
+
+ def get_object(self, key):
+ if type(key) != types.StringType:
+ raise TypeError, "SharingHelper.get_object() only accepts string keys"
+ self._shared_objects[key]
+
+ def _tube_created_cb(self, tube, reqsync):
+ self._tube = tube
+ self._object_path = '/org/laptop/SharingHelper/%s' % (self._activity._shared_activity._id)
+
+ if self._tube is None:
+ _logger.error('setup_shared_tube(): no tube connection yet!')
+ return False
+
+ dbus.service.Object.__init__(self, self._tube, self._object_path)
+
+ _logger.info('Connected to bus as %s, object path %s', self._service_name, self._object_path)
+ self._activity._shared_activity.connect('buddy-joined', self._buddy_joined_cb)
+ self._activity._shared_activity.connect('buddy-left', self._buddy_left_cb)
+
+ self._tube.add_signal_receiver(self._receive_object, 'SendObject', \
+ IFACE, path=self._object_path, sender_keyword='sender')
+ self._tube.add_signal_receiver(self._receive_new_object, 'SendNewObject', \
+ IFACE, path=self._object_path, sender_keyword='sender')
+ self._tube.add_signal_receiver(self._receive_sync_request, 'RequestSync', \
+ IFACE, path=self._object_path, sender_keyword='sender')
+ self._tube.add_signal_receiver(self._receive_message, 'SendMessage', \
+ IFACE, path=self._object_path, sender_keyword='sender')
+
+ _logger.info('Tube setup successful!')
+
+ if reqsync:
+ self.synchronize()
+ if 'on_connect' in self._options:
+ self._options['on_connect']()
+
+ return True
+
+ def _buddy_joined_cb(self, activity, buddy):
+ """Callback for buddy joining"""
+ _logger.info('Buddy %s joined', buddy._properties["nick"])
+
+ key = buddy._properties["key"]
+ if key not in self._buddy_list:
+ self._buddy_list[key] = buddy
+
+ def _buddy_left_cb(self, activity, buddy):
+ """Callback for buddy leaving"""
+ _logger.info('Buddy %s left', buddy)
+
+ key = buddy._properties["key"]
+ if buddy in self._buddy_list:
+ del self._buddy_list[key]
+
+ def get_buddy_list(self):
+ return self._buddy_list
+
+ def tube_connected(self):
+ if self._tube is None:
+ return False
+
+ return True
+
+##########################################
+# Shared object collection managing functions
+##########################################
+
+ def register_shared_type(self, name, oclass, inc=1, autotype=None):
+ self._shared_types[name] = (oclass, inc, autotype)
+
+ def add_shared_object(self, o):
+ """Add shared object to list of objects to process"""
+ self._shared_objects[o._name] = o
+ return True
+
+ def create_shared_object(self, name, opt, iv=None):
+ """Create a new shared object"""
+
+# Auto-detect object type
+ if 'type' not in opt:
+ for key, (oclass, inc, autotype) in self._shared_types.iteritems():
+ if autotype is not None and isinstance(iv, autotype):
+ opt['type'] = key
+ opt['incremental'] = inc
+
+ _logger.debug("Shared object %s of type %s requested", name, opt['type'])
+
+ if opt['type'] not in self._shared_types:
+ _logger.error("Shared object type %s unknown", opt['type'])
+ return None
+
+ (oclass, inc, autotype) = self._shared_types[opt['type']]
+ obj = oclass(name, opt=opt, helper=self)
+
+ if obj is None:
+ return None
+
+ self.add_shared_object(obj)
+
+# Tell other instances about dynamically created objects
+ if 'dynamic' in opt and opt['dynamic'] is True:
+ self.SendNewObject(name, opt)
+
+ if iv is not None:
+ obj.set_value(iv)
+
+ return obj
+
+##########################################
+# Sending and receiving objects
+##########################################
+
+ def is_remote_sender(self, sender):
+ if self._own_bus_name is None:
+ self._own_bus_name = self._tube.get_unique_name()
+ _logger.info('Acquired unique name: %s', self._own_bus_name)
+
+ if sender == self._own_bus_name:
+ return False
+ else:
+ return True
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='subs')
+ def SendObject(self, name, versionid, incremental, objstr):
+ """Signal proxy to send an object"""
+ _logger.info('Sending object %s v%d on bus (inc=%d)', name, versionid, incremental)
+
+ def _receive_object(self, name, versionid, incremental, objstr, sender=None):
+ """Response to SendObject() signal; updates the state of the shared object.
+ If an update is requested for an object that does not exist yet we
+ must be in a confused state, so ask for a sync.
+ """
+ if not self.is_remote_sender(sender):
+ return True
+
+ _logger.info('receive_object(): Received object: %s v%d (inc=%d) from %s, %s', \
+ name, versionid, incremental, sender, objstr)
+ if name in self._shared_objects:
+ self._shared_objects[name].process_update(versionid, incremental, objstr, sender)
+ else:
+ _logger.error('receive_object(): Unknown object %s; requesting sync', name)
+ self.synchronize()
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='sa{sv}')
+ def SendNewObject(self, name, opts):
+ """Signal proxy to create a new object"""
+ _logger.info('Sending new object %s on bus', name)
+
+ def _receive_new_object(self, name, opts, sender=None):
+ """Response to SendNewObject() signal; creates a new shared object.
+ If creation is requested for an object that exists already, leave
+ it untouched.
+ If the 'objectcreated' option is set call that function
+ """
+ if not self.is_remote_sender(sender):
+ return True
+
+ _logger.info('receive_new_object(): Received new object: %s from %s', name, sender)
+ if name in self._shared_objects:
+ _logger.error('receive_new_object(): object already exists; leaving untouched')
+ else:
+ obj = self.create_shared_object(name, opts)
+ if 'objectcreated' in self._options:
+ self._options['objectcreated'](obj)
+
+ def synchronize(self):
+ self._in_sync = False
+ #self.RequestSync()
+ #gobject.timeout_add(1000, self._verify_sync)
+
+ def _verify_sync(self):
+ if not self._in_sync:
+ self.RequestSync()
+ return True
+ return False
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='')
+ def RequestSync(self):
+ """Signal proxy to request synchronization"""
+ _logger.info('Sending synchronization request...')
+
+ def _receive_sync_request(self, sender=None):
+ """Called when somebody sends a SyncRequest."""
+ if not self.is_remote_sender(sender):
+ return True
+
+ _logger.info('Received sync request from %s, sending objects', sender)
+ for name, obj in self._shared_objects.iteritems():
+# Sending options is only necessary for dynamically created objects; implement this later
+# sendopt = copy.deepcopy(obj.Options)
+# if 'changed' in sendopt:
+# del sendopt['changed']
+ self._tube.get_object(sender, self._object_path).ReceiveSyncObject( \
+ name, {'empty': True}, obj._version_id, obj.encode(obj._value))
+
+ @dbus.service.method(dbus_interface=IFACE, in_signature='sa{sv}us', out_signature='', sender_keyword='sender')
+ def ReceiveSyncObject(self, name, opt, versionid, objstr, sender=None):
+ """Function to receive full synchronisation. Used when joining an
+ existing activity or when in a confused state.
+ If an object does not exists yet it is created; the version is forced
+ """
+ _logger.debug('Receiving sync object %s from %s', name, sender)
+
+ if not self.is_remote_sender(sender):
+ return True
+
+ self._in_sync = True
+
+ if name not in self._shared_objects:
+ obj = self.create_shared_object(name, opts)
+
+ self._shared_objects[name].process_update(versionid, False, objstr, sender, force=True)
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='sd')
+ def LockObject(self, name, when):
+ """Signal proxy to request object lock"""
+ _logger.debug('Sending lock signal for %s', name)
+
+ def _receive_lock_object(self, name, when, sender=None):
+ """Called when somebody tries to lock an object"""
+ if name in self._shared_objects:
+ self._shared_objects[name].receive_lock(sender, when)
+ else:
+ _logger.error('Received lock signal for non-existing object %s', name)
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='s')
+ def UnlockObject(self, name):
+ """Signal proxy to signal release of an object lock"""
+ _logger.debug('Sending unlock signal for %s', name)
+
+ def _receive_unlock_object(self, name, sender=None):
+ """Called when somebody signals unlocking of an object"""
+ if name in self._shared_objects:
+ self._shared_objects[name].receive_unlock()
+ else:
+ _logger.error('Received unlock signal for non-existing object %s', name)
+
+##########################################
+# Simple message sending between apps
+##########################################
+
+ @dbus.service.signal(dbus_interface=IFACE, signature='sv')
+ def SendMessage(self, msg, val):
+ """Signal proxy to send simple messages"""
+
+ def send_message(self, msg, val, to=None):
+ _logger.debug('send_message(msg=%s, val=%r)', msg, val)
+ try:
+ if to is None:
+ self.SendMessage(msg, val)
+ except Exception, inst:
+ _logger.error('send_message: %s', inst)
+
+ def _receive_message(self, msg, val, sender=None):
+ if not self.is_remote_sender(sender):
+ return True
+
+ _logger.info('_receive_message(%s, %r)', msg, val)
+
+ if 'receive_message' in self._options:
+ self._options['receive_message'](msg, val)
+
+##########################################
+# Functions for turn-based activities
+##########################################
+
+class TurnBased:
+ def __init__(self, helper):
+ self._helper = helper
+ self._helper.create_shared_object('_turntoken', {
+ 'locked': self.my_turn,
+ 'unlocked': self.turn_changed,
+ 'locklost': self.turn_problem,
+ })
+ self._playing = False
+ self._playing_buddies = []
+ self._watching_buddies = []
+
+ def set_number_of_players(self, minp, maxp=None):
+ self._started = False
+ self._min_players = minp
+ self._max_players = maxp
+ self.check_ready()
+
+ def set_turn_callbacks(self, d):
+ """Set callbacks for turn-based functions:
+ ready: called when enough players to start
+ myturn: called when it's this instance's turn
+ """
+ self._turn_callbacks = d
+
+ def check_ready(self):
+ if len(self._helper.get_buddy_list()) >= self._min_players:
+ self._turn_callbacks['ready']()
+ self._started = True
+
+ def start(self):
+ self._playing = True
+ self._playing_buddies = self._helper.get_buddy_list()
+ self._current_player = -1
+ self.turn_changed(self, None)
+
+ def who_is_next_player(self):
+ self._current_player = (self._current_player + 1) % len(self._playing_buddies)
+ return self._playing_buddies[self._current_player]
+
+ def turn_changed(self, sender):
+ if sender is not None and 'processturn' in self._turn_callbacks:
+ self._turn_callbacks['processturn'](sender, self.get_turn_data())
+ if self.who_is_next_player() == self._helper._own_dbus_name:
+ self._helper.get_object('_turntoken').lock()
+ if 'myturn' in self._turn_callbacks:
+ self._turn_callbacks['myturn']()
+
+ def release_turn(self, turndata):
+ self._helper['_turndata'] = turndata
+ self._helper.get_object('_turntoken').unlock()
+
+ return
+
+ def turn_problem(self, sender):
+ return
+
+
+
+##########################################
+# Functions for Tube/Presence support
+##########################################
+
+class TubePresenceSupport:
+
+ def __init__(self, parent):
+ self._parent = parent
+ self._activity = parent._activity
+
+ self._request_sync = False
+
+ return
+
+ def connect_to_ps(self):
+ """Connect to the presence service"""
+
+ self._activity.connect('shared', self._shared_cb)
+
+ self._ps = presenceservice.get_instance()
+ if self._activity._shared_activity:
+ # We are trying to join, call this if it worked
+ self._activity.connect('joined', self._joined_cb)
+ if self._activity.get_shared():
+ self.joined_cb()
+
+ def setup_shared_activity(self):
+ """Setup things to talk to other SharingHelpers """
+
+ if self._activity._shared_activity is None:
+ _logger.error('setup_shared_activity(): no _shared_activity yey!')
+ return False
+
+ self._service_name = 'org.laptop.SharingHelper'
+
+# Get basic telepathy stuff
+ name, path = self._ps.get_preferred_connection()
+ _logger.info('Preferred connection: name %s, path %s', name, path)
+ self._tp_conn_name = name
+ self._tp_conn_path = path
+ self._tp_conn = telepathy.client.Connection(name, path)
+
+# Setup tubes channel
+ self._tube_service_name = '%s.Tube' % (self._service_name)
+ bus_name, conn_path, channel_paths = self._activity._shared_activity.get_channels()
+ room = None
+ self._text_channel = None
+ self._tube_channel = None
+ for channel_path in channel_paths:
+ channel = telepathy.client.Channel(bus_name, channel_path)
+ htype, handle = channel.GetHandle()
+ if htype == telepathy.HANDLE_TYPE_ROOM:
+ _logger.info('Found room with handle %d', handle)
+ room = handle
+ ctype = channel.GetChannelType()
+ if ctype == telepathy.CHANNEL_TYPE_TUBES:
+ _logger.info('Found Tubes channel %s', channel_path)
+ self._tube_channel = channel
+ elif ctype == telepathy.CHANNEL_TYPE_TEXT:
+ _logger.info('Found Text channel %s', channel_path)
+ self._text_channel = channel
+
+ if room is None:
+ _logger.error('Didn\'t find room')
+
+ if self._tube_channel is None:
+ self._tube_channel = self._tp_conn.request_channel( \
+ telepathy.CHANNEL_TYPE_TUBES, \
+ telepathy.HANDLE_TYPE_ROOM, room, True)
+
+ self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', self._new_tube_cb)
+
+ def _shared_cb(self, activity):
+ """Callback for when our activity is shared"""
+ _logger.info('Activity shared')
+ self.setup_shared_activity()
+ self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube( \
+ self._tube_service_name, {})
+
+ def _joined_cb(self, activity):
+ """Callback for when we join an existing activity"""
+ _logger.info('Joined existing activity')
+ self._request_sync = True
+ self.setup_shared_activity()
+
+ _logger.info('Getting tubes list...')
+ self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].ListTubes( \
+ reply_handler=self._list_tubes_reply_cb,
+ error_handler=self._list_tubes_error_cb)
+
+ def _new_tube_cb(self, id, initiator, type, service, params, state):
+ """Callback for when a new tube is created"""
+ _logger.info('new_tube_cb(): id=%d, init=%d, type=%d, svc=%s, state=%d, _request_sync=%r', id, initiator, type, service, state, self._request_sync)
+ _logger.info('Expected: type=%d, svc=%s, state=%d', telepathy.TUBE_TYPE_DBUS, self._tube_service_name, telepathy.TUBE_STATE_LOCAL_PENDING)
+ if type == telepathy.TUBE_TYPE_DBUS and service == self._tube_service_name:
+ if state == telepathy.TUBE_STATE_LOCAL_PENDING:
+ self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id)
+ self._tube = TubeConnection(self._tp_conn, \
+ self._tube_channel[telepathy.CHANNEL_TYPE_TUBES], id, \
+ group_iface=self._text_channel[telepathy.CHANNEL_INTERFACE_GROUP])
+
+ if self._tube is None:
+ _logger.error('Don\'t have a tube channel, not connecting')
+ return False
+
+ self._parent._tube_created_cb(self._tube, self._request_sync)
+
+ def _list_tubes_reply_cb(self, tubes):
+ """Callback for when requesting an existing tube"""
+ _logger.debug('_list_tubes_reply_cb(): %r', tubes)
+ for tube_info in tubes:
+ _logger.debug('tube_info: %r', tube_info)
+ self._new_tube_cb(*tube_info)
+
+ def _list_tubes_error_cb(self, e):
+ _logger.error('list_tubes() failed: %s', e)
+
+ def _tube_participant_change_cb(self, added, removed):
+ _logger.info('Adding participants: %r', added)
+ _logger.info('Removing participants: %r', removed)