diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/Buddy.py | 34 | ||||
-rw-r--r-- | p2p/Group.py | 102 | ||||
-rw-r--r-- | p2p/Service.py | 31 | ||||
-rw-r--r-- | p2p/StreamReader.py | 51 | ||||
-rw-r--r-- | p2p/StreamWriter.py | 43 | ||||
-rw-r--r-- | p2p/network.py | 176 | ||||
-rw-r--r-- | p2p/presence.py | 92 |
7 files changed, 0 insertions, 529 deletions
diff --git a/p2p/Buddy.py b/p2p/Buddy.py deleted file mode 100644 index 19d7c0e..0000000 --- a/p2p/Buddy.py +++ /dev/null @@ -1,34 +0,0 @@ -import pwd -import os - -from Service import * - -PRESENCE_SERVICE_TYPE = "_olpc_presence._tcp" -PRESENCE_SERVICE_PORT = 6000 - -class Buddy: - def __init__(self, service, nick_name): - self._service = service - self._nick_name = nick_name - - def get_service_name(self): - return self._service.get_name() - - def get_nick_name(self): - return self._nick_name - -class Owner(Buddy): - def __init__(self, group): - self._group = group - - nick = pwd.getpwuid(os.getuid())[0] - if not nick or not len(nick): - nick = "n00b" - - service = Service(nick, PRESENCE_SERVICE_TYPE, - '', PRESENCE_SERVICE_PORT) - - Buddy.__init__(self, service, nick) - - def register(self): - self._service.register(self._group) diff --git a/p2p/Group.py b/p2p/Group.py deleted file mode 100644 index dedbc1e..0000000 --- a/p2p/Group.py +++ /dev/null @@ -1,102 +0,0 @@ -import avahi - -import presence -from Buddy import * -from Service import * - -SERVICE_ADDED = "service_added" -SERVICE_REMOVED = "service_removed" - -BUDDY_JOIN = "buddy_join" -BUDDY_LEAVE = "buddy_leave" - -class Group: - def __init__(self): - self._service_listeners = [] - self._presence_listeners = [] - - def join(self, buddy): - pass - - def add_service_listener(self, listener): - self._service_listeners.append(listener) - - def add_presence_listener(self, listener): - self._presence_listeners.append(listener) - - def _notify_service_added(self, service): - for listener in self._service_listeners: - listener(SERVICE_ADDED, buddy) - - def _notify_service_removed(self, service): - for listener in self._service_listeners: - listener(SERVICE_REMOVED,buddy) - - def _notify_buddy_join(self, buddy): - for listener in self._presence_listeners: - listener(BUDDY_JOIN, buddy) - - def _notify_buddy_leave(self, buddy): - for listener in self._presence_listeners: - listener(BUDDY_LEAVE, buddy) - -class LocalGroup(Group): - def __init__(self): - Group.__init__(self) - - self._services = {} - self._buddies = {} - - self._pdiscovery = presence.PresenceDiscovery() - self._pdiscovery.add_service_listener(self._on_service_change) - self._pdiscovery.start() - - def get_owner(self): - return self._owner - - def add_service(self, service): - sid = (service.get_name(), service.get_type()) - self._services[sid] = service - self._notify_service_added(service) - - def remove_service(self, sid): - self._notify_service_removed(service) - del self._services[sid] - - def join(self): - self._owner = Owner(self) - self._owner.register() - - def get_service(self, name, stype): - return self._services[(name, stype)] - - def get_buddy(self, name): - return self._buddies[name] - - def _add_buddy(self, buddy): - bid = buddy.get_nick_name() - if not self._buddies.has_key(bid): - self._buddies[bid] = buddy - self._notify_buddy_join(buddy) - - def _remove_buddy(self, buddy): - self._notify_buddy_leave(buddy) - del self._buddies[buddy.get_nick_name()] - - def _on_service_change(self, action, interface, protocol, name, stype, domain, flags): - if action == presence.ACTION_SERVICE_NEW: - self._pdiscovery.resolve_service(interface, protocol, name, stype, domain, - self._on_service_resolved) - elif action == presence.ACTION_SERVICE_REMOVED: - if stype == PRESENCE_SERVICE_TYPE: - self._remove_buddy(name) - elif stype.startswith("_olpc"): - self.remove_service((name, stype)) - - def _on_service_resolved(self, interface, protocol, name, stype, domain, - host, aprotocol, address, port, txt, flags): - service = Service(name, stype, address, port) - if stype == PRESENCE_SERVICE_TYPE: - self._add_buddy(Buddy(service, name)) - elif stype.startswith("_olpc"): - self.add_service(service) diff --git a/p2p/Service.py b/p2p/Service.py deleted file mode 100644 index 50bbf86..0000000 --- a/p2p/Service.py +++ /dev/null @@ -1,31 +0,0 @@ -import presence - -class Service(object): - def __init__(self, name, stype, address, port, multicast=False): - self._name = name - self._stype = stype - self._address = str(address) - self._port = int(port) - self._multicast = multicast - - def get_name(self): - return self._name - - def get_type(self): - return self._stype - - def get_address(self): - return self._address - - def get_port(self): - return self._port - - def set_port(self, port): - self._port = port - - def is_multicast(self): - return self._multicast - - def register(self, group): - pannounce = presence.PresenceAnnounce() - pannounce.register_service(self._name, self._port, self._stype) diff --git a/p2p/StreamReader.py b/p2p/StreamReader.py deleted file mode 100644 index c108547..0000000 --- a/p2p/StreamReader.py +++ /dev/null @@ -1,51 +0,0 @@ -import network - -class StreamReaderRequestHandler(object): - def __init__(self, reader): - self._reader = reader - - def message(self, nick_name, message): - address = network.get_authinfo() - self._reader.recv(nick_name, message) - return True - -class StreamReader: - def __init__(self, group, service): - self._group = group - self._service = service - - if self._service.is_multicast(): - self._setup_multicast() - else: - self._setup_unicast() - - def set_listener(self, callback): - self._callback = callback - - def _setup_multicast(self): - address = self._service.get_address() - port = self._service.get_port() - server = network.GroupServer(address, port, self._recv_multicast) - server.start() - - def _setup_unicast(self): - started = False - tries = 10 - port = self._service.get_port() - while not started and tries > 0: - try: - p2p_server = network.GlibXMLRPCServer(("", port)) - p2p_server.register_instance(StreamReaderRequestHandler(self)) - started = True - except: - port = port + 1 - tries = tries - 1 - self._service.set_port(port) - - def _recv_multicast(self, msg): - [ nick_name, data ] = msg['data'].split(" |**| ", 2) - self.recv(nick_name, data) - - def recv(self, nick_name, data): - if nick_name != self._group.get_owner().get_nick_name(): - self._callback(self._group.get_buddy(nick_name), data) diff --git a/p2p/StreamWriter.py b/p2p/StreamWriter.py deleted file mode 100644 index f30801e..0000000 --- a/p2p/StreamWriter.py +++ /dev/null @@ -1,43 +0,0 @@ -import xmlrpclib -import traceback -import socket - -import network - -class StreamWriter: - def __init__(self, group, service): - self._group = group - self._service = service - self._address = self._service.get_address() - self._port = self._service.get_port() - - if self._service.is_multicast(): - self._setup_multicast() - else: - self._setup_unicast() - - def write(self, data): - if self._service.is_multicast(): - self._multicast_write(data) - else: - self._unicast_write(data) - - def _setup_unicast(self): - xmlrpc_addr = "http://%s:%d" % (self._address, self._port) - self._uclient = xmlrpclib.ServerProxy(xmlrpc_addr) - - def _unicast_write(self, data): - try: - nick_name = self._group.get_owner().get_nick_name() - self._uclient.message(nick_name, data) - return True - except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError), e: - traceback.print_exc() - return False - - def _setup_multicast(self): - self._mclient = network.GroupClient(self._address, self._port) - - def _multicast_write(self, data): - nick_name = self._group.get_owner().get_nick_name() - self._mclient.send_msg(nick_name + " |**| " + data) diff --git a/p2p/network.py b/p2p/network.py deleted file mode 100644 index c88ede6..0000000 --- a/p2p/network.py +++ /dev/null @@ -1,176 +0,0 @@ -# -*- tab-width: 4; indent-tabs-mode: t -*- - -import socket -import threading -import traceback -import select -import time -import xmlrpclib -import sys - -import gobject -import SimpleXMLRPCServer -import SocketServer - -__authinfos = {} - -def _add_authinfo(authinfo): - __authinfos[threading.currentThread()] = authinfo - -def get_authinfo(): - return __authinfos.get(threading.currentThread()) - -def _del_authinfo(): - del __authinfos[threading.currentThread()] - - -class GlibTCPServer(SocketServer.TCPServer): - """GlibTCPServer - - Integrate socket accept into glib mainloop. - """ - - allow_reuse_address = True - request_queue_size = 20 - - def __init__(self, server_address, RequestHandlerClass): - SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass) - self.socket.setblocking(0) # Set nonblocking - - # Watch the listener socket for data - gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept) - - def _handle_accept(self, source, condition): - if not (condition & gobject.IO_IN): - return True - self.handle_request() - return True - -class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - """ GlibXMLRPCRequestHandler - - The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass - the client's address and/or SSL certificate into the function that actually - _processes_ the request. So we have to store it in a thread-indexed dict. - """ - - def do_POST(self): - _add_authinfo(self.client_address) - try: - SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self) - except socket.timeout: - pass - except socket.error, e: - print "Error (%s): socket error - '%s'" % (self.client_address, e) - except: - print "Error while processing POST:" - traceback.print_exc() - _del_authinfo() - -class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher): - """GlibXMLRPCServer - - Use nonblocking sockets and handle the accept via glib rather than - blocking on accept(). - """ - - def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1): - self.logRequests = logRequests - - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self) - GlibTCPServer.__init__(self, addr, requestHandler) - - def _marshaled_dispatch(self, data, dispatch_method = None): - """Dispatches an XML-RPC method from marshalled (XML) data. - - XML-RPC methods are dispatched from the marshalled (XML) data - using the _dispatch method and the result is returned as - marshalled data. For backwards compatibility, a dispatch - function can be provided as an argument (see comment in - SimpleXMLRPCRequestHandler.do_POST) but overriding the - existing method through subclassing is the prefered means - of changing method dispatch behavior. - """ - - params, method = xmlrpclib.loads(data) - - # generate response - try: - if dispatch_method is not None: - response = dispatch_method(method, params) - else: - response = self._dispatch(method, params) - # wrap response in a singleton tuple - response = (response,) - response = xmlrpclib.dumps(response, methodresponse=1) - except xmlrpclib.Fault, fault: - response = xmlrpclib.dumps(fault) - except: - print "Exception while processing request:" - traceback.print_exc() - - # report exception back to server - response = xmlrpclib.dumps( - xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value)) - ) - - return response - -class GroupServer(object): - - _MAX_MSG_SIZE = 500 - - def __init__(self, address, port, data_cb): - self._address = address - self._port = port - self._data_cb = data_cb - - self._setup_listener() - - def _setup_listener(self): - # Listener socket - self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - - # Set some options to make it multicast-friendly - self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - try: - self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - except: - pass - self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20) - self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1) - - def start(self): - # Set some more multicast options - self._listen_sock.bind(('', self._port)) - self._listen_sock.settimeout(2) - intf = socket.gethostbyname(socket.gethostname()) - self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0')) - self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0')) - - # Watch the listener socket for data - gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data) - - def _handle_incoming_data(self, source, condition): - if not (condition & gobject.IO_IN): - return True - msg = {} - msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE) - if self._data_cb: - self._data_cb(msg) - return True - -class GroupClient(object): - - _MAX_MSG_SIZE = 500 - - def __init__(self, address, port): - self._address = address - self._port = port - - self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - # Make the socket multicast-aware, and set TTL. - self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit - - def send_msg(self, data): - self._send_sock.sendto(data, (self._address, self._port)) diff --git a/p2p/presence.py b/p2p/presence.py deleted file mode 100644 index e16fc92..0000000 --- a/p2p/presence.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- tab-width: 4; indent-tabs-mode: t -*- - -import avahi, dbus, dbus.glib - -ACTION_SERVICE_NEW = 'new' -ACTION_SERVICE_REMOVED = 'removed' - -class PresenceDiscovery(object): - def __init__(self): - self.bus = dbus.SystemBus() - self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER) - self._service_browsers = {} - self._service_type_browsers = {} - self._service_listeners = [] - - def add_service_listener(self, listener): - self._service_listeners.append(listener) - - def start(self): - # Always browse .local - self.browse_domain(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "local") - db = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.DomainBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "", avahi.DOMAIN_BROWSER_BROWSE, dbus.UInt32(0))), avahi.DBUS_INTERFACE_DOMAIN_BROWSER) - db.connect_to_signal('ItemNew', self.new_domain) - - def _error_handler(self, err): - print "Error resolving: %s" % err - - def resolve_service(self, interface, protocol, name, stype, domain, reply_handler, error_handler=None): - if not error_handler: - error_handler = self._error_handler - self.server.ResolveService(int(interface), int(protocol), name, stype, domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), reply_handler=reply_handler, error_handler=error_handler) - - def new_service(self, interface, protocol, name, stype, domain, flags): -# print "Found service '%s' (%d) of type '%s' in domain '%s' on %i.%i." % (name, flags, stype, domain, interface, protocol) - for listener in self._service_listeners: - listener(ACTION_SERVICE_NEW, interface, protocol, name, stype, domain, flags) - - def remove_service(self, interface, protocol, name, stype, domain, flags): -# print "Service '%s' of type '%s' in domain '%s' on %i.%i disappeared." % (name, stype, domain, interface, protocol) - for listener in self._service_listeners: - listener(ACTION_SERVICE_REMOVED, interface, protocol, name, stype, domain, flags) - - def new_service_type(self, interface, protocol, stype, domain, flags): - # Are we already browsing this domain for this type? - if self._service_browsers.has_key((interface, protocol, stype, domain)): - return - -# print "Browsing for services of type '%s' in domain '%s' on %i.%i ..." % (stype, domain, interface, protocol) - - b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceBrowserNew(interface, protocol, stype, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_BROWSER) - b.connect_to_signal('ItemNew', self.new_service) - b.connect_to_signal('ItemRemove', self.remove_service) - - self._service_browsers[(interface, protocol, stype, domain)] = b - - def browse_domain(self, interface, protocol, domain): - # Are we already browsing this domain? - if self._service_type_browsers.has_key((interface, protocol, domain)): - return - -# print "Browsing domain '%s' on %i.%i ..." % (domain, interface, protocol) - - b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceTypeBrowserNew(interface, protocol, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_TYPE_BROWSER) - b.connect_to_signal('ItemNew', self.new_service_type) - - self._service_type_browsers[(interface, protocol, domain)] = b - - def new_domain(self,interface, protocol, domain, flags): - if domain != "local": - return - self.browse_domain(interface, protocol, domain) - - -class PresenceAnnounce(object): - def __init__(self): - self.bus = dbus.SystemBus() - self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER) - self._hostname = None - - def register_service(self, rs_name, rs_port, rs_service, **kwargs): - g = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), avahi.DBUS_INTERFACE_ENTRY_GROUP) - if rs_name is None: - if self._hostname is None: - self._hostname = "%s:%s" % (self.server.GetHostName(), rs_port) - rs_name = self._hostname - - info = ["%s=%s" % (k,v) for k,v in kwargs.items()] - g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, rs_name, rs_service, - "", "", # domain, host (let the system figure it out) - dbus.UInt16(rs_port), info,) - g.Commit() - return g |