Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/Buddy.py34
-rw-r--r--p2p/Group.py102
-rw-r--r--p2p/Service.py31
-rw-r--r--p2p/StreamReader.py51
-rw-r--r--p2p/StreamWriter.py43
-rw-r--r--p2p/network.py176
-rw-r--r--p2p/presence.py92
7 files changed, 0 insertions, 529 deletions
diff --git a/p2p/Buddy.py b/p2p/Buddy.py
deleted file mode 100644
index 19d7c0e..0000000
--- a/p2p/Buddy.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import pwd
-import os
-
-from Service import *
-
-PRESENCE_SERVICE_TYPE = "_olpc_presence._tcp"
-PRESENCE_SERVICE_PORT = 6000
-
-class Buddy:
- def __init__(self, service, nick_name):
- self._service = service
- self._nick_name = nick_name
-
- def get_service_name(self):
- return self._service.get_name()
-
- def get_nick_name(self):
- return self._nick_name
-
-class Owner(Buddy):
- def __init__(self, group):
- self._group = group
-
- nick = pwd.getpwuid(os.getuid())[0]
- if not nick or not len(nick):
- nick = "n00b"
-
- service = Service(nick, PRESENCE_SERVICE_TYPE,
- '', PRESENCE_SERVICE_PORT)
-
- Buddy.__init__(self, service, nick)
-
- def register(self):
- self._service.register(self._group)
diff --git a/p2p/Group.py b/p2p/Group.py
deleted file mode 100644
index dedbc1e..0000000
--- a/p2p/Group.py
+++ /dev/null
@@ -1,102 +0,0 @@
-import avahi
-
-import presence
-from Buddy import *
-from Service import *
-
-SERVICE_ADDED = "service_added"
-SERVICE_REMOVED = "service_removed"
-
-BUDDY_JOIN = "buddy_join"
-BUDDY_LEAVE = "buddy_leave"
-
-class Group:
- def __init__(self):
- self._service_listeners = []
- self._presence_listeners = []
-
- def join(self, buddy):
- pass
-
- def add_service_listener(self, listener):
- self._service_listeners.append(listener)
-
- def add_presence_listener(self, listener):
- self._presence_listeners.append(listener)
-
- def _notify_service_added(self, service):
- for listener in self._service_listeners:
- listener(SERVICE_ADDED, buddy)
-
- def _notify_service_removed(self, service):
- for listener in self._service_listeners:
- listener(SERVICE_REMOVED,buddy)
-
- def _notify_buddy_join(self, buddy):
- for listener in self._presence_listeners:
- listener(BUDDY_JOIN, buddy)
-
- def _notify_buddy_leave(self, buddy):
- for listener in self._presence_listeners:
- listener(BUDDY_LEAVE, buddy)
-
-class LocalGroup(Group):
- def __init__(self):
- Group.__init__(self)
-
- self._services = {}
- self._buddies = {}
-
- self._pdiscovery = presence.PresenceDiscovery()
- self._pdiscovery.add_service_listener(self._on_service_change)
- self._pdiscovery.start()
-
- def get_owner(self):
- return self._owner
-
- def add_service(self, service):
- sid = (service.get_name(), service.get_type())
- self._services[sid] = service
- self._notify_service_added(service)
-
- def remove_service(self, sid):
- self._notify_service_removed(service)
- del self._services[sid]
-
- def join(self):
- self._owner = Owner(self)
- self._owner.register()
-
- def get_service(self, name, stype):
- return self._services[(name, stype)]
-
- def get_buddy(self, name):
- return self._buddies[name]
-
- def _add_buddy(self, buddy):
- bid = buddy.get_nick_name()
- if not self._buddies.has_key(bid):
- self._buddies[bid] = buddy
- self._notify_buddy_join(buddy)
-
- def _remove_buddy(self, buddy):
- self._notify_buddy_leave(buddy)
- del self._buddies[buddy.get_nick_name()]
-
- def _on_service_change(self, action, interface, protocol, name, stype, domain, flags):
- if action == presence.ACTION_SERVICE_NEW:
- self._pdiscovery.resolve_service(interface, protocol, name, stype, domain,
- self._on_service_resolved)
- elif action == presence.ACTION_SERVICE_REMOVED:
- if stype == PRESENCE_SERVICE_TYPE:
- self._remove_buddy(name)
- elif stype.startswith("_olpc"):
- self.remove_service((name, stype))
-
- def _on_service_resolved(self, interface, protocol, name, stype, domain,
- host, aprotocol, address, port, txt, flags):
- service = Service(name, stype, address, port)
- if stype == PRESENCE_SERVICE_TYPE:
- self._add_buddy(Buddy(service, name))
- elif stype.startswith("_olpc"):
- self.add_service(service)
diff --git a/p2p/Service.py b/p2p/Service.py
deleted file mode 100644
index 50bbf86..0000000
--- a/p2p/Service.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import presence
-
-class Service(object):
- def __init__(self, name, stype, address, port, multicast=False):
- self._name = name
- self._stype = stype
- self._address = str(address)
- self._port = int(port)
- self._multicast = multicast
-
- def get_name(self):
- return self._name
-
- def get_type(self):
- return self._stype
-
- def get_address(self):
- return self._address
-
- def get_port(self):
- return self._port
-
- def set_port(self, port):
- self._port = port
-
- def is_multicast(self):
- return self._multicast
-
- def register(self, group):
- pannounce = presence.PresenceAnnounce()
- pannounce.register_service(self._name, self._port, self._stype)
diff --git a/p2p/StreamReader.py b/p2p/StreamReader.py
deleted file mode 100644
index c108547..0000000
--- a/p2p/StreamReader.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import network
-
-class StreamReaderRequestHandler(object):
- def __init__(self, reader):
- self._reader = reader
-
- def message(self, nick_name, message):
- address = network.get_authinfo()
- self._reader.recv(nick_name, message)
- return True
-
-class StreamReader:
- def __init__(self, group, service):
- self._group = group
- self._service = service
-
- if self._service.is_multicast():
- self._setup_multicast()
- else:
- self._setup_unicast()
-
- def set_listener(self, callback):
- self._callback = callback
-
- def _setup_multicast(self):
- address = self._service.get_address()
- port = self._service.get_port()
- server = network.GroupServer(address, port, self._recv_multicast)
- server.start()
-
- def _setup_unicast(self):
- started = False
- tries = 10
- port = self._service.get_port()
- while not started and tries > 0:
- try:
- p2p_server = network.GlibXMLRPCServer(("", port))
- p2p_server.register_instance(StreamReaderRequestHandler(self))
- started = True
- except:
- port = port + 1
- tries = tries - 1
- self._service.set_port(port)
-
- def _recv_multicast(self, msg):
- [ nick_name, data ] = msg['data'].split(" |**| ", 2)
- self.recv(nick_name, data)
-
- def recv(self, nick_name, data):
- if nick_name != self._group.get_owner().get_nick_name():
- self._callback(self._group.get_buddy(nick_name), data)
diff --git a/p2p/StreamWriter.py b/p2p/StreamWriter.py
deleted file mode 100644
index f30801e..0000000
--- a/p2p/StreamWriter.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import xmlrpclib
-import traceback
-import socket
-
-import network
-
-class StreamWriter:
- def __init__(self, group, service):
- self._group = group
- self._service = service
- self._address = self._service.get_address()
- self._port = self._service.get_port()
-
- if self._service.is_multicast():
- self._setup_multicast()
- else:
- self._setup_unicast()
-
- def write(self, data):
- if self._service.is_multicast():
- self._multicast_write(data)
- else:
- self._unicast_write(data)
-
- def _setup_unicast(self):
- xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
- self._uclient = xmlrpclib.ServerProxy(xmlrpc_addr)
-
- def _unicast_write(self, data):
- try:
- nick_name = self._group.get_owner().get_nick_name()
- self._uclient.message(nick_name, data)
- return True
- except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError), e:
- traceback.print_exc()
- return False
-
- def _setup_multicast(self):
- self._mclient = network.GroupClient(self._address, self._port)
-
- def _multicast_write(self, data):
- nick_name = self._group.get_owner().get_nick_name()
- self._mclient.send_msg(nick_name + " |**| " + data)
diff --git a/p2p/network.py b/p2p/network.py
deleted file mode 100644
index c88ede6..0000000
--- a/p2p/network.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# -*- tab-width: 4; indent-tabs-mode: t -*-
-
-import socket
-import threading
-import traceback
-import select
-import time
-import xmlrpclib
-import sys
-
-import gobject
-import SimpleXMLRPCServer
-import SocketServer
-
-__authinfos = {}
-
-def _add_authinfo(authinfo):
- __authinfos[threading.currentThread()] = authinfo
-
-def get_authinfo():
- return __authinfos.get(threading.currentThread())
-
-def _del_authinfo():
- del __authinfos[threading.currentThread()]
-
-
-class GlibTCPServer(SocketServer.TCPServer):
- """GlibTCPServer
-
- Integrate socket accept into glib mainloop.
- """
-
- allow_reuse_address = True
- request_queue_size = 20
-
- def __init__(self, server_address, RequestHandlerClass):
- SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass)
- self.socket.setblocking(0) # Set nonblocking
-
- # Watch the listener socket for data
- gobject.io_add_watch(self.socket, gobject.IO_IN, self._handle_accept)
-
- def _handle_accept(self, source, condition):
- if not (condition & gobject.IO_IN):
- return True
- self.handle_request()
- return True
-
-class GlibXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
- """ GlibXMLRPCRequestHandler
-
- The stock SimpleXMLRPCRequestHandler and server don't allow any way to pass
- the client's address and/or SSL certificate into the function that actually
- _processes_ the request. So we have to store it in a thread-indexed dict.
- """
-
- def do_POST(self):
- _add_authinfo(self.client_address)
- try:
- SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.do_POST(self)
- except socket.timeout:
- pass
- except socket.error, e:
- print "Error (%s): socket error - '%s'" % (self.client_address, e)
- except:
- print "Error while processing POST:"
- traceback.print_exc()
- _del_authinfo()
-
-class GlibXMLRPCServer(GlibTCPServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
- """GlibXMLRPCServer
-
- Use nonblocking sockets and handle the accept via glib rather than
- blocking on accept().
- """
-
- def __init__(self, addr, requestHandler=GlibXMLRPCRequestHandler, logRequests=1):
- self.logRequests = logRequests
-
- SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
- GlibTCPServer.__init__(self, addr, requestHandler)
-
- def _marshaled_dispatch(self, data, dispatch_method = None):
- """Dispatches an XML-RPC method from marshalled (XML) data.
-
- XML-RPC methods are dispatched from the marshalled (XML) data
- using the _dispatch method and the result is returned as
- marshalled data. For backwards compatibility, a dispatch
- function can be provided as an argument (see comment in
- SimpleXMLRPCRequestHandler.do_POST) but overriding the
- existing method through subclassing is the prefered means
- of changing method dispatch behavior.
- """
-
- params, method = xmlrpclib.loads(data)
-
- # generate response
- try:
- if dispatch_method is not None:
- response = dispatch_method(method, params)
- else:
- response = self._dispatch(method, params)
- # wrap response in a singleton tuple
- response = (response,)
- response = xmlrpclib.dumps(response, methodresponse=1)
- except xmlrpclib.Fault, fault:
- response = xmlrpclib.dumps(fault)
- except:
- print "Exception while processing request:"
- traceback.print_exc()
-
- # report exception back to server
- response = xmlrpclib.dumps(
- xmlrpclib.Fault(1, "%s:%s" % (sys.exc_type, sys.exc_value))
- )
-
- return response
-
-class GroupServer(object):
-
- _MAX_MSG_SIZE = 500
-
- def __init__(self, address, port, data_cb):
- self._address = address
- self._port = port
- self._data_cb = data_cb
-
- self._setup_listener()
-
- def _setup_listener(self):
- # Listener socket
- self._listen_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-
- # Set some options to make it multicast-friendly
- self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- try:
- self._listen_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- except:
- pass
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 20)
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1)
-
- def start(self):
- # Set some more multicast options
- self._listen_sock.bind(('', self._port))
- self._listen_sock.settimeout(2)
- intf = socket.gethostbyname(socket.gethostname())
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
- self._listen_sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(self._address) + socket.inet_aton('0.0.0.0'))
-
- # Watch the listener socket for data
- gobject.io_add_watch(self._listen_sock, gobject.IO_IN, self._handle_incoming_data)
-
- def _handle_incoming_data(self, source, condition):
- if not (condition & gobject.IO_IN):
- return True
- msg = {}
- msg['data'], (msg['addr'], msg['port']) = source.recvfrom(self._MAX_MSG_SIZE)
- if self._data_cb:
- self._data_cb(msg)
- return True
-
-class GroupClient(object):
-
- _MAX_MSG_SIZE = 500
-
- def __init__(self, address, port):
- self._address = address
- self._port = port
-
- self._send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- # Make the socket multicast-aware, and set TTL.
- self._send_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 20) # Change TTL (=20) to suit
-
- def send_msg(self, data):
- self._send_sock.sendto(data, (self._address, self._port))
diff --git a/p2p/presence.py b/p2p/presence.py
deleted file mode 100644
index e16fc92..0000000
--- a/p2p/presence.py
+++ /dev/null
@@ -1,92 +0,0 @@
-# -*- tab-width: 4; indent-tabs-mode: t -*-
-
-import avahi, dbus, dbus.glib
-
-ACTION_SERVICE_NEW = 'new'
-ACTION_SERVICE_REMOVED = 'removed'
-
-class PresenceDiscovery(object):
- def __init__(self):
- self.bus = dbus.SystemBus()
- self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
- self._service_browsers = {}
- self._service_type_browsers = {}
- self._service_listeners = []
-
- def add_service_listener(self, listener):
- self._service_listeners.append(listener)
-
- def start(self):
- # Always browse .local
- self.browse_domain(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "local")
- db = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.DomainBrowserNew(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, "", avahi.DOMAIN_BROWSER_BROWSE, dbus.UInt32(0))), avahi.DBUS_INTERFACE_DOMAIN_BROWSER)
- db.connect_to_signal('ItemNew', self.new_domain)
-
- def _error_handler(self, err):
- print "Error resolving: %s" % err
-
- def resolve_service(self, interface, protocol, name, stype, domain, reply_handler, error_handler=None):
- if not error_handler:
- error_handler = self._error_handler
- self.server.ResolveService(int(interface), int(protocol), name, stype, domain, avahi.PROTO_UNSPEC, dbus.UInt32(0), reply_handler=reply_handler, error_handler=error_handler)
-
- def new_service(self, interface, protocol, name, stype, domain, flags):
-# print "Found service '%s' (%d) of type '%s' in domain '%s' on %i.%i." % (name, flags, stype, domain, interface, protocol)
- for listener in self._service_listeners:
- listener(ACTION_SERVICE_NEW, interface, protocol, name, stype, domain, flags)
-
- def remove_service(self, interface, protocol, name, stype, domain, flags):
-# print "Service '%s' of type '%s' in domain '%s' on %i.%i disappeared." % (name, stype, domain, interface, protocol)
- for listener in self._service_listeners:
- listener(ACTION_SERVICE_REMOVED, interface, protocol, name, stype, domain, flags)
-
- def new_service_type(self, interface, protocol, stype, domain, flags):
- # Are we already browsing this domain for this type?
- if self._service_browsers.has_key((interface, protocol, stype, domain)):
- return
-
-# print "Browsing for services of type '%s' in domain '%s' on %i.%i ..." % (stype, domain, interface, protocol)
-
- b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceBrowserNew(interface, protocol, stype, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_BROWSER)
- b.connect_to_signal('ItemNew', self.new_service)
- b.connect_to_signal('ItemRemove', self.remove_service)
-
- self._service_browsers[(interface, protocol, stype, domain)] = b
-
- def browse_domain(self, interface, protocol, domain):
- # Are we already browsing this domain?
- if self._service_type_browsers.has_key((interface, protocol, domain)):
- return
-
-# print "Browsing domain '%s' on %i.%i ..." % (domain, interface, protocol)
-
- b = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.ServiceTypeBrowserNew(interface, protocol, domain, dbus.UInt32(0))), avahi.DBUS_INTERFACE_SERVICE_TYPE_BROWSER)
- b.connect_to_signal('ItemNew', self.new_service_type)
-
- self._service_type_browsers[(interface, protocol, domain)] = b
-
- def new_domain(self,interface, protocol, domain, flags):
- if domain != "local":
- return
- self.browse_domain(interface, protocol, domain)
-
-
-class PresenceAnnounce(object):
- def __init__(self):
- self.bus = dbus.SystemBus()
- self.server = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
- self._hostname = None
-
- def register_service(self, rs_name, rs_port, rs_service, **kwargs):
- g = dbus.Interface(self.bus.get_object(avahi.DBUS_NAME, self.server.EntryGroupNew()), avahi.DBUS_INTERFACE_ENTRY_GROUP)
- if rs_name is None:
- if self._hostname is None:
- self._hostname = "%s:%s" % (self.server.GetHostName(), rs_port)
- rs_name = self._hostname
-
- info = ["%s=%s" % (k,v) for k,v in kwargs.items()]
- g.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, 0, rs_name, rs_service,
- "", "", # domain, host (let the system figure it out)
- dbus.UInt16(rs_port), info,)
- g.Commit()
- return g