diff options
Diffstat (limited to 'sharedstate.git/sharedstate/sharedstate.py')
-rw-r--r-- | sharedstate.git/sharedstate/sharedstate.py | 532 |
1 files changed, 532 insertions, 0 deletions
diff --git a/sharedstate.git/sharedstate/sharedstate.py b/sharedstate.git/sharedstate/sharedstate.py new file mode 100644 index 0000000..c69ace0 --- /dev/null +++ b/sharedstate.git/sharedstate/sharedstate.py @@ -0,0 +1,532 @@ +# sharedstate.py, classes to aid activities in sharing a state +# Reinier Heeres, reinier@heeres.eu +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# Change log: +# 2007-05-22: rwh, first version + +"""General imports""" +import types +import copy + +import logging +_logger = logging.getLogger('sharinghelper') + +"""DBus imorts""" +import dbus +import dbus.service +import dbus.glib +import gobject + +"""Sugar imports""" +from sugar.presence import presenceservice +from sugar.presence import activity +from sugar.activity.activity import Activity + +"""Telepathy imports""" +import telepathy +import telepathy.client +from tubeconn import TubeConnection + +from sharedobjects import * + +IFACE = "org.laptop.SharingHelper" + +class SharingHelper(dbus.service.Object): + """Class to help activities with state sharing""" + + def __init__(self, actparent, opt={}): + self._activity = actparent + self._options = opt + self._shared_types = {} + self._shared_objects = {} + self._buddy_list = {} + self._service_name = '' + self._object_path = '' + self._tube = None + self._own_bus_name = None + + self.register_shared_type('int', SharedPython, inc=False, autotype=int) + self.register_shared_type('long', SharedPython, inc=False, autotype=long) + self.register_shared_type('float', SharedPython, inc=False, autotype=float) + self.register_shared_type('string', SharedText, inc=True, autotype=str) + self.register_shared_type('ustring', SharedText, inc=True, autotype=unicode) + self.register_shared_type('dict', SharedDict, inc=True, autotype=dict) + self.register_shared_type('python', SharedPython, inc=True) + + self._tp_support = TubePresenceSupport(self) + self._tp_support.connect_to_ps() + + return + + def __getitem__(self, key): + if type(key) != types.StringType: + raise TypeError, "SharingHelper.__getitem()__ only accepts string keys" + + if isinstance(self._shared_objects[key], SharedDict): + return self._shared_objects[key] + else: + return self._shared_objects[key].get_value() + + def __setitem__(self, key, val): + if type(key) != types.StringType: + raise TypeError, "SharingHelper.__setitem()__ only accepts string keys" + self._shared_objects[key].set_value(val) + + def get_object(self, key): + if type(key) != types.StringType: + raise TypeError, "SharingHelper.get_object() only accepts string keys" + self._shared_objects[key] + + def _tube_created_cb(self, tube, reqsync): + self._tube = tube + self._object_path = '/org/laptop/SharingHelper/%s' % (self._activity._shared_activity._id) + + if self._tube is None: + _logger.error('setup_shared_tube(): no tube connection yet!') + return False + + dbus.service.Object.__init__(self, self._tube, self._object_path) + + _logger.info('Connected to bus as %s, object path %s', self._service_name, self._object_path) + self._activity._shared_activity.connect('buddy-joined', self._buddy_joined_cb) + self._activity._shared_activity.connect('buddy-left', self._buddy_left_cb) + + self._tube.add_signal_receiver(self._receive_object, 'SendObject', \ + IFACE, path=self._object_path, sender_keyword='sender') + self._tube.add_signal_receiver(self._receive_new_object, 'SendNewObject', \ + IFACE, path=self._object_path, sender_keyword='sender') + self._tube.add_signal_receiver(self._receive_sync_request, 'RequestSync', \ + IFACE, path=self._object_path, sender_keyword='sender') + self._tube.add_signal_receiver(self._receive_message, 'SendMessage', \ + IFACE, path=self._object_path, sender_keyword='sender') + + _logger.info('Tube setup successful!') + + if reqsync: + self.synchronize() + if 'on_connect' in self._options: + self._options['on_connect']() + + return True + + def _buddy_joined_cb(self, activity, buddy): + """Callback for buddy joining""" + _logger.info('Buddy %s joined', buddy._properties["nick"]) + + key = buddy._properties["key"] + if key not in self._buddy_list: + self._buddy_list[key] = buddy + + def _buddy_left_cb(self, activity, buddy): + """Callback for buddy leaving""" + _logger.info('Buddy %s left', buddy) + + key = buddy._properties["key"] + if buddy in self._buddy_list: + del self._buddy_list[key] + + def get_buddy_list(self): + return self._buddy_list + + def tube_connected(self): + if self._tube is None: + return False + + return True + +########################################## +# Shared object collection managing functions +########################################## + + def register_shared_type(self, name, oclass, inc=1, autotype=None): + self._shared_types[name] = (oclass, inc, autotype) + + def add_shared_object(self, o): + """Add shared object to list of objects to process""" + self._shared_objects[o._name] = o + return True + + def create_shared_object(self, name, opt, iv=None): + """Create a new shared object""" + +# Auto-detect object type + if 'type' not in opt: + for key, (oclass, inc, autotype) in self._shared_types.iteritems(): + if autotype is not None and isinstance(iv, autotype): + opt['type'] = key + opt['incremental'] = inc + + _logger.debug("Shared object %s of type %s requested", name, opt['type']) + + if opt['type'] not in self._shared_types: + _logger.error("Shared object type %s unknown", opt['type']) + return None + + (oclass, inc, autotype) = self._shared_types[opt['type']] + obj = oclass(name, opt=opt, helper=self) + + if obj is None: + return None + + self.add_shared_object(obj) + +# Tell other instances about dynamically created objects + if 'dynamic' in opt and opt['dynamic'] is True: + self.SendNewObject(name, opt) + + if iv is not None: + obj.set_value(iv) + + return obj + +########################################## +# Sending and receiving objects +########################################## + + def is_remote_sender(self, sender): + if self._own_bus_name is None: + self._own_bus_name = self._tube.get_unique_name() + _logger.info('Acquired unique name: %s', self._own_bus_name) + + if sender == self._own_bus_name: + return False + else: + return True + + @dbus.service.signal(dbus_interface=IFACE, signature='subs') + def SendObject(self, name, versionid, incremental, objstr): + """Signal proxy to send an object""" + _logger.info('Sending object %s v%d on bus (inc=%d)', name, versionid, incremental) + + def _receive_object(self, name, versionid, incremental, objstr, sender=None): + """Response to SendObject() signal; updates the state of the shared object. + If an update is requested for an object that does not exist yet we + must be in a confused state, so ask for a sync. + """ + if not self.is_remote_sender(sender): + return True + + _logger.info('receive_object(): Received object: %s v%d (inc=%d) from %s, %s', \ + name, versionid, incremental, sender, objstr) + if name in self._shared_objects: + self._shared_objects[name].process_update(versionid, incremental, objstr, sender) + else: + _logger.error('receive_object(): Unknown object %s; requesting sync', name) + self.synchronize() + + @dbus.service.signal(dbus_interface=IFACE, signature='sa{sv}') + def SendNewObject(self, name, opts): + """Signal proxy to create a new object""" + _logger.info('Sending new object %s on bus', name) + + def _receive_new_object(self, name, opts, sender=None): + """Response to SendNewObject() signal; creates a new shared object. + If creation is requested for an object that exists already, leave + it untouched. + If the 'objectcreated' option is set call that function + """ + if not self.is_remote_sender(sender): + return True + + _logger.info('receive_new_object(): Received new object: %s from %s', name, sender) + if name in self._shared_objects: + _logger.error('receive_new_object(): object already exists; leaving untouched') + else: + obj = self.create_shared_object(name, opts) + if 'objectcreated' in self._options: + self._options['objectcreated'](obj) + + def synchronize(self): + self._in_sync = False + #self.RequestSync() + #gobject.timeout_add(1000, self._verify_sync) + + def _verify_sync(self): + if not self._in_sync: + self.RequestSync() + return True + return False + + @dbus.service.signal(dbus_interface=IFACE, signature='') + def RequestSync(self): + """Signal proxy to request synchronization""" + _logger.info('Sending synchronization request...') + + def _receive_sync_request(self, sender=None): + """Called when somebody sends a SyncRequest.""" + if not self.is_remote_sender(sender): + return True + + _logger.info('Received sync request from %s, sending objects', sender) + for name, obj in self._shared_objects.iteritems(): +# Sending options is only necessary for dynamically created objects; implement this later +# sendopt = copy.deepcopy(obj.Options) +# if 'changed' in sendopt: +# del sendopt['changed'] + self._tube.get_object(sender, self._object_path).ReceiveSyncObject( \ + name, {'empty': True}, obj._version_id, obj.encode(obj._value)) + + @dbus.service.method(dbus_interface=IFACE, in_signature='sa{sv}us', out_signature='', sender_keyword='sender') + def ReceiveSyncObject(self, name, opt, versionid, objstr, sender=None): + """Function to receive full synchronisation. Used when joining an + existing activity or when in a confused state. + If an object does not exists yet it is created; the version is forced + """ + _logger.debug('Receiving sync object %s from %s', name, sender) + + if not self.is_remote_sender(sender): + return True + + self._in_sync = True + + if name not in self._shared_objects: + obj = self.create_shared_object(name, opts) + + self._shared_objects[name].process_update(versionid, False, objstr, sender, force=True) + + @dbus.service.signal(dbus_interface=IFACE, signature='sd') + def LockObject(self, name, when): + """Signal proxy to request object lock""" + _logger.debug('Sending lock signal for %s', name) + + def _receive_lock_object(self, name, when, sender=None): + """Called when somebody tries to lock an object""" + if name in self._shared_objects: + self._shared_objects[name].receive_lock(sender, when) + else: + _logger.error('Received lock signal for non-existing object %s', name) + + @dbus.service.signal(dbus_interface=IFACE, signature='s') + def UnlockObject(self, name): + """Signal proxy to signal release of an object lock""" + _logger.debug('Sending unlock signal for %s', name) + + def _receive_unlock_object(self, name, sender=None): + """Called when somebody signals unlocking of an object""" + if name in self._shared_objects: + self._shared_objects[name].receive_unlock() + else: + _logger.error('Received unlock signal for non-existing object %s', name) + +########################################## +# Simple message sending between apps +########################################## + + @dbus.service.signal(dbus_interface=IFACE, signature='sv') + def SendMessage(self, msg, val): + """Signal proxy to send simple messages""" + + def send_message(self, msg, val, to=None): + _logger.debug('send_message(msg=%s, val=%r)', msg, val) + try: + if to is None: + self.SendMessage(msg, val) + except Exception, inst: + _logger.error('send_message: %s', inst) + + def _receive_message(self, msg, val, sender=None): + if not self.is_remote_sender(sender): + return True + + _logger.info('_receive_message(%s, %r)', msg, val) + + if 'receive_message' in self._options: + self._options['receive_message'](msg, val) + +########################################## +# Functions for turn-based activities +########################################## + +class TurnBased: + def __init__(self, helper): + self._helper = helper + self._helper.create_shared_object('_turntoken', { + 'locked': self.my_turn, + 'unlocked': self.turn_changed, + 'locklost': self.turn_problem, + }) + self._playing = False + self._playing_buddies = [] + self._watching_buddies = [] + + def set_number_of_players(self, minp, maxp=None): + self._started = False + self._min_players = minp + self._max_players = maxp + self.check_ready() + + def set_turn_callbacks(self, d): + """Set callbacks for turn-based functions: + ready: called when enough players to start + myturn: called when it's this instance's turn + """ + self._turn_callbacks = d + + def check_ready(self): + if len(self._helper.get_buddy_list()) >= self._min_players: + self._turn_callbacks['ready']() + self._started = True + + def start(self): + self._playing = True + self._playing_buddies = self._helper.get_buddy_list() + self._current_player = -1 + self.turn_changed(self, None) + + def who_is_next_player(self): + self._current_player = (self._current_player + 1) % len(self._playing_buddies) + return self._playing_buddies[self._current_player] + + def turn_changed(self, sender): + if sender is not None and 'processturn' in self._turn_callbacks: + self._turn_callbacks['processturn'](sender, self.get_turn_data()) + if self.who_is_next_player() == self._helper._own_dbus_name: + self._helper.get_object('_turntoken').lock() + if 'myturn' in self._turn_callbacks: + self._turn_callbacks['myturn']() + + def release_turn(self, turndata): + self._helper['_turndata'] = turndata + self._helper.get_object('_turntoken').unlock() + + return + + def turn_problem(self, sender): + return + + + +########################################## +# Functions for Tube/Presence support +########################################## + +class TubePresenceSupport: + + def __init__(self, parent): + self._parent = parent + self._activity = parent._activity + + self._request_sync = False + + return + + def connect_to_ps(self): + """Connect to the presence service""" + + self._activity.connect('shared', self._shared_cb) + + self._ps = presenceservice.get_instance() + if self._activity._shared_activity: + # We are trying to join, call this if it worked + self._activity.connect('joined', self._joined_cb) + if self._activity.get_shared(): + self.joined_cb() + + def setup_shared_activity(self): + """Setup things to talk to other SharingHelpers """ + + if self._activity._shared_activity is None: + _logger.error('setup_shared_activity(): no _shared_activity yey!') + return False + + self._service_name = 'org.laptop.SharingHelper' + +# Get basic telepathy stuff + name, path = self._ps.get_preferred_connection() + _logger.info('Preferred connection: name %s, path %s', name, path) + self._tp_conn_name = name + self._tp_conn_path = path + self._tp_conn = telepathy.client.Connection(name, path) + +# Setup tubes channel + self._tube_service_name = '%s.Tube' % (self._service_name) + bus_name, conn_path, channel_paths = self._activity._shared_activity.get_channels() + room = None + self._text_channel = None + self._tube_channel = None + for channel_path in channel_paths: + channel = telepathy.client.Channel(bus_name, channel_path) + htype, handle = channel.GetHandle() + if htype == telepathy.HANDLE_TYPE_ROOM: + _logger.info('Found room with handle %d', handle) + room = handle + ctype = channel.GetChannelType() + if ctype == telepathy.CHANNEL_TYPE_TUBES: + _logger.info('Found Tubes channel %s', channel_path) + self._tube_channel = channel + elif ctype == telepathy.CHANNEL_TYPE_TEXT: + _logger.info('Found Text channel %s', channel_path) + self._text_channel = channel + + if room is None: + _logger.error('Didn\'t find room') + + if self._tube_channel is None: + self._tube_channel = self._tp_conn.request_channel( \ + telepathy.CHANNEL_TYPE_TUBES, \ + telepathy.HANDLE_TYPE_ROOM, room, True) + + self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', self._new_tube_cb) + + def _shared_cb(self, activity): + """Callback for when our activity is shared""" + _logger.info('Activity shared') + self.setup_shared_activity() + self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube( \ + self._tube_service_name, {}) + + def _joined_cb(self, activity): + """Callback for when we join an existing activity""" + _logger.info('Joined existing activity') + self._request_sync = True + self.setup_shared_activity() + + _logger.info('Getting tubes list...') + self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].ListTubes( \ + reply_handler=self._list_tubes_reply_cb, + error_handler=self._list_tubes_error_cb) + + def _new_tube_cb(self, id, initiator, type, service, params, state): + """Callback for when a new tube is created""" + _logger.info('new_tube_cb(): id=%d, init=%d, type=%d, svc=%s, state=%d, _request_sync=%r', id, initiator, type, service, state, self._request_sync) + _logger.info('Expected: type=%d, svc=%s, state=%d', telepathy.TUBE_TYPE_DBUS, self._tube_service_name, telepathy.TUBE_STATE_LOCAL_PENDING) + if type == telepathy.TUBE_TYPE_DBUS and service == self._tube_service_name: + if state == telepathy.TUBE_STATE_LOCAL_PENDING: + self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id) + self._tube = TubeConnection(self._tp_conn, \ + self._tube_channel[telepathy.CHANNEL_TYPE_TUBES], id, \ + group_iface=self._text_channel[telepathy.CHANNEL_INTERFACE_GROUP]) + + if self._tube is None: + _logger.error('Don\'t have a tube channel, not connecting') + return False + + self._parent._tube_created_cb(self._tube, self._request_sync) + + def _list_tubes_reply_cb(self, tubes): + """Callback for when requesting an existing tube""" + _logger.debug('_list_tubes_reply_cb(): %r', tubes) + for tube_info in tubes: + _logger.debug('tube_info: %r', tube_info) + self._new_tube_cb(*tube_info) + + def _list_tubes_error_cb(self, e): + _logger.error('list_tubes() failed: %s', e) + + def _tube_participant_change_cb(self, added, removed): + _logger.info('Adding participants: %r', added) + _logger.info('Removing participants: %r', removed) |