Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/Rpyc
diff options
context:
space:
mode:
authorolpc user <olpc@localhost.localdomain>2010-01-30 13:13:29 (GMT)
committer olpc user <olpc@localhost.localdomain>2010-01-30 13:13:29 (GMT)
commitf8a54f5c2f4487f264fe4647cec62599db93d063 (patch)
tree8758b1412c524129dad7e90a4da6b463a85ed96f /Rpyc
initial save 1-30-2010 bangladesh
Diffstat (limited to 'Rpyc')
-rw-r--r--Rpyc/AsyncNetProxy.py85
-rw-r--r--Rpyc/Boxing.py123
-rw-r--r--Rpyc/Channel.py57
-rw-r--r--Rpyc/Connection.py212
-rw-r--r--Rpyc/Demo/__init__.py0
-rw-r--r--Rpyc/Demo/demo-1.py156
-rw-r--r--Rpyc/Demo/demo-2.py81
-rw-r--r--Rpyc/Demo/demo-3.py129
-rw-r--r--Rpyc/Demo/demo-4.py41
-rw-r--r--Rpyc/Demo/demo-5.py66
-rw-r--r--Rpyc/Demo/demo-6.py130
-rw-r--r--Rpyc/Demo/pipe-child.py8
-rw-r--r--Rpyc/Demo/pipe-parent.py17
-rw-r--r--Rpyc/Demo/testmodule.py19
-rw-r--r--Rpyc/Demo/testsuite.bat6
-rw-r--r--Rpyc/Lib.py71
-rw-r--r--Rpyc/ModuleNetProxy.py55
-rw-r--r--Rpyc/NetProxy.py117
-rw-r--r--Rpyc/Servers/Users.py9
-rw-r--r--Rpyc/Servers/__init__.py0
-rw-r--r--Rpyc/Servers/forking_server.py32
-rw-r--r--Rpyc/Servers/selecting_server.py35
-rw-r--r--Rpyc/Servers/simple_server.py13
-rw-r--r--Rpyc/Servers/std_server.py31
-rw-r--r--Rpyc/Servers/threaded_server.py10
-rw-r--r--Rpyc/Servers/tls_server.py20
-rw-r--r--Rpyc/Stream.py113
-rw-r--r--Rpyc/Utils/Builtins.py152
-rw-r--r--Rpyc/Utils/Discovery.py38
-rw-r--r--Rpyc/Utils/Dist.py37
-rw-r--r--Rpyc/Utils/Factories.py56
-rw-r--r--Rpyc/Utils/Files.py112
-rw-r--r--Rpyc/Utils/Helpers.py149
-rw-r--r--Rpyc/Utils/Interpreter.py39
-rw-r--r--Rpyc/Utils/Serving.py126
-rw-r--r--Rpyc/Utils/__init__.py7
-rw-r--r--Rpyc/__init__.py33
-rw-r--r--Rpyc/tests/isinstance.py53
38 files changed, 2438 insertions, 0 deletions
diff --git a/Rpyc/AsyncNetProxy.py b/Rpyc/AsyncNetProxy.py
new file mode 100644
index 0000000..06b230a
--- /dev/null
+++ b/Rpyc/AsyncNetProxy.py
@@ -0,0 +1,85 @@
+from NetProxy import NetProxyWrapper, _get_conn, _get_oid
+from Lib import raise_exception
+
+
+class AsyncNetProxy(NetProxyWrapper):
+ """wraps an existing synchronous netproxy to make it asynchronous
+ (remote operations return AsyncResult objects)"""
+ __slots__ = []
+
+ def __request__(self, handler, *args):
+ res = AsyncResult(_get_conn(self))
+ _get_conn(self).async_request(res.callback, handler, _get_oid(self), *args)
+ return res
+
+ # must return a string... and it's not meaningful to return the repr of an async result
+ def __repr__(self, *args):
+ return self.__request__("handle_repr", *args).result
+ def __str__(self, *args):
+ return self.__request__("handle_str", *args).result
+
+
+class AsyncResult(object):
+ """represents the result of an asynchronous operation"""
+ STATE_PENDING = "pending"
+ STATE_READY = "ready"
+ STATE_EXCEPTION = "exception"
+ __slots__ = ["conn", "_state", "_result", "_on_ready"]
+
+ def __init__(self, conn):
+ self.conn = conn
+ self._state = self.STATE_PENDING
+ self._result = None
+ self._on_ready = None
+
+ def __repr__(self):
+ return "<AsyncResult (%s) at 0x%08x>" % (self._state, id(self))
+
+ def callback(self, obj, is_exception):
+ self._result = obj
+ if is_exception:
+ self._state = self.STATE_EXCEPTION
+ else:
+ self._state = self.STATE_READY
+ if self._on_ready is not None:
+ self._on_ready(self)
+
+ def _get_on_ready(self):
+ return self._ready_callback
+
+ def _set_on_ready(self, obj):
+ self._on_ready = obj
+ if self._state != self.STATE_PENDING:
+ self._on_ready(self)
+
+ def _get_is_ready(self):
+ if self._state == self.STATE_PENDING:
+ self.conn.poll()
+ return self._state != self.STATE_PENDING
+
+ def _get_result(self):
+ while self._state == self.STATE_PENDING:
+ self.conn.serve()
+ if self._state == self.STATE_READY:
+ return self._result
+ elif self._state == self.STATE_EXCEPTION:
+ raise_exception(*self._result)
+
+ is_ready = property(_get_is_ready,
+ doc = "indicates whether or not the result is ready")
+ result = property(_get_result,
+ doc = "the value of the async result (may block)")
+ on_ready = property(_get_on_ready, _set_on_ready,
+ doc = "if not None, specifies a callback which is called when the result is ready")
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Rpyc/Boxing.py b/Rpyc/Boxing.py
new file mode 100644
index 0000000..4797f2a
--- /dev/null
+++ b/Rpyc/Boxing.py
@@ -0,0 +1,123 @@
+import sys
+import traceback
+import cPickle as pickle
+from weakref import WeakValueDictionary
+from Lib import ImmDict
+from NetProxy import NetProxy, SyncNetProxy, _get_conn, _get_oid
+from Lib import orig_isinstance
+
+
+class BoxingError(Exception):
+ pass
+class NestedException(Exception):
+ pass
+
+PICKLE_PROTOCOL = pickle.HIGHEST_PROTOCOL
+TYPE_SIMPLE = 0
+TYPE_PROXY = 1
+TYPE_TUPLE = 2
+TYPE_SLICE = 3
+TYPE_LOCAL_PROXY = 4
+TYPE_IMMDICT = 5
+simple_types = (
+ bool,
+ int,
+ long,
+ float,
+ complex,
+ basestring,
+ type(None),
+)
+
+def dump_exception(typ, val, tb):
+ """dumps the given exception using pickle (since not all exceptions are picklable)"""
+ tbtext = "".join(traceback.format_exception(typ, val, tb))
+ sys.last_type, sys.last_value, sys.last_traceback = typ, val, tb
+ try:
+ pickled_exc = pickle.dumps((typ, val, tbtext), protocol = PICKLE_PROTOCOL)
+ except pickle.PicklingError, ex:
+ newval = NestedException("pickling error %s\nexception type: %r\nexception object: %s" % (ex, typ, val))
+ pickled_exc = pickle.dumps((NestedException, newval, tbtext), protocol = PICKLE_PROTOCOL)
+ return pickled_exc
+
+def load_exception(package):
+ """returns an exception object"""
+ try:
+ return pickle.loads(package)
+ except pickle.PicklingError, ex:
+ return NestedException("failed to unpickle remote exception -- %r" % (ex,))
+
+class Box(object):
+ """a box is where local objects are stored, and remote proxies are created"""
+ __slots__ = ["conn", "objects", "proxy_cache"]
+
+ def __init__(self, conn):
+ self.conn = conn
+ self.objects = {}
+ self.proxy_cache = WeakValueDictionary()
+
+ def close(self):
+ del self.conn
+ del self.objects
+ del self.proxy_cache
+
+ def __getitem__(self, oid):
+ return self.objects[oid][1]
+
+ def _box(self, obj):
+ if orig_isinstance(obj, simple_types):
+ return TYPE_SIMPLE, obj
+ elif orig_isinstance(obj, slice):
+ return TYPE_SLICE, (obj.start, obj.stop, obj.step)
+ elif orig_isinstance(obj, NetProxy) and _get_conn(obj) is self.conn:
+ return TYPE_LOCAL_PROXY, _get_oid(obj)
+ elif orig_isinstance(obj, tuple):
+ if obj:
+ return TYPE_TUPLE, [self._box(subobj) for subobj in obj]
+ else:
+ return TYPE_SIMPLE, ()
+ elif orig_isinstance(obj, ImmDict):
+ if not obj.dict:
+ return TYPE_SIMPLE, {}
+ else:
+ return TYPE_IMMDICT, [(self._box(k), self._box(v)) for k, v in obj.items()]
+ else:
+ oid = id(obj)
+ self.objects.setdefault(oid, [0, obj])[0] += 1
+ return TYPE_PROXY, oid
+
+ def _unbox(self, (type, value)):
+ if type == TYPE_SIMPLE:
+ return value
+ elif type == TYPE_TUPLE:
+ return tuple(self._unbox(subobj) for subobj in value)
+ elif type == TYPE_SLICE:
+ return slice(*value)
+ elif type == TYPE_LOCAL_PROXY:
+ return self[value]
+ elif type == TYPE_IMMDICT:
+ return dict((self._unbox(k), self._unbox(v)) for k, v in value)
+ elif type == TYPE_PROXY:
+ if value in self.proxy_cache:
+ proxy = self.proxy_cache[value]
+ else:
+ proxy = SyncNetProxy(self.conn, value)
+ self.proxy_cache[value] = proxy
+ return proxy
+ else:
+ raise BoxingError("invalid boxed object type", type, value)
+
+ def decref(self, oid):
+ self.objects[oid][0] -= 1
+ if self.objects[oid][0] <= 0:
+ del self.objects[oid]
+
+ def pack(self, obj):
+ """packs an object (returns a package)"""
+ return pickle.dumps(self._box(obj), protocol = PICKLE_PROTOCOL)
+
+ def unpack(self, package):
+ """unpacks a package (returns an object)"""
+ return self._unbox(pickle.loads(package))
+
+
diff --git a/Rpyc/Channel.py b/Rpyc/Channel.py
new file mode 100644
index 0000000..8db2f6f
--- /dev/null
+++ b/Rpyc/Channel.py
@@ -0,0 +1,57 @@
+from threading import RLock
+import struct
+
+
+class Channel(object):
+ """
+ a channel transfers frames over a stream. a frame is any blob of data,
+ up to 4GB in size. it is made of a type field (byte), a sequence number
+ (dword), and a length field (dword), followed by raw data. at the end
+ of the frame, a new line marker (\\r\\n) is appended, to make sure the
+ transport layer will send the message without buffering (to overcome
+ newline buffering). apart from that, channels are duplex, and can do both
+ sending and receiving in a thread-safe manner.
+ """
+ HEADER_FORMAT = "<BLL"
+ HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
+ __slots__ = ["send_lock", "recv_lock", "stream", "seq"]
+
+ def __init__(self, stream):
+ self.send_lock = RLock()
+ self.recv_lock = RLock()
+ self.stream = stream
+ self.seq = 0
+ def __repr__(self):
+ return "<%s(%r)>" % (self.__class__.__name__, self.stream)
+ def close(self):
+ self.stream.close()
+ def fileno(self):
+ return self.stream.fileno()
+ def is_available(self):
+ return self.stream.is_available()
+
+ def send(self, type, seq, data):
+ """sends the given (type, seq, data) frame"""
+ try:
+ self.send_lock.acquire()
+ if seq is None:
+ seq = self.seq
+ self.seq += 1
+ header = struct.pack(self.HEADER_FORMAT, type, seq, len(data))
+ self.stream.write(header + data + "\r\n")
+ return seq
+ finally:
+ self.send_lock.release()
+
+ def recv(self):
+ """returns the next (type, seq, data) frame (blocking)"""
+ try:
+ self.recv_lock.acquire()
+ type, seq, length = struct.unpack(self.HEADER_FORMAT, self.stream.read(self.HEADER_SIZE))
+ data = self.stream.read(length)
+ self.stream.read(2)
+ return type, seq, data
+ finally:
+ self.recv_lock.release()
+
+
diff --git a/Rpyc/Connection.py b/Rpyc/Connection.py
new file mode 100644
index 0000000..c2ce4c4
--- /dev/null
+++ b/Rpyc/Connection.py
@@ -0,0 +1,212 @@
+import sys
+from Boxing import Box, dump_exception, load_exception
+from ModuleNetProxy import RootImporter
+from Lib import raise_exception, AttrFrontend
+
+
+FRAME_REQUEST = 1
+FRAME_RESULT = 2
+FRAME_EXCEPTION = 3
+
+class Connection(object):
+ """
+ the rpyc connection layer (protocol and APIs). generally speaking, the only
+ things you'll need to access directly from this object are:
+ * modules - represents the remote python interpreter's modules namespace
+ * execute - executes the given code on the other side of the connection
+ * namespace - the namespace in which the code you `execute` resides
+
+ the rest of the attributes should be of no interest to you, except maybe
+ for `remote_conn`, which represents the other side of the connection. it is
+ unlikely, however, you'll need to use it (it is used internally).
+
+ when you are done using a connection, and wish to release the resources it
+ holds, you should call close(). you don't have to, but if you don't, the gc
+ can't release the memory because of cyclic references.
+ """
+ __slots__ = ["_closed", "_local_namespace", "channel", "box", "async_replies",
+ "sync_replies", "module_cache", "remote_conn", "modules", "namespace"]
+
+ def __init__(self, channel):
+ self._closed = False
+ self._local_namespace = {}
+ self.channel = channel
+ self.box = Box(self)
+ self.async_replies = {}
+ self.sync_replies = {}
+ self.module_cache = {}
+ self.remote_conn = self.sync_request("handle_getconn")
+ # user APIs:
+ self.modules = RootImporter(self)
+ self.namespace = AttrFrontend(self.remote_conn._local_namespace)
+ self.execute("")
+
+ def __repr__(self):
+ if self._closed:
+ return "<%s.%s(closed)>" % (self.__class__.__module__, self.__class__.__name__)
+ else:
+ return "<%s.%s(%r)>" % (self.__class__.__module__, self.__class__.__name__, self.channel)
+
+ #
+ # file api layer
+ #
+ def close(self):
+ """closes down the connection and releases all cyclic dependencies"""
+ if not self._closed:
+ self.box.close()
+ self.channel.close()
+ self._closed = True
+ self._local_namespace = None
+ self.channel = None
+ self.box = None
+ self.async_replies = None
+ self.sync_replies = None
+ self.module_cache = None
+ self.modules = None
+ self.remote_conn = None
+ self.namespace = None
+
+ def fileno(self):
+ """connections are select()able"""
+ return self.channel.fileno()
+
+ #
+ # protocol
+ #
+ def send(self, type, seq, obj):
+ if self._closed:
+ raise EOFError("the connection is closed")
+ return self.channel.send(type, seq, self.box.pack(obj))
+
+ def send_request(self, handlername, *args):
+ return self.send(FRAME_REQUEST, None, (handlername, args))
+
+ def send_exception(self, seq, exc_info):
+ self.send(FRAME_EXCEPTION, seq, dump_exception(*exc_info))
+
+ def send_result(self, seq, obj):
+ self.send(FRAME_RESULT, seq, obj)
+
+ #
+ # dispatching
+ #
+ def dispatch_result(self, seq, obj):
+ if seq in self.async_replies:
+ self.async_replies.pop(seq)(obj, False)
+ else:
+ self.sync_replies[seq] = obj
+
+ def dispatch_exception(self, seq, obj):
+ excobj = load_exception(obj)
+ if seq in self.async_replies:
+ self.async_replies.pop(seq)(excobj, True)
+ else:
+ raise_exception(*excobj)
+
+ def dispatch_request(self, seq, handlername, args):
+ try:
+ res = getattr(self, handlername)(*args)
+ except SystemExit:
+ raise
+ except:
+ self.send_exception(seq, sys.exc_info())
+ else:
+ self.send_result(seq, res)
+
+ def poll(self):
+ """if available, serves a single request, otherwise returns (non-blocking serve)"""
+ if self.channel.is_available():
+ self.serve()
+ return True
+ else:
+ return False
+
+ def serve(self):
+ """serves a single request (may block)"""
+ type, seq, data = self.channel.recv()
+ if type == FRAME_RESULT:
+ self.dispatch_result(seq, self.box.unpack(data))
+ elif type == FRAME_REQUEST:
+ self.dispatch_request(seq, *self.box.unpack(data))
+ elif type == FRAME_EXCEPTION:
+ self.dispatch_exception(seq, self.box.unpack(data))
+ else:
+ raise ValueError("invalid frame type (%d)" % (type,))
+
+ #
+ # requests
+ #
+ def sync_request(self, handlername, *args):
+ """performs a synchronous (blocking) request"""
+ seq = self.send_request(handlername, *args)
+ while seq not in self.sync_replies:
+ self.serve()
+ return self.sync_replies.pop(seq)
+
+ def async_request(self, callback, handlername, *args):
+ """performs an asynchronous (non-blocking) request"""
+ seq = self.send_request(handlername, *args)
+ self.async_replies[seq] = callback
+
+ #
+ # root requests (not through NetProxies)
+ #
+ def rimport(self, modulename):
+ """imports a module by name (as a string)"""
+ if modulename not in self.module_cache:
+ module = self.sync_request("handle_import", modulename)
+ self.module_cache[modulename] = module
+ return self.module_cache[modulename]
+
+ def execute(self, expr, mode = "exec"):
+ """executes the given code at the remote side of the connection"""
+ return self.sync_request("handle_execute", expr, mode)
+
+ #
+ # handlers
+ #
+ def handle_decref(self, oid):
+ self.box.decref(oid)
+
+ def handle_delattr(self, oid, name):
+ delattr(self.box[oid], name)
+
+ def handle_getattr(self, oid, name):
+ return getattr(self.box[oid], name)
+
+ def handle_setattr(self, oid, name, value):
+ setattr(self.box[oid], name, value)
+
+ def handle_delitem(self, oid, index):
+ del self.box[oid][index]
+
+ def handle_getitem(self, oid, index):
+ return self.box[oid][index]
+
+ def handle_setitem(self, oid, index, value):
+ self.box[oid][index] = value
+
+ def handle_call(self, oid, args, kwargs):
+ return self.box[oid](*args, **kwargs)
+
+ def handle_repr(self, oid):
+ return repr(self.box[oid])
+
+ def handle_str(self, oid):
+ return str(self.box[oid])
+
+ def handle_bool(self, oid):
+ return bool(self.box[oid])
+
+ def handle_import(self, modulename):
+ return __import__(modulename, None, None, modulename.split(".")[-1])
+
+ def handle_getconn(self):
+ return self
+
+ def handle_execute(self, expr, mode):
+ codeobj = compile(expr, "<from %s>" % (self,), mode)
+ return eval(codeobj, self._local_namespace)
+
+
+
diff --git a/Rpyc/Demo/__init__.py b/Rpyc/Demo/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Rpyc/Demo/__init__.py
diff --git a/Rpyc/Demo/demo-1.py b/Rpyc/Demo/demo-1.py
new file mode 100644
index 0000000..6a9a3a8
--- /dev/null
+++ b/Rpyc/Demo/demo-1.py
@@ -0,0 +1,156 @@
+#
+# welcome to RPyC. this demo serves as an introduction. i believe in learning through
+# showcases, and that's why this package comes with a demo subpackage, instead of
+# documentation
+#
+# so, the first thing we're gonna do is import the SocketConnection. this is a factory
+# function that returns us a new Connection object over a socket stream. we dont need
+# to get into details here.
+#
+from Rpyc import *
+
+#
+# next, we'll get all the helpful utilities. the utilities include wrappers for builtin
+# functions, like dir(), so they'd work as expected with netproxies.
+#
+from Rpyc.Utils import *
+
+#
+# by now you should have an rpyc server running. if you dont, go to the Servers directory
+# and choose your favorite version of a socket server. for unixes i'd recommend the
+# forking server; for windows -- the threaded server.
+#
+# so let's connect to the server
+#
+c = SocketConnection("localhost")
+
+#
+# now it's time to explain a little about how rpyc works. it's quite simple really. the
+# magic comes from a concept called NetProxy. a NetProxy object delivers all of the
+# operations performed on it to the remote object. so if you get a list from your host,
+ # what you're really getting is a NetProxy to that list. it looks and works just
+# like a real list -- but everytime you do something on it, it actually performs a
+# request on the list object stored on the host. this is called boxing. this means
+# you can change the object you get locally, and the remote object changes, etc.
+#
+ # however, for efficiency and other reasons, not all objects you get are NetProxies.
+# all immutable and pickle-able objects pass by value (through pickle). these types
+# of objects include ints, longs, strings, and some other types. all other types are
+# passed by boxing.
+#
+# this boxing mechanism works on everything -- objects, functions, classes, and modules,
+# which is why rpyc is considered transparent. your code looks just as if it was meant
+# to run locally.
+#
+
+#
+# let's start with something simple -- getting a remote module. accessing the remote
+# namespace always starts with the `modules` attribute, then the module (or package)
+# name, and then the attribute you want to get.
+#
+
+print c.modules.sys
+print c.modules.sys.path
+c.modules.sys.path.append("lucy")
+print c.modules.sys.path[-1]
+
+#
+# these remote objects are first class objects, like all others in python. this means
+# you can store them in variables, pass them as parameters, etc.
+#
+rsys = c.modules.sys
+rpath = rsys.path
+rpath.pop(-1)
+
+#
+# and as you might expect, netproxies also look like the real objects
+#
+print dir(rpath)
+
+#
+# but there are a couple of issues with netproxies. the type(), isinstance(), and
+# issubclass() classes dont work on them... as they query the underlying object, not
+# the remote one. so:
+#
+print type(rsys.maxint) # <int> -- because it's a simple type which is passed by value)
+print type(rsys.path) # <SyncNetProxy> -- because, after all, it's a netproxy, not a list
+
+#
+# now for a demo of packages
+# (which looks very much like 'from xml.dom.minidom import parseString')
+#
+parseString = c.modules.xml.dom.minidom.parseString
+x = parseString("<a>lala</a>")
+print x
+x.toxml()
+print x.firstChild.nodeName
+
+#
+# however, there's a catch when working with packages like that. the way it works is
+# trying to find an attribute with that name, and if not found, trying to import a sub-
+# module.
+#
+# now in english:
+# c.module.xml is the xml module of the server. when you do c.module.xml.dom, rpyc looks
+# for an attribute named 'dom' inside the xml module. since there's no such attribute,
+# it tries to import a subpackage called xml.dom, which succeeds. then it does the same
+# for xml.dom.minidom, and xml.dom.minidom.parseString.
+#
+# but there are times when that's ambiguous. this mean that the module has both a sub-
+# module called 'X', and an attribute called 'X'. according to rpyc's algorithm, the
+# attribute 'X' is returned, not the sub-module.
+#
+# but if you need to be explicit, you can, and it works like this:
+#
+
+c.modules["xml.dom.minidom"].parseString("<a></a>")
+
+#
+# this will make sure the module 'xml.dom.minidom' is returned, and not an attribute.
+# in general, it's better to use this form, unless you know there are no such conflicts.
+ # remember that "Explicit is better than implicit", although it requires four more key
+# strokes. perhaps in a later version it will raise an exception if there's a conflict.
+#
+
+#
+# and now for a little demo of working with files (a common task)
+#
+f = c.modules.__builtin__.open("lala.txt", "w")
+f.write("lucy")
+f.close()
+c.modules.os.remove("lala.txt")
+
+#
+# now to a bitter part of life: exceptions. as you could expect, they work just like
+# regular exceptions
+#
+try:
+ a = c.modules.sys.nonexistent_attribute
+except AttributeError:
+ pass
+else:
+ assert False
+
+try:
+ a = c.modules.__builtin__.open("**\\//##~~..::!@#$%^&*()_+\n <,>?")
+except IOError:
+ pass
+else:
+ assert False
+
+print "goodbye"
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Rpyc/Demo/demo-2.py b/Rpyc/Demo/demo-2.py
new file mode 100644
index 0000000..49d94ca
--- /dev/null
+++ b/Rpyc/Demo/demo-2.py
@@ -0,0 +1,81 @@
+#
+# okay, this demo is more advanced. here we'll learn about:
+# * redirecting standard files
+# * synchronous callbacks
+ # * the utilities module
+#
+import sys
+import os
+from Rpyc import *
+from Rpyc.Utils import remote_interpreter
+
+c = SocketConnection("localhost")
+
+#
+# redirect our stdout to the server
+#
+sys.stdout = c.modules.sys.stdout
+print "this time we focus on `the seatle music`"
+
+#
+# and the other way round
+#
+sys.stdout = sys.__stdout__
+c.modules.sys.stdout = sys.stdout
+c.modules.sys.stdout.write("alice in chains\n")
+
+#
+# but you dont believe me, so
+#
+c.modules["Rpyc.Demo.testmodule"].printer("tool")
+
+#
+# and restore that
+#
+c.modules.sys.stdout = c.modules.sys.__stdout__
+
+#
+# now let's play with callbacks
+#
+def f(text):
+ print text
+
+c.modules["Rpyc.Demo.testmodule"].caller(f, "nirvana")
+
+#
+# and if you insist
+#
+def g(func, text):
+ c.modules["Rpyc.Demo.testmodule"].caller(func, text)
+
+c.modules["Rpyc.Demo.testmodule"].caller(g, f, "soundgarden")
+
+#
+# now for the utilities module. it gives us the following cool functions:
+# * dir, getattr, hasattr, help, reload -- overriding builtins
+ # * upload, download -- transferring files/directories to/from the client/server (all the permutations)
+ # * remote_shell, remote_interpreter -- running remote processes and debugging
+#
+print hasattr(sys, "path")
+print hasattr(c.modules.sys, "path")
+
+print getattr(sys, "maxint")
+print getattr(c.modules.sys, "maxint")
+
+print reload(sys)
+print reload(c.modules.sys)
+
+f=open("lala.txt", "w")
+f.write("king crimson")
+f.close()
+upload(c, "lala.txt", "../lala.txt")
+os.remove("lala.txt")
+c.modules.os.remove("../lala.txt")
+
+remote_interpreter(c)
+
+
+print "goodbye"
+
+
+
diff --git a/Rpyc/Demo/demo-3.py b/Rpyc/Demo/demo-3.py
new file mode 100644
index 0000000..bd0689e
--- /dev/null
+++ b/Rpyc/Demo/demo-3.py
@@ -0,0 +1,129 @@
+#
+# asynchronous proxies as super-events
+#
+from Rpyc import *
+
+c = SocketConnection("localhost")
+
+#
+# this is the remote int type
+#
+rint = c.modules.__builtin__.int
+
+#
+# and we'll wrap it in an asynchronous wrapper
+#
+rint = Async(rint)
+
+#
+# now it still looks like a normal proxy... but operations on it return something called
+# an AsyncResult -- it's an object that represents the would-be result of the operation.
+# it has a .is_ready property, which indicates whether or not the result is ready, and
+# a .result property, which holds the result of the operations. when you access the .result
+# property, it will block until the result is returned
+#
+a = rint("123")
+b = rint("metallica")
+print a
+print b.is_ready
+print a.result
+print a
+
+#
+# and when an exception occurs, it looks like that
+#
+try:
+ print b.result
+except ValueError:
+ pass
+
+#
+# only when you access the result you get the exception, which may look weird, but hey,
+# it's an asynchronous world out there.
+#
+
+#
+# there's another methodology for async proxies -- on_ready callbacks. instead of
+# getting the async result, you can register a callback to collect it, when it arrives.
+#
+def f(res):
+ print "the result is",
+ try:
+ print res.result
+ except:
+ print "an exception"
+
+rint = Async(c.modules.__builtin__.int)
+
+ar = rint("123")
+ar.on_ready = f
+
+# this will cause an exception
+ar = rint("a perfect circle")
+ar.on_ready = f
+
+# or when you dont need to keep the async result
+rint("456").on_ready = f
+
+# and it's not limited to calling it. anything you do to the async proxy is asynchronous.
+# for example, you can also get attributes asynchronously:
+ar = rint.__str__
+
+#
+# now we'll do some other request, which will cause the results to arrive, and the callback
+# to be called.
+#
+print c.modules.sys
+
+############################################################################################
+#
+# this is where we get hardcore: threads and event callbacks
+#
+xxx = 0
+def blah():
+ global xxx
+ xxx += 1
+
+#
+# we'll start a thread on the server which on threadfunc (which is defined in the testmodule).
+# this function will call the callback we give it every second, but will ignore the result.
+# this practically means it's like an event -- trigger and forget. on the client side, the
+# callback will increment `xxx` every time it's called
+#
+c.modules.thread.start_new_thread(c.modules["Rpyc.Demo.testmodule"].threadfunc, (blah,))
+
+#
+# we'll wait a little
+#
+import time
+time.sleep(5)
+
+#
+# and do some operation, which, along with it, will pull all incoming requests
+#
+print c.modules.sys
+print xxx
+
+#
+# and we can start a thread of our own to pull the requests in the background
+#
+#import thread
+#worker_running = True
+#
+#def worker(conn):
+# while worker_running:
+# conn.serve()
+#
+#thread.start_new_thread(worker, (c,))
+#
+#time.sleep(5)
+#worker_running = False
+#
+#print xxx
+#print "goodbye"
+
+#
+# L33TN3SS
+#
+
+
diff --git a/Rpyc/Demo/demo-4.py b/Rpyc/Demo/demo-4.py
new file mode 100644
index 0000000..5467e5c
--- /dev/null
+++ b/Rpyc/Demo/demo-4.py
@@ -0,0 +1,41 @@
+import time
+from Rpyc import SocketConnection, Async
+
+c = SocketConnection("localhost")
+c2 = SocketConnection("localhost")
+
+huge_xml = "<blah a='5' b='6'> " * 50000 + " </blah> " * 50000
+parseString = Async(c.modules.xml.dom.minidom.parseString)
+res = parseString(huge_xml)
+
+print "while we're waiting for the server to complete, we do other stuff"
+t = time.time()
+while not res.is_ready:
+ time.sleep(0.5)
+ # we dont want to use `c`, because it would block us (as the server is blocking)
+ # but `c2` runs on another thread/process, so it wouldn't block
+ print c2.modules.os.getpid()
+
+t = time.time() - t
+print "it took %d seconds" % (t,)
+
+print res.result
+
+
+#
+# note: to improve performance, delete the result when you no longer need it.
+# this should be done because the server might (as in this case) hold enormous
+# amounts of memory, which will slow it down
+#
+# if you do this:
+# res = parseString(huge_xml)
+# res = parseString(huge_xml)
+# res will be deleted only after the second operation finishes, because only when
+# the second result is assigned, the first is released -- server still holds
+# around 160MB of the old xml tree for nothing. so it's a good idea to `del res`
+# when you dont need it.
+#
+# also, there's a memory leak on the server, which i'm working on solving.
+#
+
+
diff --git a/Rpyc/Demo/demo-5.py b/Rpyc/Demo/demo-5.py
new file mode 100644
index 0000000..7a21688
--- /dev/null
+++ b/Rpyc/Demo/demo-5.py
@@ -0,0 +1,66 @@
+#
+ # this demo will show you working with async proxies and callbacks
+ # version 2.3 removes the AsyncCallback factory, and instead provides a mechanism
+# where async results can provide a callback. it simplifies the design, so i
+# went for it.
+#
+import time
+from Rpyc import SocketConnection, Async
+
+c1 = SocketConnection("localhost")
+
+# f1 is an async proxy to the server's sleep function
+f1 = Async(c1.modules.time.sleep)
+
+# this would block the server for 9 seconds
+r1 = f1(9)
+# and this would block it for 11
+r2 = f1(11)
+
+# of course the client isnt affected (that's the whole point of Async)
+# but since the same server can't block simultaneously, the second request is
+# queued. this is a good example of queuing.
+
+# now we'll wait for both results to finish. this should print around 20 lines
+# (more or less, depending on the phase)
+while not r1.is_ready or not r2.is_ready:
+ print "!"
+ time.sleep(1)
+
+print "---"
+
+# now we'll dig in the h4xx0r shit -- running things simultaneously
+# for this, we'll need another connection, and another proxy:
+c2 = SocketConnection("localhost")
+f2 = Async(c2.modules.time.sleep)
+
+ # now we'll do the same as the above, but this time, it will happen simultaneously
+ # because f1 and f2 work on different connections
+r1 = f1(9)
+r2 = f2(11)
+
+# so this time, it will print around 11 lines
+while not r1.is_ready or not r2.is_ready:
+ print "!"
+ time.sleep(1)
+
+print "---"
+
+# very haxxor indeed. now, we'll see how to use the on_ready callback
+r1 = f1(9)
+r2 = f2(11)
+
+def blah(res):
+ print "look mama, no hands! res = %r" % (res.result,)
+
+ # set the on_ready callback -- when r1 becomes ready, the callback will
+# be called automagically
+r1.on_ready = blah
+
+# this should print 9 "!", then "look mama", then two more "!"
+while not r1.is_ready or not r2.is_ready:
+ print "!"
+ time.sleep(1)
+
+
+
diff --git a/Rpyc/Demo/demo-6.py b/Rpyc/Demo/demo-6.py
new file mode 100644
index 0000000..1c34039
--- /dev/null
+++ b/Rpyc/Demo/demo-6.py
@@ -0,0 +1,130 @@
+# as you can see - the import line now requires even less typing!
+from Rpyc import *
+c = SocketConnection("localhost")
+
+#------------------------------------------------------------------------------
+# this demo shows the new `execute` and `namespace` features of rpyc
+#------------------------------------------------------------------------------
+
+
+# the code below will run AT THE OTHER SIDE OF THE CONNECTION... so you'll see
+# 'hello world' on the server's console
+c.execute("print 'hello world'")
+
+import sys
+c.modules.sys.stdout = sys.stdout
+
+# and this time, on our console
+c.execute("print 'brave new world'")
+
+# restore that
+c.modules.sys.stdout = c.modules.sys.__stdout__
+
+# anyway, the `execute` method runs the given code at the other side of the connection
+# and works in the `namespace` dict. what?
+c.execute("x = [1,2,3]")
+print c.namespace.x
+
+# now it makes sense, doesn't it? the 'namespace' attribute is something i called
+# AttrFrontend -- it wraps a dict with the attribute protocol, so you can access
+# it with the dot notation, instead of the braces notation (more intuitive).
+# this namespace works both ways -- executing code affects the namespace, while
+# altering the namespace directly also affects it:
+c.namespace.x.append(4)
+c.execute("x.append(5)")
+print c.namespace.x
+
+# but you should not assign complex objects (not int/float/str, etc) to this namespace
+# directy, or NetProxies will be created. there's nothing wrong with that, but keep
+# in mind it might cause blocking (and even deadlocks), as i'll explain later.
+
+# another cool thing i want to show is the second, optional parameter to execute: mode.
+# the mode controls how the code is compiled. the default mode is "exec", which means
+# it executes the code as a module. the other option is "eval" which returns a value.
+# so if you want to _do_ something, like printing or assigning a variable, you do it
+# with "exec", and if you want to evaluate something, you do it with "eval"
+# for example:
+
+# this will print None
+print c.execute("1+2")
+
+# while this will print 3
+print c.execute("1+2", "eval")
+
+# but there's a time in a man's life when he asks himself, why the heck? you can, as i
+# showed in other places, just do this:
+# c.modules.__builtin__.eval("1+2")
+# so what's the point?
+#
+# well, i've been waiting for this question. the rationale behind this seemingly useless
+# feature is for times you NEED to have the code executing remotely, but writing a
+# dedicated module for it is overdoing it:
+# * more files to update ==> more chance that you'll forget to update
+# * distributing the module to all of the machines
+# * making a mess on the file system
+# * it's really not a module... it's just some code that logically belongs to one single
+# module, but technical difficulties prevent it
+#
+# and to show you what i mean -- i want to start a thread on the server, like it did in
+# several places over the demos. this thread will send me an event every second. what i
+# used to do was, creating another module, like testmodule.py to define the thread
+# function, so it will exist on the server, and i could call it.
+# if i defined thread_func at the client side, then the thread will block when trying
+# to execute the code, because the client holds it. so this new mechanism lets you
+# distribute code in a volatile fashion:
+# * when the connection is closed, everything you defined is gone
+# * no file-system mess
+# * no need to distribute files across the network
+# * only one place to maintain
+
+c.execute("""
+my_thread_active = True
+
+def my_thread_func(callback):
+ import time
+ from Rpyc import Async
+
+ callback = Async(callback)
+ while my_thread_active:
+ callback(time.time())
+ time.sleep(1)
+ print "the thread says goodbye"
+""")
+
+def callback(timestamp):
+ print "the timestamp is", timestamp
+
+c.modules.thread.start_new_thread(c.namespace.my_thread_func, (callback,))
+c.modules.time.sleep(5)
+c.namespace.my_thread_active = False
+c.close()
+
+# it's not only for threads of course. there are many times when you NEED the code/objects
+# on the remote side. for example:
+# * situations that would block (like having the thread func on the client)
+# * code that check the type of the object (type or isinstance), and a NetProxy would make
+# it cry. DONT CHECK THE TYPE OF OBJECTS, PEOPLE, JUST USE THEM! that's why they invented
+# duck-typing. argh.
+# * other places i didnt think of as of yet. i want to sleep. leave me alone ;) zzzZZZ
+#
+# so enjoy!
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Rpyc/Demo/pipe-child.py b/Rpyc/Demo/pipe-child.py
new file mode 100644
index 0000000..517d0ef
--- /dev/null
+++ b/Rpyc/Demo/pipe-child.py
@@ -0,0 +1,8 @@
+import sys
+from Rpyc import PipeConnection
+
+c = PipeConnection(sys.stdin, sys.stdout)
+c.modules.sys.path.append("i love lucy")
+
+
+# child dies \ No newline at end of file
diff --git a/Rpyc/Demo/pipe-parent.py b/Rpyc/Demo/pipe-parent.py
new file mode 100644
index 0000000..bd8cc89
--- /dev/null
+++ b/Rpyc/Demo/pipe-parent.py
@@ -0,0 +1,17 @@
+# a demo for parent/child over pipes
+
+import sys
+from popen2 import popen3
+from Rpyc import PipeConnection
+
+cout, cin, cerr = popen3("python pipe-child.py")
+conn = PipeConnection(cout, cin)
+
+try:
+ while True:
+ conn.serve()
+except EOFError:
+ print "goodbye child"
+
+print sys.path[-1]
+
diff --git a/Rpyc/Demo/testmodule.py b/Rpyc/Demo/testmodule.py
new file mode 100644
index 0000000..d4bcb31
--- /dev/null
+++ b/Rpyc/Demo/testmodule.py
@@ -0,0 +1,19 @@
+import time
+from Rpyc import Async
+
+def threadfunc(callback):
+ """this function will call the callback every second"""
+ callback = Async(callback)
+ try:
+ while True:
+ print "!"
+ callback()
+ time.sleep(1)
+ except:
+ print "thread exiting"
+
+def printer(text):
+ print text
+
+def caller(func, *args):
+ func(*args)
diff --git a/Rpyc/Demo/testsuite.bat b/Rpyc/Demo/testsuite.bat
new file mode 100644
index 0000000..fa46892
--- /dev/null
+++ b/Rpyc/Demo/testsuite.bat
@@ -0,0 +1,6 @@
+python demo-1.py
+python demo-2.py
+python demo-3.py
+python demo-4.py
+python demo-5.py
+python demo-6.py
diff --git a/Rpyc/Lib.py b/Rpyc/Lib.py
new file mode 100644
index 0000000..ade15ee
--- /dev/null
+++ b/Rpyc/Lib.py
@@ -0,0 +1,71 @@
+"""
+shared types, functions and constants.
+important - don't reload() this module, or things are likely to break
+"""
+from sys import excepthook, stderr
+
+#
+# the original version of the __builtins__, in case you do
+# __builtin__.x = rpyc_version_of_x
+#
+orig_isinstance = isinstance
+orig_getattr = getattr
+orig_hasttr = hasattr
+orig_issubclass = issubclass
+orig_help = help
+orig_reload = reload
+orig_dir = dir
+orig_excepthook = excepthook
+orig_type = type
+
+
+def raise_exception(typ, val, tbtext):
+ """a helper for raising remote exceptions"""
+ if orig_type(typ) == str:
+ raise typ
+ else:
+ val._remote_traceback = tbtext
+ raise val
+
+class ImmDict(object):
+ """immutable dict (passed by value)"""
+ __slots__ = ["dict"]
+ def __init__(self, dict):
+ self.dict = dict
+ def items(self):
+ return self.dict.items()
+
+def _get_dict(obj):
+ return object.__getattribute__(obj, "____dict__")
+
+class AttrFrontend(object):
+ """a wrapper that implements the attribute protocol for a dict backend"""
+ __slots__ = ["____dict__"]
+
+ def __init__(self, dict):
+ object.__setattr__(self, "____dict__", dict)
+
+ def __delitem__(self, name):
+ del _get_dict(self)[name]
+ def __getitem__(self, name):
+ return _get_dict(self)[name]
+ def __setitem__(self, name, value):
+ _get_dict(self)[name] = value
+
+ __delattr__ = __delitem__
+ __getattr__ = __getitem__
+ __setattr__ = __setitem__
+
+ def __repr__(self):
+ return "<AttrFrontend(%s)>" % (", ".join(_get_dict(self).keys()),)
+
+
+def rpyc_excepthook(exctype, value, traceback):
+ if hasattr(value, "_remote_traceback"):
+ print >> stderr, "======= Remote traceback ======="
+ print >> stderr, value._remote_traceback
+ print >> stderr, "======= Local exception ======="
+ orig_excepthook(exctype, value, traceback)
+ else:
+ orig_excepthook(exctype, value, traceback)
+
diff --git a/Rpyc/ModuleNetProxy.py b/Rpyc/ModuleNetProxy.py
new file mode 100644
index 0000000..6c20582
--- /dev/null
+++ b/Rpyc/ModuleNetProxy.py
@@ -0,0 +1,55 @@
+from NetProxy import NetProxyWrapper, _get_conn, _get_oid
+
+
+class ModuleNetProxy(NetProxyWrapper):
+ """a netproxy specialzied for exposing remote modules (first tries to getattr,
+ if it fails tries to import)"""
+ __slots__ = ["____base__", "____cache__"]
+
+ def __init__(self, proxy, base):
+ NetProxyWrapper.__init__(self, proxy)
+ object.__setattr__(self, "____base__", base)
+ object.__setattr__(self, "____cache__", {})
+
+ def __request__(self, handler, *args):
+ return _get_conn(self).sync_request(handler, _get_oid(self), *args)
+
+ def __getattr__(self, name):
+ cache = object.__getattribute__(self, "____cache__")
+ try:
+ return cache[name]
+ except KeyError:
+ pass
+
+ try:
+ return self.__request__("handle_getattr", name)
+ except AttributeError:
+ pass
+
+ try:
+ fullname = object.__getattribute__(self, "____base__") + "." + name
+ module = ModuleNetProxy(_get_conn(self).rimport(fullname), fullname)
+ cache[name] = module
+ return module
+ except ImportError:
+ raise AttributeError("'module' object has not attribute or submodule %r" % (name,))
+
+
+class RootImporter(object):
+ """the root of the interpreter's import hierarchy"""
+ __slots__ = ["____conn__"]
+
+ def __init__(self, conn):
+ object.__setattr__(self, "____conn__", conn)
+
+ def __getitem__(self, name):
+ return _get_conn(self).rimport(name)
+
+ def __getattr__(self, name):
+ return ModuleNetProxy(self[name], name)
+
+ def __setattr__(self, name, value):
+ raise TypeError("read only type")
+
+
+
diff --git a/Rpyc/NetProxy.py b/Rpyc/NetProxy.py
new file mode 100644
index 0000000..4469fdc
--- /dev/null
+++ b/Rpyc/NetProxy.py
@@ -0,0 +1,117 @@
+from Lib import ImmDict
+
+
+class FullyDynamicMetaclass(type):
+ """
+ a meta class that enables special methods to be accessed like regular names
+ (via __getattr__), like it used to be in old-style classes.
+ """
+
+ def __new__(cls, name, bases, namespace):
+ special_methods = [
+ "__hash__", "__len__", "__iter__", "next", "__reversed__",
+ "__add__", "__iadd__", "__radd__", "__sub__", "__isub__", "__rsub__",
+ "__mul__", "__imul__", "__rmul__", "__div__", "__idiv__", "__rdiv__",
+ "__truediv__", "__itruediv__", "__rtruediv__", "__floordiv__",
+ "__ifloordiv__", "__rfloorfiv__", "__pow__", "__ipow__", "__rpow__",
+ "__lshift__", "__ilshift__", "__rlshift__", "__rshift__", "__irshift__",
+ "__rrshift__", "__and__", "__iand__", "__rand__", "__or__", "__ior__",
+ "__ror__", "__xor__", "__ixor__", "__rxor__", "__mod__", "__imod__",
+ "__rmod__", "__divmod__", "__idivmod__", "__rdivmod__", "__pos__",
+ "__neg__", "__int__", "__float__", "__long__", "__oct__", "__hex__", "__coerce__",
+ "__eq__", "__ne__", "__le__", "__ge__", "__lt__", "__gt__", "__cmp__",
+ ]
+
+ def make_method(name):
+ def method(self, *a, **k):
+ return self.__getattr__(name)(*a, **k)
+ return method
+
+ special_attributes = ["__doc__", "__module__", "__file__", "__name__"]
+
+ def make_property(name):
+ def getter(self):
+ return self.__getattr__(name)
+ def setter(self, value):
+ self.__setattr__(name, value)
+ def deller(self):
+ self.__delattr__(name)
+ return property(getter, setter, deller)
+
+ for sm in special_methods:
+ namespace[sm] = make_method(sm)
+ for sa in special_attributes:
+ namespace[sa] = make_property(sa)
+ return type.__new__(cls, name, bases, namespace)
+
+def _get_conn(proxy):
+ return object.__getattribute__(proxy, "____conn__")
+def _get_oid(proxy):
+ return object.__getattribute__(proxy, "____oid__")
+
+class NetProxy(object):
+ """NetProxy - convey local operations to the remote object. this is an abstract class"""
+ __metaclass__ = FullyDynamicMetaclass
+ __slots__ = ["____conn__", "____oid__", "__weakref__"]
+
+ def __init__(self, conn, oid):
+ object.__setattr__(self, "____conn__", conn)
+ object.__setattr__(self, "____oid__", oid)
+
+ def __request__(self, handler, *args):
+ raise NotImplementedError()
+
+ def __call__(self, *args, **kwargs):
+ return self.__request__("handle_call", args, ImmDict(kwargs))
+
+ def __delattr__(self, *args):
+ return self.__request__("handle_delattr", *args)
+ def __getattr__(self, *args):
+ return self.__request__("handle_getattr", *args)
+ def __setattr__(self, *args):
+ return self.__request__("handle_setattr", *args)
+
+ def __delitem__(self, *args):
+ return self.__request__("handle_delitem", *args)
+ def __getitem__(self, *args):
+ return self.__request__("handle_getitem", *args)
+ def __setitem__(self, *args):
+ return self.__request__("handle_setitem", *args)
+
+ def __repr__(self, *args):
+ return self.__request__("handle_repr", *args)
+ def __str__(self, *args):
+ return self.__request__("handle_str", *args)
+ def __nonzero__(self, *args):
+ return self.__request__("handle_bool", *args)
+
+class NetProxyWrapper(NetProxy):
+ """a netproxy that wraps an inner netproxy"""
+ __slots__ = ["____original__"]
+
+ def __init__(self, proxy):
+ NetProxy.__init__(self, _get_conn(proxy), _get_oid(proxy))
+ object.__setattr__(self, "____original__", proxy)
+
+def _dummy_callback(*args, **kw):
+ pass
+
+class SyncNetProxy(NetProxy):
+ """the default, synchronous netproxy"""
+ __slots__ = []
+
+ def __del__(self):
+ # decref'ing is done asynchronously, because we dont need to wait for the remote
+ # object to die. moreover, we dont care if it fails, because that would mean the
+ # connection is broken, so the remote object is already dead
+ try:
+ _get_conn(self).async_request(_dummy_callback, "handle_decref", _get_oid(self))
+ except:
+ pass
+
+ def __request__(self, handler, *args):
+ return _get_conn(self).sync_request(handler, _get_oid(self), *args)
+
+
+
+
diff --git a/Rpyc/Servers/Users.py b/Rpyc/Servers/Users.py
new file mode 100644
index 0000000..1060e0c
--- /dev/null
+++ b/Rpyc/Servers/Users.py
@@ -0,0 +1,9 @@
+#
+# chmod this file securely and be sure to remove the default users
+#
+users = {
+ "frodo" : "1ring",
+ "yossarian" : "catch22",
+ "ayla" : "jondalar",
+}
+
diff --git a/Rpyc/Servers/__init__.py b/Rpyc/Servers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Rpyc/Servers/__init__.py
diff --git a/Rpyc/Servers/forking_server.py b/Rpyc/Servers/forking_server.py
new file mode 100644
index 0000000..1d55d01
--- /dev/null
+++ b/Rpyc/Servers/forking_server.py
@@ -0,0 +1,32 @@
+import sys
+import os
+from Rpyc.Utils.Serving import (
+ serve_socket,
+ create_listener_socket,
+ DEFAULT_PORT,
+ start_discovery_agent_thread)
+
+
+def serve_in_child(sock):
+ """forks a child to run the server in. the parent doesnt wait() for the child,
+ so if you do a `ps`, you'll see zombies. but who cares. i used to do a doublefork()
+ for that, but it's really meaningless. anyway, when the parent dies, the zombies
+ die as well."""
+ if os.fork() == 0:
+ try:
+ serve_socket(sock)
+ finally:
+ sys.exit()
+
+def main(port = DEFAULT_PORT):
+ # comment this out to disable broadcast queries
+ start_discovery_agent_thread(rpyc_port = port)
+
+ sock = create_listener_socket(port)
+ while True:
+ newsock, name = sock.accept()
+ serve_in_child(newsock)
+
+if __name__ == "__main__":
+ main()
+
diff --git a/Rpyc/Servers/selecting_server.py b/Rpyc/Servers/selecting_server.py
new file mode 100644
index 0000000..c39cf2b
--- /dev/null
+++ b/Rpyc/Servers/selecting_server.py
@@ -0,0 +1,35 @@
+import select
+import socket
+from Rpyc.Utils.Serving import (
+ log,
+ create_listener_socket,
+ DEFAULT_PORT,
+ SocketStream,
+ Channel,
+ Connection)
+
+
+def main(port = DEFAULT_PORT):
+ sock = create_listener_socket(port)
+ connections = []
+
+ while True:
+ rlist, wlist, xlist = select.select([sock] + connections, [], [])
+
+ if sock in rlist:
+ rlist.remove(sock)
+ newsock, name = sock.accept()
+ conn = Connection(Channel(SocketStream(newsock)))
+ conn.sockname = name
+ connections.append(conn)
+ log("welcome", conn.sockname)
+
+ for conn in rlist:
+ try:
+ conn.serve()
+ except (EOFError, socket.error):
+ connections.remove(conn)
+ log("goodbyte", conn.sockname)
+
+if __name__ == "__main__":
+ main()
diff --git a/Rpyc/Servers/simple_server.py b/Rpyc/Servers/simple_server.py
new file mode 100644
index 0000000..ba7b19b
--- /dev/null
+++ b/Rpyc/Servers/simple_server.py
@@ -0,0 +1,13 @@
+from Rpyc.Utils.Serving import serve_socket, create_listener_socket, DEFAULT_PORT
+
+
+def main(port = DEFAULT_PORT):
+ sock = create_listener_socket(port)
+ while True:
+ newsock, name = sock.accept()
+ serve_socket(newsock)
+
+if __name__ == "__main__":
+ main()
+
+ \ No newline at end of file
diff --git a/Rpyc/Servers/std_server.py b/Rpyc/Servers/std_server.py
new file mode 100644
index 0000000..0bf4475
--- /dev/null
+++ b/Rpyc/Servers/std_server.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+#
+# installation instructions
+# * add a service in /etc/services for rpyc: tcp port 18812
+# * add "rpyc .... /usr/lib/pythonXX/site-packages/Rpyc/Servers/std_server.py"
+# to /etc/inetd.conf (i dont remember syntax, rtfm)
+# * dont forget to chmod +x this file
+# * restart inetd with sighup
+#
+import sys
+import time
+from traceback import format_exception
+from Rpyc.Utils.Serving import log, serve_pipes
+
+
+def main(filename = "/tmp/rpyc-server.log"):
+ log.logfile = open(filename, "a")
+ log("-" * 80)
+ log("started serving at", time.asctime())
+ try:
+ try:
+ serve_pipes(sys.stdin, sys.stdout)
+ except:
+ log(*format_exception(*sys.exc_info()))
+ finally:
+ log("server exits at", time.asctime())
+
+if __name__ == "__main__":
+ main()
+
+ \ No newline at end of file
diff --git a/Rpyc/Servers/threaded_server.py b/Rpyc/Servers/threaded_server.py
new file mode 100644
index 0000000..0885aba
--- /dev/null
+++ b/Rpyc/Servers/threaded_server.py
@@ -0,0 +1,10 @@
+from Rpyc.Utils.Serving import DEFAULT_PORT, threaded_server, start_discovery_agent_thread
+
+
+def main(port = DEFAULT_PORT):
+ start_discovery_agent_thread(rpyc_port = port)
+ threaded_server(port = port)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/Rpyc/Servers/tls_server.py b/Rpyc/Servers/tls_server.py
new file mode 100644
index 0000000..7b6189c
--- /dev/null
+++ b/Rpyc/Servers/tls_server.py
@@ -0,0 +1,20 @@
+from Rpyc.Utils.Serving import DEFAULT_PORT, threaded_server, start_discovery_agent_thread
+from Users import users
+from tlslite.api import VerifierDB
+
+
+#
+# creates the verifier db
+#
+vdb = VerifierDB()
+for username, password in users.iteritems():
+ vdb[username] = vdb.makeVerifier(username, password, 2048)
+
+def main(port = DEFAULT_PORT):
+ start_discovery_agent_thread(rpyc_port = port)
+ threaded_server(port = port, secure = True, vdb = vdb)
+
+
+if __name__ == "__main__":
+ main()
+
diff --git a/Rpyc/Stream.py b/Rpyc/Stream.py
new file mode 100644
index 0000000..f7e42b9
--- /dev/null
+++ b/Rpyc/Stream.py
@@ -0,0 +1,113 @@
+import select
+import socket
+
+
+class Stream(object):
+ """
+ a stream is a file-like object that is used to expose a consistent and uniform
+ interface to the underlying 'physical' file-like object (like sockets and pipes),
+ which have many quirks (sockets may recv() less than `count`, pipes are simplex
+ and don't flush, etc.). a stream is always in blocking mode.
+ """
+ __slots__ = []
+ def close(self):
+ raise NotImplementedError()
+ def fileno(self):
+ raise NotImplementedError()
+ def is_available(self):
+ rlist, wlist, xlist = select.select([self], [], [], 0)
+ return bool(rlist)
+ def read(self, count):
+ raise NotImplementedError()
+ def write(self, data):
+ raise NotImplementedError()
+
+class SocketStream(Stream):
+ """
+ a stream that operates over a socket. the socket is expected to be in
+ blocking mode and reliable (i.e., TCP)
+ """
+ CONNECT_TIMEOUT = 5
+ __slots__ = ["sock"]
+ def __init__(self, sock):
+ self.sock = sock
+ def __repr__(self):
+ host, port = self.sock.getpeername()
+ return "<%s(%s:%d)>" % (self.__class__.__name__, host, port)
+ def fileno(self):
+ return self.sock.fileno()
+ def close(self):
+ self.sock.close()
+ def read(self, count):
+ data = []
+ while count > 0:
+ buf = self.sock.recv(count)
+ if not buf:
+ raise EOFError()
+ count -= len(buf)
+ data.append(buf)
+ return "".join(data)
+ def write(self, data):
+ while data:
+ count = self.sock.send(data)
+ data = data[count:]
+ @classmethod
+ def from_new_socket(cls, host, port):
+ sock = socket.socket()
+ sock.settimeout(cls.CONNECT_TIMEOUT)
+ sock.connect((host, port))
+ sock.settimeout(None)
+ return cls(sock)
+ @classmethod
+ def from_new_secure_socket(cls, host, port, username, password):
+ from tlslite.api import TLSConnection
+ stream = cls.from_new_socket(host, port)
+ stream.sock = TLSConnection(stream.sock)
+ stream.sock.handshakeClientSRP(username, password)
+ return stream
+ @classmethod
+ def from_secure_server_socket(cls, sock, vdb):
+ from tlslite.api import TLSConnection
+ sock = TLSConnection(sock)
+ sock.handshakeServer(verifierDB=vdb)
+ return cls(sock)
+
+class PipeStream(Stream):
+ """
+ a stream that operates over two simplex pipes. the pipes are expected
+ to be in blocking mode
+ """
+ __slots__ = ["incoming", "outgoing"]
+ def __init__(self, incoming, outgoing):
+ self.incoming = incoming
+ self.outgoing = outgoing
+ def fileno(self):
+ return self.incoming.fileno()
+ def close(self):
+ self.incoming.close()
+ self.outgoing.close()
+ def read(self, count):
+ data = []
+ while count > 0:
+ buf = self.incoming.read(count)
+ if not buf:
+ raise EOFError()
+ count -= len(buf)
+ data.append(buf)
+ return "".join(data)
+ def write(self, data):
+ self.outgoing.write(data)
+ self.outgoing.flush()
+
+ #
+ # win32 stub (can't select() on pipes) -- this stub causes problems with
+ # Async objects: doing obj.is_ready blocks. but it's better to have at
+ # least some functionality with pipes on win32 than none at all.
+ #
+ from sys import platform
+
+ if platform == "win32":
+ def is_available(self):
+ return True
+
+
diff --git a/Rpyc/Utils/Builtins.py b/Rpyc/Utils/Builtins.py
new file mode 100644
index 0000000..7826c98
--- /dev/null
+++ b/Rpyc/Utils/Builtins.py
@@ -0,0 +1,152 @@
+"""
+replacements for the builtin functions, so they operate correctly on NetProxies
+"""
+import sys
+import inspect
+from Rpyc.NetProxy import NetProxy, _get_conn
+from Rpyc.Lib import (
+ orig_isinstance,
+ orig_issubclass,
+ orig_dir,
+ orig_getattr,
+ orig_reload,
+ orig_help,
+ orig_type)
+
+
+__all__ = ["dir", "getattr", "hasattr", "reload", "help", "isinstance", "issubclass"]
+
+def dir(*obj):
+ """a version of dir() that supports NetProxies"""
+ if not obj:
+ return sorted(inspect.stack()[1][0].f_locals.keys())
+ if not len(obj) == 1:
+ raise TypeError("dir expected at most 1 arguments, got %d" % (len(obj),))
+ obj = obj[0]
+ if orig_isinstance(obj, NetProxy):
+ return _get_conn(obj).modules.__builtin__.dir(obj)
+ else:
+ return orig_dir(obj)
+
+def getattr(obj, name, *default):
+ """a version of getattr() that supports NetProxies"""
+ if len(default) > 1:
+ raise TypeError("getattr expected at most 3 arguments, got %d" % (2 + len(default),))
+ if orig_isinstance(obj, NetProxy):
+ try:
+ return obj.__getattr__(name)
+ except AttributeError:
+ if not default:
+ raise
+ return default[0]
+ else:
+ return orig_getattr(obj, name, *default)
+
+def hasattr(obj, name):
+ """a version of hasattr() that supports NetProxies"""
+ try:
+ getattr(obj, name)
+ except AttributeError:
+ return False
+ else:
+ return True
+
+def _get_fullname(cls):
+ """
+ a heuristic to generate a unique identifier for classes, that is not
+ machine-, platform-, or runtime-dependent
+ """
+ if orig_isinstance(cls, NetProxy):
+ modules = _get_conn(cls).modules.sys.modules
+ else:
+ modules = sys.modules
+ try:
+ filename = modules[cls.__module__].__file__
+ except (KeyError, AttributeError):
+ filename = cls.__module__
+ return (filename, cls.__name__)
+
+def _recursive_issubclass(cls, fullname):
+ for base in cls.__bases__:
+ if _get_fullname(base) == fullname:
+ return True
+ if _recursive_issubclass(base, fullname):
+ return True
+ return False
+
+def _remote_issubclass(cls, bases):
+ cls_fullname = _get_fullname(cls)
+ for base in bases:
+ base_fullname = _get_fullname(base)
+ if cls_fullname == base_fullname:
+ return True
+ if _recursive_issubclass(cls, base_fullname):
+ return True
+ return False
+
+def issubclass(cls, bases):
+ """a version of issubclass that supports NetProxies"""
+ if not orig_isinstance(bases, tuple):
+ bases = (bases,)
+
+ # is cls a proxy?
+ if orig_isinstance(cls, NetProxy):
+ return _remote_issubclass(cls, bases)
+
+ # is one of the bases a proxy?
+ for base in bases:
+ if orig_isinstance(base, NetProxy):
+ return _remote_issubclass(cls, bases)
+
+ # plain old issubclass
+ return orig_issubclass(cls, bases)
+
+def isinstance(obj, bases):
+ """a version of isinstance that supports NetProxies"""
+ try:
+ cls = obj.__getattr__("__class__")
+ except AttributeError:
+ try:
+ cls = obj.__class__
+ except AttributeError:
+ cls = orig_type(obj)
+ return issubclass(cls, bases)
+
+def reload(module):
+ """a version of reload() that supports NetProxies"""
+ if orig_isinstance(module, NetProxy):
+ return _get_conn(module).modules.__builtin__.reload(module)
+ else:
+ return orig_reload(module)
+
+class _Helper(object):
+ """a version of help() that supports NetProxies"""
+ __repr__ = orig_help.__repr__
+
+ def __call__(self, obj = None):
+ if orig_isinstance(obj, NetProxy):
+ print "Help on NetProxy object for an instance of %r:" % (obj.__getattr__("__class__").__name__,)
+ print
+ print "Doc:"
+ print obj.__getattr__("__doc__")
+ print
+ print "Members:"
+ print dir(obj)
+ else:
+ orig_help(obj)
+help = _Helper()
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Rpyc/Utils/Discovery.py b/Rpyc/Utils/Discovery.py
new file mode 100644
index 0000000..538b38c
--- /dev/null
+++ b/Rpyc/Utils/Discovery.py
@@ -0,0 +1,38 @@
+"""
+Discovery: broadcasts a query, attempting to discover all running RPyC servers
+over the local network/specific subnet.
+"""
+import socket
+import select
+import struct
+
+
+__all__ = ["discover_servers"]
+UDP_DISCOVERY_PORT = 18813
+QUERY_MAGIC = "RPYC_QUERY"
+MAX_DGRAM_SIZE = 100
+
+
+def discover_servers(subnet = "255.255.255.255", timeout = 1):
+ """broadcasts a query and returns a list of (addr, port) of running servers"""
+ # broadcast
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+ s.sendto(QUERY_MAGIC, (subnet, UDP_DISCOVERY_PORT))
+
+ # wait for replies
+ replies = []
+ while True:
+ rlist, dummy, dummy = select.select([s], [], [], timeout)
+ if not rlist:
+ break
+ data, (addr, port) = s.recvfrom(MAX_DGRAM_SIZE)
+ rpyc_port, = struct.unpack("<H", data)
+ replies.append((addr, rpyc_port))
+
+ return list(set(replies))
+
+
+
+
+
diff --git a/Rpyc/Utils/Dist.py b/Rpyc/Utils/Dist.py
new file mode 100644
index 0000000..30ed3eb
--- /dev/null
+++ b/Rpyc/Utils/Dist.py
@@ -0,0 +1,37 @@
+"""
+functions for distributing package and modules across hosts
+"""
+import inspect
+from Files import upload_dir
+from Builtins import reload
+
+
+__all__ = ["upload_package", "update_module"]
+
+def upload_package(conn, module, remotepath = None):
+ """
+ uploads the given package to the server, storing it in `remotepath`. if
+ remotepath is None, it defaults to the server's site-packages. if the package
+ already exists, it is overwritten.
+ usage:
+ import xml
+ upload_package(conn, xml)
+ """
+ if remotepath is None:
+ remotepath = conn.modules["distutils.sysconfig"].get_python_lib()
+ localpath = os.path.dirname(module.__file__)
+ upload_dir(conn, localpath, remotepath, [".py", ".pyd", ".dll", ".so", ".zip"])
+
+def update_module(conn, module):
+ """
+ updates an existing module on the server. the local module is transfered to the
+ server, overwriting the old one, and is reloaded.
+ usage:
+ import xml.dom.minidom
+ update_module(conn, xml.dom.minidom)
+ """
+ remote_module = conn.modules[module.__name__]
+ local_file = inspect.getsourcefile(module)
+ remote_file = inspect.getsourcefile(remote_module)
+ upload_file(conn, local_filem, remote_file)
+ reload(remote_module) \ No newline at end of file
diff --git a/Rpyc/Utils/Factories.py b/Rpyc/Utils/Factories.py
new file mode 100644
index 0000000..8519beb
--- /dev/null
+++ b/Rpyc/Utils/Factories.py
@@ -0,0 +1,56 @@
+"""
+the factory:
+exposes a nice and easy interface to the internals of rpyc.
+this module, along with Utils, are the only modules most clients will need.
+"""
+from weakref import WeakValueDictionary
+from Serving import DEFAULT_PORT
+from Rpyc.Stream import SocketStream, PipeStream
+from Rpyc.Channel import Channel
+from Rpyc.Connection import Connection
+from Rpyc.AsyncNetProxy import AsyncNetProxy
+
+
+__all__ = ["SocketConnection", "PipeConnection", "SecSocketConnection", "Async",
+ "LoginError"]
+
+#
+# connection factories
+#
+def SocketConnection(host, port = DEFAULT_PORT):
+ """shorthand for creating a connection over a socket to a server"""
+ return Connection(Channel(SocketStream.from_new_socket(host, port)))
+
+def PipeConnection(incoming, outgoing):
+ """shorthand for creating a connection over a pipe"""
+ return Connection(Channel(PipeStream(incoming, outgoing)))
+
+class LoginError(Exception):
+ pass
+
+def SecSocketConnection(host, username, password, port = DEFAULT_PORT):
+ """shorthand for creating secure socket connections"""
+ try:
+ stream = SocketStream.from_new_secure_socket(host, port, username, password)
+ except:
+ raise LoginError("authentication failure")
+ return Connection(Channel(stream))
+
+#
+# Async factory
+#
+_async_proxy_cache = WeakValueDictionary()
+
+def Async(proxy):
+ """a factory for creating asynchronous proxies for existing synchronous ones"""
+ key = id(proxy)
+ if key in _async_proxy_cache:
+ return _async_proxy_cache[key]
+ else:
+ new_proxy = AsyncNetProxy(proxy)
+ _async_proxy_cache[key] = new_proxy
+ return new_proxy
+
+
+
+
diff --git a/Rpyc/Utils/Files.py b/Rpyc/Utils/Files.py
new file mode 100644
index 0000000..17c1112
--- /dev/null
+++ b/Rpyc/Utils/Files.py
@@ -0,0 +1,112 @@
+"""
+file convenience routines
+"""
+import os
+
+
+__all__ = ["upload", "download"]
+CHUNK_SIZE = 1300 # to fit in one ethernet frame
+
+#
+# exceptions
+#
+class UploadError(Exception):
+ pass
+class DownloadError(Exception):
+ pass
+
+#
+# API
+#
def upload(conn, localpath, remotepath, *a, **k):
    """
    uploads a file or a directory recursively (depending on what `localpath`
    is). an optional `extensions` keyword argument may be given, specifying
    the extensions of the files to be uploaded (relevant to directories
    only); if none are given, all files are uploaded.
    """
    if os.path.isfile(localpath):
        upload_file(conn, localpath, remotepath, *a, **k)
    elif os.path.isdir(localpath):
        upload_dir(conn, localpath, remotepath, *a, **k)
    else:
        raise UploadError("can only upload files or directories")
+
def download(conn, remotepath, localpath, *a, **k):
    """
    downloads a file or a directory recursively (depending on what
    `remotepath` is). an optional `extensions` keyword argument may be
    given, specifying the extensions of the files to be downloaded
    (relevant to directories only); if none are given, all files are
    downloaded.
    """
    remote_path = conn.modules.os.path
    if remote_path.isfile(remotepath):
        download_file(conn, remotepath, localpath, *a, **k)
    elif remote_path.isdir(remotepath):
        download_dir(conn, remotepath, localpath, *a, **k)
    else:
        raise DownloadError("can only download files or directories")
+
+#
+# internal
+#
def upload_file(conn, localpath, remotepath):
    """copies a single local file to the remote side in CHUNK_SIZE chunks.

    conn - the RPyC connection to the remote side
    localpath - path of an existing local file
    remotepath - remote path to create/overwrite
    """
    lf = open(localpath, "rb")
    try:
        rf = conn.modules.__builtin__.open(remotepath, "wb")
        try:
            while True:
                chunk = lf.read(CHUNK_SIZE)
                if not chunk:
                    break
                rf.write(chunk)
        finally:
            # close even if a chunk transfer raises, so neither side
            # leaks an open file handle
            rf.close()
    finally:
        lf.close()
+
def download_file(conn, remotepath, localpath):
    """copies a single remote file to the local side in CHUNK_SIZE chunks.

    conn - the RPyC connection to the remote side
    remotepath - path of an existing remote file
    localpath - local path to create/overwrite
    """
    lf = open(localpath, "wb")
    try:
        rf = conn.modules.__builtin__.open(remotepath, "rb")
        try:
            while True:
                chunk = rf.read(CHUNK_SIZE)
                if not chunk:
                    break
                lf.write(chunk)
        finally:
            # close even if a chunk transfer raises, so neither side
            # leaks an open file handle
            rf.close()
    finally:
        lf.close()
+
def upload_dir(conn, localpath, remotepath, extensions = None):
    """recursively uploads the local directory `localpath` into the remote
    directory `remotepath` (created if needed).

    extensions - optional list of filename suffixes; only files whose name
    ends with one of them are uploaded. defaults to all files. (the default
    is None rather than a list literal to avoid the shared mutable-default
    pitfall.)
    """
    if extensions is None:
        extensions = [""]   # "" matches every filename

    # create the remote path
    if not conn.modules.os.path.exists(remotepath):
        conn.modules.os.makedirs(remotepath)

    # upload files and directories
    for fn in os.listdir(localpath):
        lfn = os.path.join(localpath, fn)
        rfn = conn.modules.os.path.join(remotepath, fn)

        if os.path.isdir(lfn):
            upload_dir(conn, lfn, rfn, extensions)

        elif os.path.isfile(lfn):
            for ext in extensions:
                if fn.endswith(ext):
                    upload_file(conn, lfn, rfn)
                    break
+
def download_dir(conn, remotepath, localpath, extensions = None):
    """recursively downloads the remote directory `remotepath` into the
    local directory `localpath` (created if needed).

    extensions - optional list of filename suffixes; only files whose name
    ends with one of them are downloaded. defaults to all files. (the
    default is None rather than a list literal to avoid the shared
    mutable-default pitfall.)
    """
    if extensions is None:
        extensions = [""]   # "" matches every filename

    # create the local path
    if not os.path.exists(localpath):
        os.makedirs(localpath)

    # download files and directories
    remote_path = conn.modules.os.path
    for fn in conn.modules.os.listdir(remotepath):
        lfn = os.path.join(localpath, fn)
        rfn = remote_path.join(remotepath, fn)

        # bugfix: classify the *remote* entry (rfn), not the local path
        # (lfn) -- the local copy does not exist yet, so the original
        # isdir/isfile checks on lfn always failed and nothing recursed
        if remote_path.isdir(rfn):
            download_dir(conn, rfn, lfn, extensions)

        elif remote_path.isfile(rfn):
            for ext in extensions:
                if fn.endswith(ext):
                    download_file(conn, rfn, lfn)
                    break
+
+
diff --git a/Rpyc/Utils/Helpers.py b/Rpyc/Utils/Helpers.py
new file mode 100644
index 0000000..ef18d23
--- /dev/null
+++ b/Rpyc/Utils/Helpers.py
@@ -0,0 +1,149 @@
+"""
+various helper functions
+"""
+import sys
+import cPickle as pickle
+from Builtins import isinstance
+from Rpyc.Lib import orig_isinstance
+from Rpyc.NetProxy import NetProxy, _get_conn
+from types import CodeType as code, FunctionType as function
+
+
+__all__ = ["obtain", "deliver", "isproxy", "getconn", "RedirectedStd", "DeliveringNamespace"]
+
def isproxy(obj):
    """indicates whether the given object is a NetProxy"""
    # orig_isinstance is the untouched builtin; the module-level
    # `isinstance` imported from Builtins is the proxy-aware replacement
    return orig_isinstance(obj, NetProxy)
+
def getconn(obj):
    """returns the connection of a NetProxy; raises TypeError for anything
    that is not a NetProxy"""
    if isproxy(obj):
        return _get_conn(obj)
    raise TypeError("`obj` is not a NetProxy")
+
def _dump_function(func):
    """serializes a function -- its code object, metadata and docstring --
    into a pickle string (the inverse of _load_function)"""
    func_info = (func.func_name, func.func_defaults, func.func_closure)
    co = func.func_code
    # the co_* attributes, in the exact positional order expected by the
    # `code` constructor in _load_function
    code_info = (
        co.co_argcount,
        co.co_nlocals,
        co.co_stacksize,
        co.co_flags,
        co.co_code,
        co.co_consts,
        co.co_names,
        co.co_varnames,
        co.co_filename,
        co.co_name,
        co.co_firstlineno,
        co.co_lnotab,
        co.co_freevars,
        co.co_cellvars,
    )
    return pickle.dumps((code_info, func_info, func.func_doc), pickle.HIGHEST_PROTOCOL)
+
def _load_function(pickled_func, globals):
    """recreates a function serialized by _dump_function, binding it to the
    given `globals` namespace"""
    code_info, func_info, doc = pickle.loads(pickled_func)
    restored = function(code(*code_info), globals, *func_info)
    restored.func_doc = doc
    return restored
+
def obtain(proxy):
    """
    obtains (brings forth) a remote object. the object can be a function or
    any picklable object. obtaining objects creates a local copy of the remote
    object, so changes made to the local copy are not reflected on the remote
    one. keep this in mind.

    proxy - any proxy to a remote object
    returns a "real" object
    """
    if not isproxy(proxy):
        raise TypeError("object must be a proxy")
    conn = getconn(proxy)
    # NOTE: this is the proxy-aware isinstance from Builtins, so it works
    # on proxies to remote function objects
    if isinstance(proxy, function):
        return _load_function(_dump_function(proxy), conn._local_namespace)
    # everything else round-trips through the remote cPickle
    pickled = conn.modules.cPickle.dumps(proxy, pickle.HIGHEST_PROTOCOL)
    return pickle.loads(pickled)
+
def deliver(obj, conn):
    """
    delivers a local object to the other side of the connection. the object
    can be a function or any picklable object. delivering objects creates a
    remote copy of the object, so changes made to the remote copy are not
    reflected on the local one. keep this in mind.

    obj - the object to deliver
    conn - the connection which obtains the object
    returns a proxy to the delivered object
    """
    if isproxy(obj):
        raise TypeError("can't deliver proxies")
    if orig_isinstance(obj, function):
        # functions are rebuilt remotely over the remote local-namespace
        globals = conn.remote_conn._local_namespace
        dumped = _dump_function(obj)
        return conn.modules[__name__]._load_function(dumped, globals)
    else:
        # everything else round-trips through pickle to the remote side
        return conn.modules.cPickle.loads(pickle.dumps(obj, pickle.HIGHEST_PROTOCOL))
+
class DeliveringNamespace(object):
    """delivering namespace: getattr`ing from this object returns a proxy,
    while setattr`ing this object delivers the given object to the remote side
    of the connection"""
    __slots__ = ["____conn__"]
    def __init__(self, conn):
        # bypass our own __setattr__, which would otherwise try to
        # deliver `conn` itself
        object.__setattr__(self, "____conn__", conn)
    def __getattr__(self, name):
        return _get_conn(self).namespace[name]
    def __setattr__(self, name, value):
        if isproxy(value):
            # proxies are stored as-is, but only if they belong to the
            # same connection as this namespace
            if _get_conn(value) is not _get_conn(self):
                raise TypeError("proxies must belong to the namespace's connection")
            _get_conn(self).namespace[name] = value
        else:
            # plain local objects are copied (delivered) to the remote
            # side first
            _get_conn(self).namespace[name] = deliver(value, _get_conn(self))
+
class RedirectedStd(object):
    """redirected std[in|out|err] context: while redirected, the remote
    party's sys.std* streams are pointed at this process's streams"""
    # NOTE: the saved-stderr attribute was misspelled "orig_strerr";
    # renamed to orig_stderr for consistency (internal state only)
    __slots__ = ["conn", "redirected", "orig_stdin", "orig_stdout", "orig_stderr"]
    def __init__(self, conn):
        self.conn = conn
        self.redirected = False
    def __del__(self):
        # best-effort restore in case the user forgot to call restore()
        self.restore()
    def redirect(self):
        """points the remote sys.std[in|out|err] at our local streams
        (no-op if already redirected)"""
        if self.redirected:
            return
        self.orig_stdin = self.conn.modules.sys.stdin
        self.orig_stdout = self.conn.modules.sys.stdout
        self.orig_stderr = self.conn.modules.sys.stderr
        self.conn.modules.sys.stdin = sys.stdin
        self.conn.modules.sys.stdout = sys.stdout
        self.conn.modules.sys.stderr = sys.stderr
        self.redirected = True
    def restore(self):
        """restores the remote party's original streams (no-op if not
        currently redirected)"""
        if not self.redirected:
            return
        self.conn.modules.sys.stdin = self.orig_stdin
        self.conn.modules.sys.stdout = self.orig_stdout
        self.conn.modules.sys.stderr = self.orig_stderr
        self.redirected = False
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Rpyc/Utils/Interpreter.py b/Rpyc/Utils/Interpreter.py
new file mode 100644
index 0000000..0960d07
--- /dev/null
+++ b/Rpyc/Utils/Interpreter.py
@@ -0,0 +1,39 @@
+"""
+remote interpreter functions
+"""
+import sys
+from Helpers import RedirectedStd
+
+
+__all__ = ["remote_interpreter", "remote_pm"]
+
def remote_interpreter(conn, namespace = None):
    """starts an interactive interpreter on the server, with the remote
    std streams redirected to this process for the duration"""
    local_ns = {"conn" : conn} if namespace is None else namespace
    redirector = RedirectedStd(conn)
    try:
        redirector.redirect()
        conn.modules[__name__]._remote_interpreter_server_side(**local_ns)
    finally:
        redirector.restore()
+
def _remote_interpreter_server_side(**namespace):
    """server-side body of remote_interpreter: runs code.interact with the
    caller's namespace augmented by this module's globals"""
    import code
    full_ns = dict(namespace)
    full_ns.update(globals())
    code.interact(local = full_ns)
+
def remote_pm(conn):
    """a version of pdb.pm() that operates on exceptions at the remote side of the connection"""
    import pdb
    # the local debugger walks the *remote* traceback through the proxy
    pdb.post_mortem(conn.modules.sys.last_traceback)
+
+
+
+
+
+
+
+
+
diff --git a/Rpyc/Utils/Serving.py b/Rpyc/Utils/Serving.py
new file mode 100644
index 0000000..b726428
--- /dev/null
+++ b/Rpyc/Utils/Serving.py
@@ -0,0 +1,126 @@
+import os
+import socket
+import sys
+import gc
+import struct
+from threading import Thread
+from Rpyc.Connection import Connection
+from Rpyc.Stream import SocketStream, PipeStream
+from Rpyc.Channel import Channel
+from Discovery import UDP_DISCOVERY_PORT, MAX_DGRAM_SIZE, QUERY_MAGIC
+
+DEFAULT_PORT = 18812
+
+
+#
+# utilities
+#
+class _Logger(object):
+ def __init__(self, logfile = None, active = True):
+ self.logfile = logfile
+ self.active = active
+ def __call__(self, *args):
+ if self.active and self.logfile:
+ text = " ".join(str(a) for a in args)
+ self.logfile.write("[%d] %s\n" % (os.getpid(), text))
+ self.logfile.flush()
+
+log = _Logger(sys.stdout)
+
def create_listener_socket(port):
    """binds a reusable listener socket on all interfaces at `port` and
    returns it (backlog of 4)"""
    listener = socket.socket()
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("", port))
    listener.listen(4)
    log("listening on", listener.getsockname())
    return listener
+
+#
+# serving
+#
def serve_channel(chan):
    """serves requests arriving on the channel until the peer disconnects
    (EOFError), then closes the connection and collects garbage"""
    conn = Connection(chan)
    try:
        while True:
            try:
                conn.serve()
            except EOFError:
                # peer went away -- normal termination
                break
    finally:
        conn.close()
        gc.collect()
+
def serve_socket_helper(sock, secure = False, vdb = None):
    """wraps `sock` in a (possibly secure) stream and serves it.

    secure - when True, performs the secure-socket handshake first
    vdb - verifier database used for authentication (secure mode only);
          presumably a TLS/SRP verifier table -- confirm against tls_server
    """
    if not secure:
        serve_channel(Channel(SocketStream(sock)))
        return
    log("requiring authentication")
    try:
        stream = SocketStream.from_secure_server_socket(sock, vdb)
    except Exception:
        # narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are not swallowed; a failed handshake just drops the client
        log("authentication failed")    # bugfix: was misspelled "authenication"
        sock.close()
    else:
        log("authentication successful")
        serve_channel(Channel(stream))
+
def serve_socket(sock, **kw):
    """logs the peer in and out, and serves the socket; socket errors
    (abrupt disconnects) are deliberately ignored"""
    peer = sock.getpeername()
    log("welcome", peer)
    try:
        serve_socket_helper(sock, **kw)
    except socket.error:
        pass
    finally:
        log("goodbye", peer)
+
def serve_pipes(incoming, outgoing):
    """serves a single connection over a pair of pipes (e.g. a child
    process' stdin/stdout); returns when the pipes are closed"""
    serve_channel(Channel(PipeStream(incoming, outgoing)))
+
+#
+# threaded utilities
+#
def threaded_server(port = DEFAULT_PORT, **kw):
    """accept loop: serves every incoming connection on its own daemon
    thread; never returns"""
    listener = create_listener_socket(port)
    while True:
        client, addr = listener.accept()
        worker = Thread(target = serve_socket, args = (client,), kwargs = kw)
        worker.setDaemon(True)
        worker.start()
+
def start_threaded_server(*args, **kwargs):
    """starts the threaded_server on a separate daemon thread. this turns
    the threaded_server into a mix-in you can place anywhere in your code"""
    launcher = Thread(target = threaded_server, args = args, kwargs = kwargs)
    launcher.setDaemon(True)
    launcher.start()
+
+#
+# discovery
+#
def discovery_agent(rpyc_port):
    """
    answers broadcasted queries with the port of the RPyC server on this machine.
    run this agent on a separate thread
    """
    # pre-pack the answer once: the server port as a little-endian uint16
    reply = struct.pack("<H", rpyc_port)

    # listen
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.bind(("", UDP_DISCOVERY_PORT))
    log("discovery_agent: started")

    # serve forever, answering only datagrams that carry the magic query
    while True:
        query, addr = listener.recvfrom(MAX_DGRAM_SIZE)
        if query != QUERY_MAGIC:
            continue
        log("discovery_agent: now answering", addr)
        listener.sendto(reply, addr)
+
def start_discovery_agent_thread(*args, **kwargs):
    """runs discovery_agent on a daemon thread and returns immediately"""
    agent = Thread(target = discovery_agent, args = args, kwargs = kwargs)
    agent.setDaemon(True)
    agent.start()
+
+
diff --git a/Rpyc/Utils/__init__.py b/Rpyc/Utils/__init__.py
new file mode 100644
index 0000000..6bd09d9
--- /dev/null
+++ b/Rpyc/Utils/__init__.py
@@ -0,0 +1,7 @@
+from Builtins import *
+from Helpers import *
+from Files import *
+from Interpreter import *
+from Dist import *
+from Discovery import *
+from Factories import *
diff --git a/Rpyc/__init__.py b/Rpyc/__init__.py
new file mode 100644
index 0000000..6a495ef
--- /dev/null
+++ b/Rpyc/__init__.py
@@ -0,0 +1,33 @@
"""
RPyC -- Remote Python Call
http://rpyc.sourceforge.net
by Tomer Filiba (tomerfiliba at gmail dot com)
"""
import sys
from Lib import rpyc_excepthook
from Utils import *


#
# API
#
# the flat public namespace of the package, re-exported from the submodules
__all__ = [
    # Factories
    "SocketConnection", "PipeConnection", "SecSocketConnection", "Async",
    # Builtins
    "dir", "getattr", "hasattr", "reload", "help", "isinstance", "issubclass",
    # Helpers
    "obtain", "deliver", "isproxy", "getconn",
    # Files
    "upload", "download",
    # Discovery
    "discover_servers",
]

# (major, minor) -- i.e. version 2.60
__version__ = (2, 60)


#
# install custom exception hook
#
# NOTE: importing Rpyc has the global side effect of replacing
# sys.excepthook, so remote tracebacks are displayed nicely
sys.excepthook = rpyc_excepthook
diff --git a/Rpyc/tests/isinstance.py b/Rpyc/tests/isinstance.py
new file mode 100644
index 0000000..aa5e2cb
--- /dev/null
+++ b/Rpyc/tests/isinstance.py
@@ -0,0 +1,53 @@
# sanity tests for the proxy-aware isinstance/issubclass replacements.
# NOTE: requires a live RPyC server listening on localhost at the
# default port; timing of the whole run is printed at the end.
from Rpyc import *
import time
c=SocketConnection("localhost")

t=time.time()
# local objects against local types -- must behave like the builtins
assert isinstance(1, int) == True
assert isinstance(1, float) == False
assert isinstance(1, (int, float)) == True
assert isinstance(1, (str, float)) == False

# remote objects against local types
assert isinstance(c.modules.sys.path, list) == True
assert isinstance(c.modules.sys.path, str) == False
assert isinstance(c.modules.sys.path, (list, str)) == True
assert isinstance(c.modules.sys.path, (int, str)) == False

# remote objects against remote (proxied) types
assert isinstance(c.modules.sys.path, c.modules.__builtin__.list) == True
assert isinstance(c.modules.sys.path, (str, c.modules.__builtin__.list)) == True
assert isinstance(c.modules.sys.path, c.modules.__builtin__.int) == False
assert isinstance(c.modules.sys.path, (str, c.modules.__builtin__.int)) == False

# local objects against remote (proxied) types
assert isinstance([1,2,3], c.modules.__builtin__.list) == True
assert isinstance([1,2,3], c.modules.__builtin__.int) == False
assert isinstance([1,2,3], (c.modules.__builtin__.list, int)) == True
assert isinstance([1,2,3], (c.modules.__builtin__.int, int)) == False

# issubclass: local class against local classes
assert issubclass(str, str) == True
assert issubclass(str, basestring) == True
assert issubclass(str, (int, basestring)) == True
assert issubclass(str, int) == False
assert issubclass(str, (int, float)) == False

# issubclass: remote class against local classes
assert issubclass(c.modules.__builtin__.str, str) == True
assert issubclass(c.modules.__builtin__.str, basestring) == True
assert issubclass(c.modules.__builtin__.str, (list, basestring)) == True
assert issubclass(c.modules.__builtin__.str, int) == False
assert issubclass(c.modules.__builtin__.str, (int, float)) == False

# issubclass: remote class against remote classes
assert issubclass(c.modules.__builtin__.str, c.modules.__builtin__.str) == True
assert issubclass(c.modules.__builtin__.str, c.modules.__builtin__.basestring) == True
assert issubclass(c.modules.__builtin__.str, (list, c.modules.__builtin__.basestring)) == True
assert issubclass(c.modules.__builtin__.str, c.modules.__builtin__.int) == False
assert issubclass(c.modules.__builtin__.str, (c.modules.__builtin__.int, c.modules.__builtin__.float)) == False

# issubclass: local class against remote classes
assert issubclass(str, c.modules.__builtin__.str) == True
assert issubclass(str, c.modules.__builtin__.basestring) == True
assert issubclass(str, (int, c.modules.__builtin__.basestring)) == True
assert issubclass(int, c.modules.__builtin__.str) == False
assert issubclass(int, (c.modules.__builtin__.str, float)) == False

t=time.time()-t
print "all okay", t
+
+