Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/Rpyc/Stream.py
blob: f7e42b97b5989c78b344b71a0fc9c3a27bf60933 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import select
import socket


class Stream(object):
    """
    a stream is a file-like object that is used to expose a consistent and uniform 
    interface to the underlying 'physical' file-like object (like sockets and pipes),
    which have many quirks (sockets may recv() less than `count`, pipes are simplex
    and don't flush, etc.). a stream is always in blocking mode.
    """
    __slots__ = []
    def close(self):
        raise NotImplementedError()
    def fileno(self):
        raise NotImplementedError()
    def is_available(self):
        rlist, wlist, xlist = select.select([self], [], [], 0)
        return bool(rlist)
    def read(self, count):
        raise NotImplementedError()
    def write(self, data):
        raise NotImplementedError()

class SocketStream(Stream):
    """
    a stream that operates over a socket. the socket is expected to be in 
    blocking mode and reliable (i.e., TCP)
    """
    CONNECT_TIMEOUT = 5
    __slots__ = ["sock"]
    def __init__(self, sock):
        self.sock = sock
    def __repr__(self):
        host, port = self.sock.getpeername()
        return "<%s(%s:%d)>" % (self.__class__.__name__, host, port)
    def fileno(self):
        return self.sock.fileno()
    def close(self):
        self.sock.close()
    def read(self, count):
        data = []
        while count > 0:
            buf = self.sock.recv(count)
            if not buf:
                raise EOFError()
            count -= len(buf)
            data.append(buf)
        return "".join(data)
    def write(self, data):
        while data:
            count = self.sock.send(data)
            data = data[count:]
    @classmethod
    def from_new_socket(cls, host, port):
        sock = socket.socket()
        sock.settimeout(cls.CONNECT_TIMEOUT)
        sock.connect((host, port))
        sock.settimeout(None)
        return cls(sock)
    @classmethod
    def from_new_secure_socket(cls, host, port, username, password):
        from tlslite.api import TLSConnection
        stream = cls.from_new_socket(host, port)
        stream.sock = TLSConnection(stream.sock)
        stream.sock.handshakeClientSRP(username, password)
        return stream
    @classmethod
    def from_secure_server_socket(cls, sock, vdb):
        from tlslite.api import TLSConnection
        sock = TLSConnection(sock)
        sock.handshakeServer(verifierDB=vdb)
        return cls(sock)

class PipeStream(Stream):
    """
    a stream that operates over two simplex pipes. the pipes are expected 
    to be in blocking mode
    """
    __slots__ = ["incoming", "outgoing"]
    def __init__(self, incoming, outgoing):
        self.incoming = incoming
        self.outgoing = outgoing
    def fileno(self):
        return self.incoming.fileno()
    def close(self):
        self.incoming.close()
        self.outgoing.close()
    def read(self, count):
        data = []
        while count > 0:
            buf = self.incoming.read(count)
            if not buf:
                raise EOFError()
            count -= len(buf)
            data.append(buf)
        return "".join(data)
    def write(self, data):
        self.outgoing.write(data)
        self.outgoing.flush()

    #
    # win32 stub (can't select() on pipes) -- this stub causes problems with
    # Async objects: doing obj.is_ready blocks. but it's better to have at 
    # least some functionality with pipes on win32 than none at all.
    #
    from sys import platform
    
    if platform == "win32":
        def is_available(self):
            return True