Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summary · refs · log · tree · commit · diff · stats
path: root/sugar/p2p/Stream.py
blob: b3239b30cb7baa551bc9a574a4ae666a957fea69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Copyright (C) 2006, Red Hat, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.

import xmlrpclib
import socket
import traceback
import random
import logging

import network
from MostlyReliablePipe import MostlyReliablePipe
from sugar.presence import Service

def is_multicast_address(address):
    """Simple numerical check for whether an IP4 address
    is in the range for multicast addresses or not.

    Only the first dot-separated field of 'address' is examined: it must
    be exactly three characters wide, be followed by a '.', and have an
    integer value from 224 to 239 inclusive.  Empty, too short or
    non-numeric input returns False instead of raising IndexError or
    ValueError."""
    if not address:
        return False
    fields = address.split('.')
    if len(fields) < 2:
        # No '.' at all, so this is not a dotted address.
        return False
    first = fields[0]
    # Every value in 224-239 is three digits wide, so any other width can
    # be rejected without parsing it.
    if len(first) != 3:
        return False
    try:
        value = int(first)
    except ValueError:
        return False
    return 224 <= value <= 239

class Stream(object):
    """Base class for a data stream bound to a service.

    Keeps the service's address and port and relays received data to an
    optional listener callback."""

    def __init__(self, service):
        """Record the port and address reported by 'service'.

        Raises ValueError if the service reports no port."""
        port = service.get_port()
        if not port:
            # The value being validated is the port, so the message
            # names the port rather than the address.
            raise ValueError("service must have a valid port.")
        self._service = service
        # Reader and writer start out on the same port.
        self._reader_port = port
        self._writer_port = self._reader_port
        self._address = self._service.get_address()
        self._callback = None

    def new_from_service(service, start_reader=True):
        """Create the stream matching the service's address.

        Returns a MulticastStream when is_multicast_address() accepts
        the service's address, otherwise a UnicastStream which is given
        the 'start_reader' argument."""
        if is_multicast_address(service.get_address()):
            return MulticastStream(service)
        else:
            return UnicastStream(service, start_reader)
    new_from_service = staticmethod(new_from_service)

    def set_data_listener(self, callback):
        """Set the callable invoked as callback(address, data) whenever
        data is received."""
        self._callback = callback

    def _recv(self, address, data):
        """Hand received data to the listener, if one has been set."""
        if self._callback:
            self._callback(address, data)


class UnicastStreamWriter(object):
    """Sends data to a service through an XML-RPC proxy built from the
    service's address and port."""

    def __init__(self, stream, service):
        """Create the proxy used to reach 'service'.

        The 'stream' argument is accepted but not used by this method.
        Raises ValueError if the service reports no address."""
        self._service = service
        if not service.get_address():
            raise ValueError("service must have a valid address.")
        self._address = self._service.get_address()
        self._port = self._service.get_port()
        endpoint = (self._address, self._port)
        self._xmlrpc_addr = "http://%s:%d" % endpoint
        self._writer = network.GlibServerProxy(self._xmlrpc_addr)

    def write(self, xmlrpc_data):
        """Send 'xmlrpc_data' through the proxy's "message" method.

        Returns True if the call was issued.  On a socket or XML-RPC
        error the traceback is printed and False is returned."""
        try:
            self._writer.message(None, None, xmlrpc_data)
        except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
            traceback.print_exc()
            return False
        else:
            return True

    def custom_request(self, method_name, request_cb, user_data, *args):
        """Invoke the proxy method called 'method_name', passing it
        'request_cb', 'user_data' and any extra arguments.

        Returns True if the call was issued.  On a socket or XML-RPC
        error the traceback is printed and False is returned."""
        try:
            remote_call = getattr(self._writer, method_name)
            remote_call(request_cb, user_data, *args)
        except (socket.error, xmlrpclib.Fault, xmlrpclib.ProtocolError):
            traceback.print_exc()
            return False
        else:
            return True


class UnicastStream(Stream):
    """Stream whose reader is an XMLRPC server on the service's port."""

    def __init__(self, service, start_reader=True):
        """Initializes the stream.  If the 'start_reader' argument is True,
        the stream will initialize and start a new stream reader, if it
        is False, no reader will be created and the caller must call the
        start_reader() method to start the stream reader and be able to
        receive any data from the stream."""
        Stream.__init__(self, service)
        # No reader exists until start_reader() has run; keeping an
        # explicit None lets register_reader_handler() report that.
        self._reader = None
        if start_reader:
            self.start_reader()

    def start_reader(self):
        """Start the stream's reader, which for UnicastStream objects is
        an XMLRPC server created on the stream's reader port, and
        register the default "message" handler with it.  This method
        does not itself pick another port if the port is already taken.
        Returns the port number used for the reader."""
        # Set up the reader
        self._reader = network.GlibXMLRPCServer(("", self._reader_port))
        self._reader.register_function(self._message, "message")
        return self._reader_port

    def _message(self, message):
        """Called by the XMLRPC server when network data arrives.

        Passes the value of network.get_authinfo() to the data listener
        as the sender address and always returns True."""
        address = network.get_authinfo()
        self._recv(address, message)
        return True

    def register_reader_handler(self, handler, name):
        """Register a custom message handler with the reader.  This call
        adds a custom XMLRPC method call with the name 'name' to the reader's
        XMLRPC server, which then calls the 'handler' argument back when
        a method call for it arrives over the network.

        Raises ValueError if 'name' is the reserved name "message", and
        RuntimeError if the reader has not been started yet."""
        if name == "message":
            raise ValueError("Handler name 'message' is a reserved handler.")
        if self._reader is None:
            raise RuntimeError("start_reader() must be called before "
                    "registering reader handlers.")
        self._reader.register_function(handler, name)

    def new_writer(self, service):
        """Return a new stream writer object."""
        return UnicastStreamWriter(self, service)


class MulticastStream(Stream):
    """Stream that sends and receives through a MostlyReliablePipe
    created on the service's address and port."""

    def __init__(self, service):
        """Set up the stream and start its pipe immediately."""
        Stream.__init__(self, service)
        self._service = service
        self._internal_start_reader()

    def start_reader(self):
        """Return the reader port.  Nothing is started here; the
        constructor has already started the pipe."""
        port = self._reader_port
        return port

    def _internal_start_reader(self):
        """Create the pipe for this stream's address and port and start it.

        Raises ValueError if the service reports no address."""
        endpoint = (self._address, self._reader_port)
        logging.debug('Start multicast stream, address %s, port %d' % endpoint)
        if not self._service.get_address():
            raise ValueError("service must have a valid address.")
        # Incoming data is delivered to _recv_data_cb.
        self._pipe = MostlyReliablePipe('', self._address,
                self._reader_port, self._recv_data_cb)
        self._pipe.start()

    def write(self, data):
        """Send 'data' through the pipe."""
        pipe = self._pipe
        pipe.send(data)

    def _recv_data_cb(self, address, data, user_data=None):
        """Pipe callback: pass the first element of 'address' and the
        data on to the listener.  'user_data' is ignored."""
        sender = address[0]
        self._recv(sender, data)

    def new_writer(self, service=None):
        """Return this stream itself, which acts as its own writer.
        The 'service' argument is ignored."""
        return self