Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/p2p/Stream.py
blob: 7f00c575699e1d00196b7fbee41a28f2e361ead1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import xmlrpclib
import socket
import traceback

import pygtk
pygtk.require('2.0')
import gobject


import network
from MostlyReliablePipe import MostlyReliablePipe

class Stream(object):
	"""Common state and receive-side dispatch shared by all stream kinds.

	A stream is bound to a service (port and address) and to a group,
	which supplies the owner's nick name and resolves nick names to
	buddy objects when data arrives.
	"""

	def __init__(self, service, group):
		"""Cache the owner's nick name and the service's port and address.

		Raises ValueError when `service` is falsy.
		"""
		if not service:
			raise ValueError("service must be valid")
		self._callback = None
		self._service = service
		self._group = group
		# Query order (owner, port, address) matches the attribute layout
		# subclasses rely on; subclasses may override _address afterwards.
		self._owner_nick_name = group.get_owner().get_nick_name()
		self._port = service.get_port()
		self._address = service.get_address()

	@staticmethod
	def new_from_service(service, group):
		"""Build the stream flavour that matches `service`.

		Returns a MulticastStream when service.is_multicast() is true,
		otherwise a UnicastStream.
		"""
		if service.is_multicast():
			stream_class = MulticastStream
		else:
			stream_class = UnicastStream
		return stream_class(service, group)

	def set_data_listener(self, callback):
		"""Install `callback`, invoked as callback(buddy, data) on receipt."""
		self._callback = callback

	def recv(self, nick_name, data):
		"""Hand incoming `data` to the listener unless we sent it ourselves."""
		if nick_name == self._owner_nick_name:
			# Our own traffic echoed back: nothing to deliver.
			return
		listener = self._callback
		if not listener:
			return
		listener(self._group.get_buddy(nick_name), data)


class UnicastStreamWriter(object):
	"""Sends data to one remote service through an XML-RPC proxy.

	The proxy is created with network.GlibServerProxy against
	"http://<address>:<port>" of the given service.
	"""

	def __init__(self, stream, service, owner_nick_name):
		"""Create the proxy for `service`.

		`stream` is accepted but not used by this class.
		Raises ValueError when `service` is falsy.
		"""
		if not service:
			raise ValueError("service must be valid")
		self._owner_nick_name = owner_nick_name
		self._service = service
		self._address = service.get_address()
		self._port = service.get_port()
		self._xmlrpc_addr = "http://%s:%d" % (self._address, self._port)
		self._writer = network.GlibServerProxy(self._xmlrpc_addr)

	def _invoke(self, method_name, *call_args):
		"""Call the proxy method `method_name`; report transport errors.

		Returns True when the call was issued, False (after printing the
		traceback) on socket.error, xmlrpclib.Fault or
		xmlrpclib.ProtocolError.
		"""
		try:
			getattr(self._writer, method_name)(*call_args)
		except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
			traceback.print_exc()
			return False
		return True

	def write(self, xmlrpc_data):
		"""Write some data to the default endpoint of this pipe on the remote server."""
		# The two leading None values fill the proxy's callback and
		# user-data slots (same positions custom_request passes them in).
		return self._invoke("message", None, None, self._owner_nick_name, xmlrpc_data)

	def custom_request(self, method_name, request_cb, user_data, *args):
		"""Call a custom XML-RPC method on the remote server."""
		return self._invoke(method_name, request_cb, user_data, *args)


class UnicastStream(Stream):
	"""Stream whose receive side is a local XML-RPC server.

	Incoming data arrives through the server's "message" method and is
	forwarded to Stream.recv(); outgoing data goes through
	UnicastStreamWriter objects created by new_writer().
	"""

	# How many consecutive ports are tried, starting at the service's port.
	_PORT_TRIES = 10

	def __init__(self, service, group):
		"""Initialise the base stream, then try to start the reader."""
		Stream.__init__(self, service, group)
		self._setup()

	def _setup(self):
		"""Start the XML-RPC reader on the first port that can be bound.

		Ports service.get_port() .. get_port() + _PORT_TRIES - 1 are tried
		in order. On success the service is told which port was bound.
		If every attempt fails with socket.error, the last error is
		printed, self._reader stays None and the service's port is left
		untouched, so no unbound port is advertised.
		"""
		first_port = self._service.get_port()
		self._reader = None
		for attempt in range(self._PORT_TRIES):
			port = first_port + attempt
			try:
				reader = network.GlibXMLRPCServer(("", port))
			except socket.error:
				if attempt == self._PORT_TRIES - 1:
					# Out of candidate ports: make the failure visible
					# instead of silently leaving the stream deaf.
					traceback.print_exc()
				continue
			reader.register_function(self._message, "message")
			self._reader = reader
			self._service.set_port(port)
			return

	def _message(self, nick_name, message):
		"""Called by the XMLRPC server when network data arrives."""
		self.recv(nick_name, message)
		return True

	def register_handler(self, handler, name):
		"""Expose `handler` as XML-RPC method `name` on the reader.

		Raises ValueError for the reserved name "message", and
		RuntimeError when the reader could not be started by _setup().
		"""
		if name == "message":
			raise ValueError("Handler name 'message' is a reserved handler.")
		if self._reader is None:
			raise RuntimeError("Stream reader is not running; cannot register handler '%s'." % name)
		self._reader.register_function(handler, name)

	def new_writer(self, service):
		"""Return a UnicastStreamWriter that sends to `service` as our owner."""
		return UnicastStreamWriter(self, service, self._owner_nick_name)


class MulticastStream(Stream):
	"""Stream carried over a MostlyReliablePipe on the service's group address.

	Each message on the wire is "<sender nick name><separator><payload>".
	The stream is its own writer: new_writer() returns self.
	"""

	# Placed between the sender's nick name and the payload on the wire.
	_SEPARATOR = " |**| "

	def __init__(self, service, group):
		"""Initialise the base stream, switch to the group address, start the pipe."""
		Stream.__init__(self, service, group)
		# Replace the unicast address cached by Stream with the group one.
		self._address = self._service.get_group_address()
		self._setup()

	def _setup(self):
		"""Create the pipe with _recv_data_cb as its data callback and start it."""
		self._pipe = MostlyReliablePipe('', self._address, self._port, self._recv_data_cb)
		self._pipe.start()

	def write(self, data):
		"""Send `data` through the pipe, prefixed with the owner's nick name."""
		self._pipe.send(self._owner_nick_name + self._SEPARATOR + data)

	def _recv_data_cb(self, addr, data, user_data=None):
		"""Split a received message into sender and payload and dispatch it.

		Only the first separator is significant, so a payload that itself
		contains the separator text is delivered intact. A message with no
		separator at all raises ValueError from the unpacking below.
		"""
		# maxsplit=1: at most two parts, everything after the first
		# separator belongs to the payload.
		nick_name, payload = data.split(self._SEPARATOR, 1)
		self.recv(nick_name, payload)

	def new_writer(self, service=None):
		"""Return this stream; `service` is ignored."""
		return self