Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sharedstate.git/sharedstate/sharedstate.py
blob: c69ace0726df0806ddd040c725ac2aadd7cf06ad (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
# sharedstate.py, classes to aid activities in sharing a state
# Reinier Heeres, reinier@heeres.eu
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
#
# Change log:
#   2007-05-22: rwh, first version

"""General imports"""
import types
import copy

import logging
_logger = logging.getLogger('sharinghelper')

"""DBus imorts"""
import dbus
import dbus.service
import dbus.glib
import gobject

"""Sugar imports"""
from sugar.presence import presenceservice
from sugar.presence import activity
from sugar.activity.activity import Activity

"""Telepathy imports"""
import telepathy
import telepathy.client
from tubeconn import TubeConnection

from sharedobjects import *

IFACE = "org.laptop.SharingHelper"

class SharingHelper(dbus.service.Object):
    """Class to help activities with state sharing"""

    def __init__(self, actparent, opt={}):
        self._activity = actparent
        self._options = opt
        self._shared_types = {}
        self._shared_objects = {}
        self._buddy_list = {}
        self._service_name = ''
        self._object_path = ''
        self._tube = None
        self._own_bus_name = None

        self.register_shared_type('int', SharedPython, inc=False, autotype=int)
        self.register_shared_type('long', SharedPython, inc=False, autotype=long)
        self.register_shared_type('float', SharedPython, inc=False, autotype=float)
        self.register_shared_type('string', SharedText, inc=True, autotype=str)
        self.register_shared_type('ustring', SharedText, inc=True, autotype=unicode)
        self.register_shared_type('dict', SharedDict, inc=True, autotype=dict)
        self.register_shared_type('python', SharedPython, inc=True)

        self._tp_support = TubePresenceSupport(self)
        self._tp_support.connect_to_ps()

        return

    def __getitem__(self, key):
        if type(key) != types.StringType:
            raise TypeError, "SharingHelper.__getitem()__ only accepts string keys"

        if isinstance(self._shared_objects[key], SharedDict):
            return self._shared_objects[key]
        else:
            return self._shared_objects[key].get_value()

    def __setitem__(self, key, val):
        if type(key) != types.StringType:
            raise TypeError, "SharingHelper.__setitem()__ only accepts string keys"
        self._shared_objects[key].set_value(val)

    def get_object(self, key):
        if type(key) != types.StringType:
            raise TypeError, "SharingHelper.get_object() only accepts string keys"
        self._shared_objects[key]

    def _tube_created_cb(self, tube, reqsync):
        self._tube = tube
        self._object_path = '/org/laptop/SharingHelper/%s' % (self._activity._shared_activity._id)

        if self._tube is None:
            _logger.error('setup_shared_tube(): no tube connection yet!')
            return False

        dbus.service.Object.__init__(self, self._tube, self._object_path)

        _logger.info('Connected to bus as %s, object path %s', self._service_name, self._object_path)
        self._activity._shared_activity.connect('buddy-joined', self._buddy_joined_cb)
        self._activity._shared_activity.connect('buddy-left', self._buddy_left_cb)

        self._tube.add_signal_receiver(self._receive_object, 'SendObject', \
            IFACE, path=self._object_path, sender_keyword='sender')
        self._tube.add_signal_receiver(self._receive_new_object, 'SendNewObject', \
            IFACE, path=self._object_path, sender_keyword='sender')
        self._tube.add_signal_receiver(self._receive_sync_request, 'RequestSync', \
            IFACE, path=self._object_path, sender_keyword='sender')
        self._tube.add_signal_receiver(self._receive_message, 'SendMessage', \
            IFACE, path=self._object_path, sender_keyword='sender')

        _logger.info('Tube setup successful!')

        if reqsync:
            self.synchronize()
            if 'on_connect' in self._options:
                self._options['on_connect']()

        return True

    def _buddy_joined_cb(self, activity, buddy):
        """Callback for buddy joining"""
        _logger.info('Buddy %s joined', buddy._properties["nick"])

        key = buddy._properties["key"]
        if key not in self._buddy_list:
            self._buddy_list[key] = buddy

    def _buddy_left_cb(self, activity, buddy):
        """Callback for buddy leaving"""
        _logger.info('Buddy %s left', buddy)

        key = buddy._properties["key"]
        if buddy in self._buddy_list:
            del self._buddy_list[key]

    def get_buddy_list(self):
        return self._buddy_list

    def tube_connected(self):
        if self._tube is None:
            return False

        return True

##########################################
# Shared object collection managing functions
##########################################

    def register_shared_type(self, name, oclass, inc=1, autotype=None):
        self._shared_types[name] = (oclass, inc, autotype)

    def add_shared_object(self, o):
        """Add shared object to list of objects to process"""
        self._shared_objects[o._name] = o
        return True

    def create_shared_object(self, name, opt, iv=None):
        """Create a new shared object"""

# Auto-detect object type
        if 'type' not in opt:
            for key, (oclass, inc, autotype) in self._shared_types.iteritems():
                if autotype is not None and isinstance(iv, autotype):
                    opt['type'] = key
                    opt['incremental'] = inc

        _logger.debug("Shared object %s of type %s requested", name, opt['type'])

        if opt['type'] not in self._shared_types:
            _logger.error("Shared object type %s unknown", opt['type'])
            return None

        (oclass, inc, autotype) = self._shared_types[opt['type']]
        obj = oclass(name, opt=opt, helper=self)

        if obj is None:
            return None

        self.add_shared_object(obj)

# Tell other instances about dynamically created objects
        if 'dynamic' in opt and opt['dynamic'] is True:
            self.SendNewObject(name, opt)

        if iv is not None:
            obj.set_value(iv)

        return obj

##########################################
# Sending and receiving objects
##########################################

    def is_remote_sender(self, sender):
        if self._own_bus_name is None:
            self._own_bus_name = self._tube.get_unique_name()
            _logger.info('Acquired unique name: %s', self._own_bus_name)

        if sender == self._own_bus_name:
            return False
        else:
            return True

    @dbus.service.signal(dbus_interface=IFACE, signature='subs')
    def SendObject(self, name, versionid, incremental, objstr):
        """Signal proxy to send an object"""
        _logger.info('Sending object %s v%d on bus (inc=%d)', name, versionid, incremental)

    def _receive_object(self, name, versionid, incremental, objstr, sender=None):
        """Response to SendObject() signal; updates the state of the shared object.
        If an update is requested for an object that does not exist yet we
        must be in a confused state, so ask for a sync.
        """
        if not self.is_remote_sender(sender):
            return True

        _logger.info('receive_object(): Received object: %s v%d (inc=%d) from %s, %s', \
            name, versionid, incremental, sender, objstr)
        if name in self._shared_objects:
            self._shared_objects[name].process_update(versionid, incremental, objstr, sender)
        else:
            _logger.error('receive_object(): Unknown object %s; requesting sync', name)
            self.synchronize()

    @dbus.service.signal(dbus_interface=IFACE, signature='sa{sv}')
    def SendNewObject(self, name, opts):
        """Signal proxy to create a new object"""
        _logger.info('Sending new object %s on bus', name)

    def _receive_new_object(self, name, opts, sender=None):
        """Response to SendNewObject() signal; creates a new shared object.
        If creation is requested for an object that exists already, leave
        it untouched.
        If the 'objectcreated' option is set call that function
        """
        if not self.is_remote_sender(sender):
            return True

        _logger.info('receive_new_object(): Received new object: %s from %s', name, sender)
        if name in self._shared_objects:
            _logger.error('receive_new_object(): object already exists; leaving untouched')
        else:
            obj = self.create_shared_object(name, opts)
            if 'objectcreated' in self._options:
                self._options['objectcreated'](obj)

    def synchronize(self):
        self._in_sync = False
        #self.RequestSync()
        #gobject.timeout_add(1000, self._verify_sync)

    def _verify_sync(self):
        if not self._in_sync:
            self.RequestSync()
            return True
        return False

    @dbus.service.signal(dbus_interface=IFACE, signature='')
    def RequestSync(self):
        """Signal proxy to request synchronization"""
        _logger.info('Sending synchronization request...')

    def _receive_sync_request(self, sender=None):
        """Called when somebody sends a SyncRequest."""
        if not self.is_remote_sender(sender):
            return True

        _logger.info('Received sync request from %s, sending objects', sender)
        for name, obj in self._shared_objects.iteritems():
# Sending options is only necessary for dynamically created objects; implement this later
#               sendopt = copy.deepcopy(obj.Options)
#               if 'changed' in sendopt:
#                   del sendopt['changed']
            self._tube.get_object(sender, self._object_path).ReceiveSyncObject( \
                name, {'empty': True}, obj._version_id, obj.encode(obj._value))

    @dbus.service.method(dbus_interface=IFACE, in_signature='sa{sv}us', out_signature='', sender_keyword='sender')
    def ReceiveSyncObject(self, name, opt, versionid, objstr, sender=None):
        """Function to receive full synchronisation. Used when joining an
        existing activity or when in a confused state.
        If an object does not exists yet it is created; the version is forced
        """
        _logger.debug('Receiving sync object %s from %s', name, sender)

        if not self.is_remote_sender(sender):
            return True

        self._in_sync = True

        if name not in self._shared_objects:
            obj = self.create_shared_object(name, opts)

        self._shared_objects[name].process_update(versionid, False, objstr, sender, force=True)

    @dbus.service.signal(dbus_interface=IFACE, signature='sd')
    def LockObject(self, name, when):
        """Signal proxy to request object lock"""
        _logger.debug('Sending lock signal for %s', name)

    def _receive_lock_object(self, name, when, sender=None):
        """Called when somebody tries to lock an object"""
        if name in self._shared_objects:
            self._shared_objects[name].receive_lock(sender, when)
        else:
            _logger.error('Received lock signal for non-existing object %s', name)

    @dbus.service.signal(dbus_interface=IFACE, signature='s')
    def UnlockObject(self, name):
        """Signal proxy to signal release of an object lock"""
        _logger.debug('Sending unlock signal for %s', name)

    def _receive_unlock_object(self, name, sender=None):
        """Called when somebody signals unlocking of an object"""
        if name in self._shared_objects:
            self._shared_objects[name].receive_unlock()
        else:
            _logger.error('Received unlock signal for non-existing object %s', name)

##########################################
# Simple message sending between apps
##########################################

    @dbus.service.signal(dbus_interface=IFACE, signature='sv')
    def SendMessage(self, msg, val):
        """Signal proxy to send simple messages"""

    def send_message(self, msg, val, to=None):
        _logger.debug('send_message(msg=%s, val=%r)', msg, val)
        try:
            if to is None:
                self.SendMessage(msg, val)
        except Exception, inst:
            _logger.error('send_message: %s', inst)

    def _receive_message(self, msg, val, sender=None):
        if not self.is_remote_sender(sender):
            return True

        _logger.info('_receive_message(%s, %r)', msg, val)

        if 'receive_message' in self._options:
            self._options['receive_message'](msg, val)

##########################################
# Functions for turn-based activities
##########################################

class TurnBased:
    def __init__(self, helper):
        self._helper = helper
        self._helper.create_shared_object('_turntoken', {
            'locked': self.my_turn,
            'unlocked': self.turn_changed,
            'locklost': self.turn_problem,
        })
        self._playing = False
        self._playing_buddies = []
        self._watching_buddies = []

    def set_number_of_players(self, minp, maxp=None):
        self._started = False
        self._min_players = minp
        self._max_players = maxp
        self.check_ready()

    def set_turn_callbacks(self, d):
        """Set callbacks for turn-based functions:
        ready: called when enough players to start
        myturn: called when it's this instance's turn
        """
        self._turn_callbacks = d

    def check_ready(self):
        if len(self._helper.get_buddy_list()) >= self._min_players:
            self._turn_callbacks['ready']()
            self._started = True

    def start(self):
        self._playing = True
        self._playing_buddies = self._helper.get_buddy_list()
        self._current_player = -1
        self.turn_changed(self, None)

    def who_is_next_player(self):
        self._current_player = (self._current_player + 1) % len(self._playing_buddies)
        return self._playing_buddies[self._current_player]

    def turn_changed(self, sender):
        if sender is not None and 'processturn' in self._turn_callbacks:
            self._turn_callbacks['processturn'](sender, self.get_turn_data())
        if self.who_is_next_player() == self._helper._own_dbus_name:
            self._helper.get_object('_turntoken').lock()
            if 'myturn' in self._turn_callbacks:
                self._turn_callbacks['myturn']()

    def release_turn(self, turndata):
        self._helper['_turndata'] = turndata
        self._helper.get_object('_turntoken').unlock()

        return

    def turn_problem(self, sender):
        return



##########################################
# Functions for Tube/Presence support
##########################################

class TubePresenceSupport:

    def __init__(self, parent):
        self._parent = parent
        self._activity = parent._activity

        self._request_sync = False

        return

    def connect_to_ps(self):
        """Connect to the presence service"""

        self._activity.connect('shared', self._shared_cb)

        self._ps = presenceservice.get_instance()
        if self._activity._shared_activity:
            # We are trying to join, call this if it worked
            self._activity.connect('joined', self._joined_cb)
            if self._activity.get_shared():
                self.joined_cb()

    def setup_shared_activity(self):
        """Setup things to talk to other SharingHelpers """

        if self._activity._shared_activity is None:
            _logger.error('setup_shared_activity(): no _shared_activity yey!')
            return False

        self._service_name = 'org.laptop.SharingHelper'

# Get basic telepathy stuff
        name, path = self._ps.get_preferred_connection()
        _logger.info('Preferred connection: name %s, path %s', name, path)
        self._tp_conn_name = name
        self._tp_conn_path = path
        self._tp_conn = telepathy.client.Connection(name, path)

# Setup tubes channel
        self._tube_service_name = '%s.Tube' % (self._service_name)
        bus_name, conn_path, channel_paths = self._activity._shared_activity.get_channels()
        room = None
        self._text_channel = None
        self._tube_channel = None
        for channel_path in channel_paths:
            channel = telepathy.client.Channel(bus_name, channel_path)
            htype, handle = channel.GetHandle()
            if htype == telepathy.HANDLE_TYPE_ROOM:
                _logger.info('Found room with handle %d', handle)
                room = handle
                ctype = channel.GetChannelType()
                if ctype == telepathy.CHANNEL_TYPE_TUBES:
                    _logger.info('Found Tubes channel %s', channel_path)
                    self._tube_channel = channel
                elif ctype == telepathy.CHANNEL_TYPE_TEXT:
                    _logger.info('Found Text channel %s', channel_path)
                    self._text_channel = channel

        if room is None:
            _logger.error('Didn\'t find room')

        if self._tube_channel is None:
            self._tube_channel = self._tp_conn.request_channel( \
                telepathy.CHANNEL_TYPE_TUBES, \
                telepathy.HANDLE_TYPE_ROOM, room, True)

        self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube', self._new_tube_cb)

    def _shared_cb(self, activity):
        """Callback for when our activity is shared"""
        _logger.info('Activity shared')
        self.setup_shared_activity()
        self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube( \
            self._tube_service_name, {})

    def _joined_cb(self, activity):
        """Callback for when we join an existing activity"""
        _logger.info('Joined existing activity')
        self._request_sync = True
        self.setup_shared_activity()

        _logger.info('Getting tubes list...')
        self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].ListTubes( \
            reply_handler=self._list_tubes_reply_cb,
            error_handler=self._list_tubes_error_cb)

    def _new_tube_cb(self, id, initiator, type, service, params, state):
        """Callback for when a new tube is created"""
        _logger.info('new_tube_cb(): id=%d, init=%d, type=%d, svc=%s, state=%d, _request_sync=%r', id, initiator, type, service, state, self._request_sync)
        _logger.info('Expected: type=%d, svc=%s, state=%d', telepathy.TUBE_TYPE_DBUS, self._tube_service_name, telepathy.TUBE_STATE_LOCAL_PENDING)
        if type == telepathy.TUBE_TYPE_DBUS and service == self._tube_service_name:
            if state == telepathy.TUBE_STATE_LOCAL_PENDING:
                self._tube_channel[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id)
            self._tube = TubeConnection(self._tp_conn, \
                self._tube_channel[telepathy.CHANNEL_TYPE_TUBES], id, \
                group_iface=self._text_channel[telepathy.CHANNEL_INTERFACE_GROUP])

        if self._tube is None:
            _logger.error('Don\'t have a tube channel, not connecting')
            return False

        self._parent._tube_created_cb(self._tube, self._request_sync)

    def _list_tubes_reply_cb(self, tubes):
        """Callback for when requesting an existing tube"""
        _logger.debug('_list_tubes_reply_cb(): %r', tubes)
        for tube_info in tubes:
            _logger.debug('tube_info: %r', tube_info)
            self._new_tube_cb(*tube_info)

    def _list_tubes_error_cb(self, e):
        _logger.error('list_tubes() failed: %s', e)

    def _tube_participant_change_cb(self, added, removed):
        _logger.info('Adding participants: %r', added)
        _logger.info('Removing participants: %r', removed)