diff options
author | Benjamin Schwartz <bens@alum.mit.edu> | 2007-12-06 03:48:22 (GMT) |
---|---|---|
committer | Benjamin Schwartz <bens@alum.mit.edu> | 2007-12-06 03:48:22 (GMT) |
commit | 38bec2e32f9f1b2ff21a10e0a52e063eb3e689d4 (patch) | |
tree | cf56154f99ad993e3b42eb514c804fab3cfb6b19 /activity.py | |
parent | f455d63dcbff13e20a3ac5e3f0641554120e65e4 (diff) |
Move to stream tubes, #5204 be damned
Diffstat (limited to 'activity.py')
-rw-r--r-- | activity.py | 282 |
1 files changed, 76 insertions, 206 deletions
diff --git a/activity.py b/activity.py index 9f068d1..efccd11 100644 --- a/activity.py +++ b/activity.py @@ -44,6 +44,8 @@ import threading import thread import socket import base64 +import os +import dbus #import socket_test as arange import arange @@ -53,6 +55,16 @@ SERVICE = "org.laptop.AcousticMeasure" IFACE = SERVICE PATH = "/org/laptop/AcousticMeasure" +def gobject_idle_do(func, *args): + ev = threading.Event() + retval = [] + def helper(r, f, a, e): + r.append(f(*a)) + e.set() + return False + gobject.idle_add(helper, retval, func, args, ev) + ev.wait() + return retval[0] class AcousticMeasureActivity(Activity): """AcousticMeasure Activity as specified in activity.info""" @@ -98,7 +110,10 @@ class AcousticMeasureActivity(Activity): self.button = gtk.ToggleButton(label=self._button_dict['waiting']) self.button.connect('clicked',self._button_clicked) self.button.set_sensitive(False) - + check = gtk.Image() + check.set_from_file('check.svg') + self.button.set_image(check) + self.message = gtk.Label(self._message_dict['unshared']) self.message.set_selectable(True) self.message.set_single_line_mode(True) @@ -109,7 +124,7 @@ class AcousticMeasureActivity(Activity): self.value = gtk.Label() self.value.set_selectable(True) - self._update_distance(0) + thread.start_new_thread(self._update_distance, (0,)) valuefont = pango.FontDescription() valuefont.set_family("monospace") @@ -135,7 +150,10 @@ class AcousticMeasureActivity(Activity): self.set_canvas(self.main_panel) self.show_all() - self.hellotube = None # Shared session + self.server_socket = None + self.main_socket = None + self.main_socket_addr = None + self.main_tube_id = None self.initiating = False # get the Presence Service @@ -147,6 +165,8 @@ class AcousticMeasureActivity(Activity): self.connect('shared', self._shared_cb) self.connect('joined', self._joined_cb) + + self.connect('key-press-event', self._keypress_cb) def _button_clicked(self, button): if button.get_active(): @@ -162,17 +182,14 @@ class AcousticMeasureActivity(Activity): while True: self._logger.debug("helper_thread: button_event.isSet(): " + str(self._button_event.isSet())) self._button_event.wait() -# while not self._button_event.isSet(): -# self._logger.debug("helper_thread still waiting") -# time.sleep(1) self._logger.debug("initiating measurement") - dt = arange.measure_dt_seq(self.hellotube, self.initiating, self._change_message) + dt = arange.measure_dt_seq(self.main_socket, self.initiating, self._change_message) x = dt * self._t_h_bar.get_speed() - arange.OLPC_OFFSET self._update_distance(x) def _update_distance(self, x): mes = locale.format("%.2f", x) - self.value.set_text(mes) + gobject_idle_do(self.value.set_text, mes) def read_file(self, file_path): f = open(file_path, 'r') @@ -198,7 +215,7 @@ class AcousticMeasureActivity(Activity): def _change_message(self,signal): self._logger.debug("_change_message got signal: " + signal) - self.message.set_text(self._message_dict[signal]) + gobject_idle_do(self.message.set_text, self._message_dict[signal]) def _shared_cb(self, activity): self._logger.debug('My activity was shared') @@ -206,8 +223,24 @@ class AcousticMeasureActivity(Activity): self._sharing_setup() self._logger.debug('This is my activity: making a tube...') - id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube( - SERVICE, {}) + + f = os.tempnam() + self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.server_socket.bind(f) + + id = self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferStreamTube( + SERVICE, {}, telepathy.SOCKET_ADDRESS_TYPE_UNIX, dbus.ByteArray(f), + telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, "") + + thread.start_new_thread(self.watch_for_join, ()) + + def watch_for_join(self): + self.server_socket.listen(1) + (self.main_socket, self.main_socket_addr) = self.server_socket.accept() + self.main_socket.setblocking(1) + #self.server_socket.close() #don't know if this works with Telepathy's pseudosockets + + self._make_ready() def _sharing_setup(self): if self._shared_activity is None: @@ -220,6 +253,8 @@ class AcousticMeasureActivity(Activity): self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal( 'NewTube', self._new_tube_cb) + self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal( + 'TubeStateChanged', self._tube_state_cb) self._shared_activity.connect('buddy-joined', self._buddy_joined_cb) self._shared_activity.connect('buddy-left', self._buddy_left_cb) @@ -259,22 +294,33 @@ class AcousticMeasureActivity(Activity): else: self._logger.debug("There are already two people, not joining") self._shared_activity.leave() - self._change_message('full') + thread.start_new_thread(self._change_message, ('full',)) def _new_tube_cb(self, id, initiator, type, service, params, state): self._logger.debug('New tube: ID=%d initator=%d type=%d service=%s ' 'params=%r state=%d', id, initiator, type, service, params, state) - if (type == telepathy.TUBE_TYPE_DBUS and - service == SERVICE): + if (type == telepathy.TUBE_TYPE_STREAM and + service == SERVICE and self.main_tube_id is None): if state == telepathy.TUBE_STATE_LOCAL_PENDING: - self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id) - tube_conn = TubeConnection(self.conn, - self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES], - id, group_iface=self.text_chan[telepathy.CHANNEL_INTERFACE_GROUP]) - self.hellotube = HelloTube(tube_conn, self.initiating, - self._get_buddy) - self.button.set_sensitive(True) + self.main_tube_id = id + self.main_socket_addr = str( + self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptStreamTube(id, + telepathy.SOCKET_ADDRESS_TYPE_UNIX, + telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, "", byte_arrays=True)) + + def _tube_state_cb(self, tube_id, tube_state): + if (self.main_socket is None) and \ + (tube_state == telepathy.TUBE_STATE_OPEN) and \ + (tube_id == self.main_tube_id): + + self.main_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.main_socket.setblocking(1) + self.main_socket.connect(self.main_socket_addr) + thread.start_new_thread(self._make_ready, ()) + + def _make_ready(self): + gobject_idle_do(self.button.set_sensitive, True) self._change_message('ready') def _buddy_joined_cb (self, activity, buddy): @@ -302,189 +348,13 @@ class AcousticMeasureActivity(Activity): assert handle != 0 return self.pservice.get_buddy_by_telepathy_handle( self.conn.service_name, self.conn.object_path, handle) - -class HelloTube(ExportedGObject): - """The bit that talks over the TUBES!!!""" - - def __init__(self, tube, is_initiator, get_buddy): - super(HelloTube, self).__init__(tube, PATH) - self._logger = logging.getLogger('acousticmeasure-activity.HelloTube') - self.tube = tube - self.is_initiator = is_initiator - self.entered = False # Have we set up the tube? - self.helloworld = False # Trivial "game state" - self._get_buddy = get_buddy # Converts handle to Buddy object - self.tube.watch_participants(self.participant_change_cb) - - def participant_change_cb(self, added, removed): - self._logger.debug('Adding participants: %r' % added) - self._logger.debug('Removing participants: %r' % type(removed)) - for handle, bus_name in added: - buddy = self._get_buddy(handle) - if buddy is not None: - self._logger.debug('Buddy %s was added' % buddy.props.nick) - for handle in removed: - self._logger.debug('Tube: Handle %u was removed', handle) - if not self.entered: - if self.is_initiator: - self._logger.debug("I'm initiating the tube, will " - "watch for hellos.") - self.tube.add_signal_receiver(self.hello_cb, 'Hello', IFACE, - path=PATH, sender_keyword='sender') - else: - self._logger.debug('Hello, everyone! What did I miss?') - self.Hello() - self.entered = True - - @signal(dbus_interface=IFACE, signature='') - def Hello(self): - """Say Hello to whoever else is in the tube.""" - self._logger.debug('I said Hello.') - - @method(dbus_interface=IFACE, in_signature='s', out_signature='', sender_keyword='sender') - def World(self, game_state, sender=None): - """To be called on the incoming XO after they Hello.""" - if not self.helloworld: - self._logger.debug('Somebody said World.') - self.helloworld = game_state - # now I can World others -# self.add_hello_handler() - self.tube.add_signal_receiver(self.hello_cb, 'Hello', IFACE, - path=PATH, sender_keyword='sender') - self._remote_socket = self.tube.get_object(sender, PATH) - self._remote_socket_waiter.set() - else: - self._logger.debug("I've already been welcomed, doing nothing") - - def hello_cb(self, sender=None): - """Somebody Helloed me. World them.""" - if sender == self.tube.get_unique_name(): - # sender is my bus name, so ignore my own signal - return - self._logger.debug('Newcomer %s has joined', sender) - self._logger.debug('Welcoming newcomer and sending them the game state') - game_state = str(time.time()) # Something to send for demo - self._remote_socket = self.tube.get_object(sender, PATH) - self._remote_socket_waiter.set() - self._remote_socket.World(game_state, dbus_interface=IFACE) - - def _noop(self, *args): - pass - - #### Begin socket section - _buffer = '' - _buff_waiter = threading.Event() - - _timeout = None - - family = socket.AF_UNIX - type = socket.SOCK_STREAM - proto = -1 - - _recv_allowed = True - _send_allowed = True - _remote_socket = None - _remote_socket_waiter = threading.Event() - - @method(dbus_interface = IFACE, in_signature = 'ay', out_signature = '') - def _handle_incoming(self, message): - self._logger.debug("_handle_incoming: " + message) - msg = base64.b64decode(message) - if self._recv_allowed: - self._buffer += msg - if len(self._buffer) > 0: - self._logger.debug("_handle_incoming will now set the buff_waiter") - self._buff_waiter.set() - self._logger.debug("_handle_incoming: buff_waiter.isSet() " + str(self._buff_waiter.isSet())) - - def recv(self, bufsize): - self._logger.debug("recv") - self._logger.debug("buff_waiter.isSet: " + str(self._buff_waiter.isSet())) - self._logger.debug("buffer: " + base64.b64encode(self._buffer)) - self._buff_waiter.wait(self._timeout) - if len(self._buffer) == 0: - raise 'error: buffer is empty' - retval = self._buffer[:bufsize] - self._buffer = self._buffer[bufsize:] - #self._logger.debug("afterwards, buffer: " + self._buffer) - if len(self._buffer) == 0: - self._logger.debug("recv clearing buff_waiter") - self._buff_waiter.clear() - self._logger.debug("recv: buff_waiter.isSet()" + str(self._buff_waiter.isSet())) - #self._logger.debug("received: " + retval) - return retval - - def recvfrom(self, bufsize): - return (self.recv(bufsize), self.getpeername()) - - def send(self, string, flags=0): - self.sendall(string) - - def sendall(self, string, flags=0): - if self._send_allowed: - self._logger.debug("sendall") - self._remote_socket_waiter.wait(self._timeout) - q = base64.b64encode(string) - self._logger.debug("sendall: " + q) - w = threading.Event() - self._remote_socket._handle_incoming(q, reply_handler=w.set, error_handler=self._logger.debug) - self._logger.debug("sendall: waiting for confirmation") - w.wait() - self._logger.debug("sendall; sent") - return len(string) - else: - self._logger.debug("sendall not allowed") - return 0 - - def setblocking(self, flag): - if flag == 0: - self._timeout = 0 - else: - self._timeout = None - self.button.set_sensitive(False) - - def settimeout(self, value): - self._timeout = value - - def gettimeout(self, ): - return self._timeout - - def close(self): - self.tube.close() - - def _unimplemented(self): - raise "error: unimplemented" - - def fileno(self): - return self.tube.get_unix_fd() - - def getpeername(self): - return self.tube.get_peer_unix_process_id() - - def getsockname(self): - return self.tube.get_unique_name() - - def getsockopt(self): - return 0 - - def setsockopt(self, level,optname,value): - pass - - def shutdown(self, how): - if how == socket.SHUT_RD or how == socket.SHUT_RDWR: - self._recv_allowed = False - if how == socket.SHUT_WR or how == socket.SHUT_RDWR: - self._send_allowed = False - if (not self._recv_allowed) and (not self._send_allowed): - self.close() - - - accept = _unimplemented - bind = _unimplemented - connect = _unimplemented - connect_ex = _unimplemented - listen = _unimplemented - makefile = _unimplemented - sendto = _unimplemented - + # KP_End == check gamekey = 65436 + # KP_Page_Down == X gamekey = 65435 + # KP_Home == box gamekey = 65429 + # KP_Page_Up == O gamekey = 65434 + def _keypress_cb(self, widget, event): + self._logger.debug("key press: " + gtk.gdk.keyval_name(event.keyval)+ " " + str(event.keyval)) + if event.keyval == 65436: + self.button.clicked() + return False |