Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/activity.py
diff options
context:
space:
mode:
authorBenjamin Schwartz <bens@alum.mit.edu>2007-12-06 03:48:22 (GMT)
committer Benjamin Schwartz <bens@alum.mit.edu>2007-12-06 03:48:22 (GMT)
commit38bec2e32f9f1b2ff21a10e0a52e063eb3e689d4 (patch)
treecf56154f99ad993e3b42eb514c804fab3cfb6b19 /activity.py
parentf455d63dcbff13e20a3ac5e3f0641554120e65e4 (diff)
Move to stream tubes, #5204 be damned
Diffstat (limited to 'activity.py')
-rw-r--r--activity.py282
1 files changed, 76 insertions, 206 deletions
diff --git a/activity.py b/activity.py
index 9f068d1..efccd11 100644
--- a/activity.py
+++ b/activity.py
@@ -44,6 +44,8 @@ import threading
import thread
import socket
import base64
+import os
+import dbus
#import socket_test as arange
import arange
@@ -53,6 +55,16 @@ SERVICE = "org.laptop.AcousticMeasure"
IFACE = SERVICE
PATH = "/org/laptop/AcousticMeasure"
+def gobject_idle_do(func, *args):
+ ev = threading.Event()
+ retval = []
+ def helper(r, f, a, e):
+ r.append(f(*a))
+ e.set()
+ return False
+ gobject.idle_add(helper, retval, func, args, ev)
+ ev.wait()
+ return retval[0]
class AcousticMeasureActivity(Activity):
"""AcousticMeasure Activity as specified in activity.info"""
@@ -98,7 +110,10 @@ class AcousticMeasureActivity(Activity):
self.button = gtk.ToggleButton(label=self._button_dict['waiting'])
self.button.connect('clicked',self._button_clicked)
self.button.set_sensitive(False)
-
+ check = gtk.Image()
+ check.set_from_file('check.svg')
+ self.button.set_image(check)
+
self.message = gtk.Label(self._message_dict['unshared'])
self.message.set_selectable(True)
self.message.set_single_line_mode(True)
@@ -109,7 +124,7 @@ class AcousticMeasureActivity(Activity):
self.value = gtk.Label()
self.value.set_selectable(True)
- self._update_distance(0)
+ thread.start_new_thread(self._update_distance, (0,))
valuefont = pango.FontDescription()
valuefont.set_family("monospace")
@@ -135,7 +150,10 @@ class AcousticMeasureActivity(Activity):
self.set_canvas(self.main_panel)
self.show_all()
- self.hellotube = None # Shared session
+ self.server_socket = None
+ self.main_socket = None
+ self.main_socket_addr = None
+ self.main_tube_id = None
self.initiating = False
# get the Presence Service
@@ -147,6 +165,8 @@ class AcousticMeasureActivity(Activity):
self.connect('shared', self._shared_cb)
self.connect('joined', self._joined_cb)
+
+ self.connect('key-press-event', self._keypress_cb)
def _button_clicked(self, button):
if button.get_active():
@@ -162,17 +182,14 @@ class AcousticMeasureActivity(Activity):
while True:
self._logger.debug("helper_thread: button_event.isSet(): " + str(self._button_event.isSet()))
self._button_event.wait()
-# while not self._button_event.isSet():
-# self._logger.debug("helper_thread still waiting")
-# time.sleep(1)
self._logger.debug("initiating measurement")
- dt = arange.measure_dt_seq(self.hellotube, self.initiating, self._change_message)
+ dt = arange.measure_dt_seq(self.main_socket, self.initiating, self._change_message)
x = dt * self._t_h_bar.get_speed() - arange.OLPC_OFFSET
self._update_distance(x)
def _update_distance(self, x):
mes = locale.format("%.2f", x)
- self.value.set_text(mes)
+ gobject_idle_do(self.value.set_text, mes)
def read_file(self, file_path):
f = open(file_path, 'r')
@@ -198,7 +215,7 @@ class AcousticMeasureActivity(Activity):
def _change_message(self,signal):
self._logger.debug("_change_message got signal: " + signal)
- self.message.set_text(self._message_dict[signal])
+ gobject_idle_do(self.message.set_text, self._message_dict[signal])
def _shared_cb(self, activity):
self._logger.debug('My activity was shared')
@@ -206,8 +223,24 @@ class AcousticMeasureActivity(Activity):
self._sharing_setup()
self._logger.debug('This is my activity: making a tube...')
- id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube(
- SERVICE, {})
+
+ f = os.tempnam()
+ self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.server_socket.bind(f)
+
+ id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferStreamTube(
+ SERVICE, {}, telepathy.SOCKET_ADDRESS_TYPE_UNIX, dbus.ByteArray(f),
+ telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, "")
+
+ thread.start_new_thread(self.watch_for_join, ())
+
+ def watch_for_join(self):
+ self.server_socket.listen(1)
+ (self.main_socket, self.main_socket_addr) = self.server_socket.accept()
+ self.main_socket.setblocking(1)
+ #self.server_socket.close() #don't know if this works with Telepathy's pseudosockets
+
+ self._make_ready()
def _sharing_setup(self):
if self._shared_activity is None:
@@ -220,6 +253,8 @@ class AcousticMeasureActivity(Activity):
self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal(
'NewTube', self._new_tube_cb)
+ self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal(
+ 'TubeStateChanged', self._tube_state_cb)
self._shared_activity.connect('buddy-joined', self._buddy_joined_cb)
self._shared_activity.connect('buddy-left', self._buddy_left_cb)
@@ -259,22 +294,33 @@ class AcousticMeasureActivity(Activity):
else:
self._logger.debug("There are already two people, not joining")
self._shared_activity.leave()
- self._change_message('full')
+ thread.start_new_thread(self._change_message, ('full',))
def _new_tube_cb(self, id, initiator, type, service, params, state):
self._logger.debug('New tube: ID=%d initator=%d type=%d service=%s '
'params=%r state=%d', id, initiator, type, service,
params, state)
- if (type == telepathy.TUBE_TYPE_DBUS and
- service == SERVICE):
+ if (type == telepathy.TUBE_TYPE_STREAM and
+ service == SERVICE and self.main_tube_id is None):
if state == telepathy.TUBE_STATE_LOCAL_PENDING:
- self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id)
- tube_conn = TubeConnection(self.conn,
- self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES],
- id, group_iface=self.text_chan[telepathy.CHANNEL_INTERFACE_GROUP])
- self.hellotube = HelloTube(tube_conn, self.initiating,
- self._get_buddy)
- self.button.set_sensitive(True)
+ self.main_tube_id = id
+ self.main_socket_addr = str(
+ self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptStreamTube(id,
+ telepathy.SOCKET_ADDRESS_TYPE_UNIX,
+ telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, "", byte_arrays=True))
+
+ def _tube_state_cb(self, tube_id, tube_state):
+ if (self.main_socket is None) and \
+ (tube_state == telepathy.TUBE_STATE_OPEN) and \
+ (tube_id == self.main_tube_id):
+
+ self.main_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ self.main_socket.setblocking(1)
+ self.main_socket.connect(self.main_socket_addr)
+ thread.start_new_thread(self._make_ready, ())
+
+ def _make_ready(self):
+ gobject_idle_do(self.button.set_sensitive, True)
self._change_message('ready')
def _buddy_joined_cb (self, activity, buddy):
@@ -302,189 +348,13 @@ class AcousticMeasureActivity(Activity):
assert handle != 0
return self.pservice.get_buddy_by_telepathy_handle(
self.conn.service_name, self.conn.object_path, handle)
-
-class HelloTube(ExportedGObject):
- """The bit that talks over the TUBES!!!"""
-
- def __init__(self, tube, is_initiator, get_buddy):
- super(HelloTube, self).__init__(tube, PATH)
- self._logger = logging.getLogger('acousticmeasure-activity.HelloTube')
- self.tube = tube
- self.is_initiator = is_initiator
- self.entered = False # Have we set up the tube?
- self.helloworld = False # Trivial "game state"
- self._get_buddy = get_buddy # Converts handle to Buddy object
- self.tube.watch_participants(self.participant_change_cb)
-
- def participant_change_cb(self, added, removed):
- self._logger.debug('Adding participants: %r' % added)
- self._logger.debug('Removing participants: %r' % type(removed))
- for handle, bus_name in added:
- buddy = self._get_buddy(handle)
- if buddy is not None:
- self._logger.debug('Buddy %s was added' % buddy.props.nick)
- for handle in removed:
- self._logger.debug('Tube: Handle %u was removed', handle)
- if not self.entered:
- if self.is_initiator:
- self._logger.debug("I'm initiating the tube, will "
- "watch for hellos.")
- self.tube.add_signal_receiver(self.hello_cb, 'Hello', IFACE,
- path=PATH, sender_keyword='sender')
- else:
- self._logger.debug('Hello, everyone! What did I miss?')
- self.Hello()
- self.entered = True
-
- @signal(dbus_interface=IFACE, signature='')
- def Hello(self):
- """Say Hello to whoever else is in the tube."""
- self._logger.debug('I said Hello.')
-
- @method(dbus_interface=IFACE, in_signature='s', out_signature='', sender_keyword='sender')
- def World(self, game_state, sender=None):
- """To be called on the incoming XO after they Hello."""
- if not self.helloworld:
- self._logger.debug('Somebody said World.')
- self.helloworld = game_state
- # now I can World others
-# self.add_hello_handler()
- self.tube.add_signal_receiver(self.hello_cb, 'Hello', IFACE,
- path=PATH, sender_keyword='sender')
- self._remote_socket = self.tube.get_object(sender, PATH)
- self._remote_socket_waiter.set()
- else:
- self._logger.debug("I've already been welcomed, doing nothing")
-
- def hello_cb(self, sender=None):
- """Somebody Helloed me. World them."""
- if sender == self.tube.get_unique_name():
- # sender is my bus name, so ignore my own signal
- return
- self._logger.debug('Newcomer %s has joined', sender)
- self._logger.debug('Welcoming newcomer and sending them the game state')
- game_state = str(time.time()) # Something to send for demo
- self._remote_socket = self.tube.get_object(sender, PATH)
- self._remote_socket_waiter.set()
- self._remote_socket.World(game_state, dbus_interface=IFACE)
-
- def _noop(self, *args):
- pass
-
- #### Begin socket section
- _buffer = ''
- _buff_waiter = threading.Event()
-
- _timeout = None
-
- family = socket.AF_UNIX
- type = socket.SOCK_STREAM
- proto = -1
-
- _recv_allowed = True
- _send_allowed = True
- _remote_socket = None
- _remote_socket_waiter = threading.Event()
-
- @method(dbus_interface = IFACE, in_signature = 'ay', out_signature = '')
- def _handle_incoming(self, message):
- self._logger.debug("_handle_incoming: " + message)
- msg = base64.b64decode(message)
- if self._recv_allowed:
- self._buffer += msg
- if len(self._buffer) > 0:
- self._logger.debug("_handle_incoming will now set the buff_waiter")
- self._buff_waiter.set()
- self._logger.debug("_handle_incoming: buff_waiter.isSet() " + str(self._buff_waiter.isSet()))
-
- def recv(self, bufsize):
- self._logger.debug("recv")
- self._logger.debug("buff_waiter.isSet: " + str(self._buff_waiter.isSet()))
- self._logger.debug("buffer: " + base64.b64encode(self._buffer))
- self._buff_waiter.wait(self._timeout)
- if len(self._buffer) == 0:
- raise 'error: buffer is empty'
- retval = self._buffer[:bufsize]
- self._buffer = self._buffer[bufsize:]
- #self._logger.debug("afterwards, buffer: " + self._buffer)
- if len(self._buffer) == 0:
- self._logger.debug("recv clearing buff_waiter")
- self._buff_waiter.clear()
- self._logger.debug("recv: buff_waiter.isSet()" + str(self._buff_waiter.isSet()))
- #self._logger.debug("received: " + retval)
- return retval
-
- def recvfrom(self, bufsize):
- return (self.recv(bufsize), self.getpeername())
-
- def send(self, string, flags=0):
- self.sendall(string)
-
- def sendall(self, string, flags=0):
- if self._send_allowed:
- self._logger.debug("sendall")
- self._remote_socket_waiter.wait(self._timeout)
- q = base64.b64encode(string)
- self._logger.debug("sendall: " + q)
- w = threading.Event()
- self._remote_socket._handle_incoming(q, reply_handler=w.set, error_handler=self._logger.debug)
- self._logger.debug("sendall: waiting for confirmation")
- w.wait()
- self._logger.debug("sendall; sent")
- return len(string)
- else:
- self._logger.debug("sendall not allowed")
- return 0
-
- def setblocking(self, flag):
- if flag == 0:
- self._timeout = 0
- else:
- self._timeout = None
- self.button.set_sensitive(False)
-
- def settimeout(self, value):
- self._timeout = value
-
- def gettimeout(self, ):
- return self._timeout
-
- def close(self):
- self.tube.close()
-
- def _unimplemented(self):
- raise "error: unimplemented"
-
- def fileno(self):
- return self.tube.get_unix_fd()
-
- def getpeername(self):
- return self.tube.get_peer_unix_process_id()
-
- def getsockname(self):
- return self.tube.get_unique_name()
-
- def getsockopt(self):
- return 0
-
- def setsockopt(self, level,optname,value):
- pass
-
- def shutdown(self, how):
- if how == socket.SHUT_RD or how == socket.SHUT_RDWR:
- self._recv_allowed = False
- if how == socket.SHUT_WR or how == socket.SHUT_RDWR:
- self._send_allowed = False
- if (not self._recv_allowed) and (not self._send_allowed):
- self.close()
-
-
- accept = _unimplemented
- bind = _unimplemented
- connect = _unimplemented
- connect_ex = _unimplemented
- listen = _unimplemented
- makefile = _unimplemented
- sendto = _unimplemented
-
+ # KP_End == check gamekey = 65436
+ # KP_Page_Down == X gamekey = 65435
+ # KP_Home == box gamekey = 65429
+ # KP_Page_Up == O gamekey = 65434
+ def _keypress_cb(self, widget, event):
+ self._logger.debug("key press: " + gtk.gdk.keyval_name(event.keyval)+ " " + str(event.keyval))
+ if event.keyval == 65436:
+ self.button.clicked()
+ return False