diff options
author | Gonzalo Odiard <godiard@gmail.com> | 2013-05-06 21:01:13 (GMT) |
---|---|---|
committer | Gonzalo Odiard <godiard@gmail.com> | 2013-05-07 11:56:37 (GMT) |
commit | 71aeb426148dabc39dd910cdb473f908ab4e63aa (patch) | |
tree | c357e96ed4024b4c33e475b6345639d32990c731 | |
parent | a6b7c2768f30d71d4ca1c2cafd2da8e6e03fb812 (diff) |
Implement upload of files from the client using websockets
Now the upload from all sides is done with the add button in the
toolbar. The ui is not blocked, only a watch cursor is shown,
while the file is uploaded.
Signed-off-by: Gonzalo Odiard <gonzalo@laptop.org>
-rw-r--r-- | activity.py | 48 | ||||
-rw-r--r-- | server.py | 38 | ||||
-rw-r--r-- | utils.py | 61 | ||||
-rw-r--r-- | web/index.html | 17 | ||||
-rw-r--r-- | websocket.py | 742 |
5 files changed, 875 insertions, 31 deletions
diff --git a/activity.py b/activity.py index dd19b64..69ea814 100644 --- a/activity.py +++ b/activity.py @@ -20,6 +20,7 @@ from gettext import gettext as _ from gi.repository import GObject GObject.threads_init() from gi.repository import Gtk +from gi.repository import Gdk from gi.repository import WebKit import telepathy @@ -64,7 +65,9 @@ class JournalShare(activity.Activity): self._activity_root = activity.get_activity_root() self._jm = JournalManager(self._activity_root) - self.server_proc = None + # master is the activity in the activity who started the communication + self._master = False + self._ip = '0.0.0.0' if not self.shared_activity: # Get a free socket @@ -78,6 +81,7 @@ class JournalShare(activity.Activity): #TODO: check available port server.run_server(self._activity_path, self._activity_root, self._jm, self.port) + self._master = True toolbar_box = ToolbarBox() @@ -91,12 +95,13 @@ class JournalShare(activity.Activity): add_button.connect('clicked', self.__add_clicked_cb) toolbar_box.toolbar.insert(add_button, -1) - add_favorites_button = ToolButton('emblem-favorite') - add_favorites_button.set_tooltip(_('Add favorite items to share')) - add_favorites_button.show() - add_favorites_button.connect('clicked', - self.__add_favorites_clicked_cb) - toolbar_box.toolbar.insert(add_favorites_button, -1) + if self._master: + add_favorites_button = ToolButton('emblem-favorite') + add_favorites_button.set_tooltip(_('Add favorite items to share')) + add_favorites_button.show() + add_favorites_button.connect('clicked', + self.__add_favorites_clicked_cb) + toolbar_box.toolbar.insert(add_favorites_button, -1) separator = Gtk.SeparatorToolItem() separator.props.draw = False @@ -165,11 +170,28 @@ class JournalShare(activity.Activity): chooser.get_selected_object()) jobject = chooser.get_selected_object() if jobject and jobject.file_path: - self._jm.append_to_shared_items(jobject.object_id) + if self._master: + self._jm.append_to_shared_items(jobject.object_id) + else: + tmp_path = os.path.join(self._activity_root, + 'instance') + logging.error('temp_path %s', tmp_path) + packaged_file_path = utils.package_ds_object( + jobject, tmp_path) + url = 'ws://%s:%d/websocket/upload' % (self._ip, + self.port) + uploader = utils.Uploader(packaged_file_path, url) + uploader.connect('uploaded', self.__uploaded_cb) + cursor = Gdk.Cursor.new(Gdk.CursorType.WATCH) + self.get_window().set_cursor(cursor) + uploader.start() finally: chooser.destroy() del chooser + def __uploaded_cb(self, uploader): + self.get_window().set_cursor(None) + def __add_favorites_clicked_cb(self, button): self._jm.set_shared_items(['*']) @@ -200,9 +222,11 @@ class JournalShare(activity.Activity): assert isinstance(addr[0], str) assert isinstance(addr[1], (int, long)) assert addr[1] > 0 and addr[1] < 65536 - port = int(addr[1]) + self._ip = addr[0] + self.port = int(addr[1]) - self.view.load_uri('http://%s:%d/web/index.html' % (addr[0], port)) + self.view.load_uri('http://%s:%d/web/index.html' % + (self._ip, self.port)) return False def _start_sharing(self): @@ -219,7 +243,7 @@ class JournalShare(activity.Activity): def watch_for_tubes(self): """Watch for new tubes.""" - if self.server_proc is not None: + if self._master: # I am sharing, then, don't try to connect to the tubes return @@ -308,8 +332,6 @@ class JournalShare(activity.Activity): f.close() def can_close(self): - if self.server_proc is not None: - self.server_proc.kill() self._allow_suspend() # remove temporary files instance_path = self._activity_root + '/instance/' @@ -25,6 +25,8 @@ from tornado import websocket from gi.repository import GLib import utils +import tempfile +import base64 class UploaderHandler(web.RequestHandler): @@ -96,6 +98,40 @@ class JournalWebSocketHandler(websocket.WebSocketHandler): logging.error("WebSocket closed") +class WebSocketUploadHandler(websocket.WebSocketHandler): + + def initialize(self, instance_path, journal_manager): + self._instance_path = instance_path + self._jm = journal_manager + + def open(self): + self._tmp_file = tempfile.NamedTemporaryFile( + mode='r+', dir=self._instance_path) + + def on_message(self, message): + self._tmp_file.write(message) + self._tmp_file.flush() + self.write_message('NEXT') + + def on_close(self): + # save to the journal + # decode the file + self._decoded_tmp_file = tempfile.NamedTemporaryFile( + mode='r+', dir=self._instance_path) + self._tmp_file.seek(0) + base64.decode(self._tmp_file, self._decoded_tmp_file) + self._decoded_tmp_file.flush() + + metadata, preview_data, file_path = \ + utils.unpackage_ds_object(self._decoded_tmp_file.name, None) + logging.error('METADATA %s', metadata) + + GLib.idle_add(self._jm.create_object, file_path, metadata, + preview_data) + self._tmp_file.close() + self._decoded_tmp_file.close() + + def run_server(activity_path, activity_root, jm, port): from threading import Thread @@ -110,6 +146,8 @@ def run_server(activity_path, activity_root, jm, port): (r"/datastore/(.*)", DatastoreHandler, {"path": instance_path}), (r"/websocket", JournalWebSocketHandler, {"instance_path": instance_path, "journal_manager": jm}), + (r"/websocket/upload", WebSocketUploadHandler, + {"instance_path": instance_path, "journal_manager": jm}), (r"/upload", UploaderHandler, {"instance_path": instance_path, "static_path": static_path, "journal_manager": jm @@ -15,11 +15,65 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -import os +from gi.repository import GObject import base64 +import os import json import dbus from zipfile import ZipFile +import logging + +import websocket +import tempfile + +CHUNK_SIZE = 2048 + + +class Uploader(GObject.GObject): + + __gsignals__ = {'uploaded': (GObject.SignalFlags.RUN_FIRST, None, ([]))} + + def __init__(self, file_path, url): + GObject.GObject.__init__(self) + logging.error('websocket url %s', url) + # base64 encode the file + self._file = tempfile.TemporaryFile(mode='r+') + base64.encode(open(file_path, 'r'), self._file) + self._file.seek(0) + + self._ws = websocket.WebSocketApp(url, + on_open=self._on_open, + on_message=self._on_message, + on_error=self._on_error, + on_close=self._on_close) + self._chunk = str(self._file.read(CHUNK_SIZE)) + + def start(self): + from threading import Thread + upload_looop = Thread(target=self._ws.run_forever) + upload_looop.setDaemon(True) + upload_looop.start() + + def _on_open(self, ws): + if self._chunk != '': + self._ws.send(self._chunk) + else: + self._ws.close() + + def _on_message(self, ws, message): + self._chunk = self._file.read(CHUNK_SIZE) + if self._chunk != '': + self._ws.send(self._chunk) + else: + self._ws.close() + + def _on_error(self, ws, error): + #self._ws.send(self._chunk) + pass + + def _on_close(self, ws): + self._file.close() + GObject.idle_add(self.emit, 'uploaded') def package_ds_object(dsobj, destination_path): @@ -28,8 +82,10 @@ def package_ds_object(dsobj, destination_path): the preview and the metadata """ object_id = dsobj.object_id + logging.error('id %s', object_id) preview_path = None + logging.error('before preview') if 'preview' in dsobj.metadata: # TODO: copied from expandedentry.py # is needed because record is saving the preview encoded @@ -45,6 +101,7 @@ def package_ds_object(dsobj, destination_path): preview_file.write(preview) preview_file.close() + logging.error('before metadata') # create file with the metadata metadata_path = os.path.join(destination_path, 'metadata_id_' + object_id) @@ -56,6 +113,8 @@ def package_ds_object(dsobj, destination_path): metadata_file.write(json.dumps(metadata)) metadata_file.close() + logging.error('before create zip') + # create a zip fileincluding metadata and preview # to be read from the web server file_path = os.path.join(destination_path, 'id_' + object_id + '.journal') diff --git a/web/index.html b/web/index.html index ea212bc..a2f133c 100644 --- a/web/index.html +++ b/web/index.html @@ -31,11 +31,6 @@ $('#header').css('background-color', owner_info.fill_color); }); - - if (local) { - $('#uploadarea').hide(); - } - $.getJSON("/datastore/selected.json", function(selected) { for (var i = 0; i < selected.length; i++) { id = selected[i].id; @@ -75,18 +70,6 @@ <div id="header"> </div> - <div id="uploadarea"> - <table> - <tr><td> - <form action="/upload" method="post" enctype="multipart/form-data"> - Add from my Journal:<br> - <input type="file" name="journal_item" size="30"> - <input type="submit" value="Send"> - </form> - </td></tr> - </table> - </div> - <table id="journaltable"> </table> diff --git a/websocket.py b/websocket.py new file mode 100644 index 0000000..71d8b7e --- /dev/null +++ b/websocket.py @@ -0,0 +1,742 @@ +""" +websocket - WebSocket client library for Python + +Copyright (C) 2010 Hiroki Ohtani(liris) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" + + +import socket +from urlparse import urlparse +import os +import struct +import uuid +import sha +import base64 +import logging + +""" +websocket python client. +========================= + +This version support only hybi-13. +Please see http://tools.ietf.org/html/rfc6455 for protocol. +""" + + +# websocket supported version. +VERSION = 13 + +# closing frame status codes. +STATUS_NORMAL = 1000 +STATUS_GOING_AWAY = 1001 +STATUS_PROTOCOL_ERROR = 1002 +STATUS_UNSUPPORTED_DATA_TYPE = 1003 +STATUS_STATUS_NOT_AVAILABLE = 1005 +STATUS_ABNORMAL_CLOSED = 1006 +STATUS_INVALID_PAYLOAD = 1007 +STATUS_POLICY_VIOLATION = 1008 +STATUS_MESSAGE_TOO_BIG = 1009 +STATUS_INVALID_EXTENSION = 1010 +STATUS_UNEXPECTED_CONDITION = 1011 +STATUS_TLS_HANDSHAKE_ERROR = 1015 + +logger = logging.getLogger() + +class WebSocketException(Exception): + """ + websocket exeception class. + """ + pass + +default_timeout = None +traceEnabled = False + +def enableTrace(tracable): + """ + turn on/off the tracability. + + tracable: boolean value. if set True, tracability is enabled. + """ + global traceEnabled + traceEnabled = tracable + if tracable: + if not logger.handlers: + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.DEBUG) + +def setdefaulttimeout(timeout): + """ + Set the global timeout setting to connect. + + timeout: default socket timeout time. This value is second. + """ + global default_timeout + default_timeout = timeout + +def getdefaulttimeout(): + """ + Return the global timeout setting(second) to connect. + """ + return default_timeout + +def _parse_url(url): + """ + parse url and the result is tuple of + (hostname, port, resource path and the flag of secure mode) + + url: url string. + """ + if ":" not in url: + raise ValueError("url is invalid") + + scheme, url = url.split(":", 1) + url = url.rstrip("/") + + parsed = urlparse(url, scheme="http") + if parsed.hostname: + hostname = parsed.hostname + else: + raise ValueError("hostname is invalid") + port = 0 + if parsed.port: + port = parsed.port + + is_secure = False + if scheme == "ws": + if not port: + port = 80 + elif scheme == "wss": + is_secure = True + if not port: + port = 443 + else: + raise ValueError("scheme %s is invalid" % scheme) + + if parsed.path: + resource = parsed.path + else: + resource = "/" + + return (hostname, port, resource, is_secure) + +def create_connection(url, timeout=None, **options): + """ + connect to url and return websocket object. + + Connect to url and return the WebSocket object. + Passing optional timeout parameter will set the timeout on the socket. + If no timeout is supplied, the global default timeout setting returned by getdefauttimeout() is used. + You can customize using 'options'. + If you set "headers" dict object, you can set your own custom header. + + >>> conn = create_connection("ws://echo.websocket.org/", + ... headers={"User-Agent": "MyProgram"}) + + timeout: socket timeout time. This value is integer. + if you set None for this value, it means "use default_timeout value" + + options: current support option is only "header". + if you set header as dict value, the custom HTTP headers are added. + """ + websock = WebSocket() + websock.settimeout(timeout != None and timeout or default_timeout) + websock.connect(url, **options) + return websock + +_MAX_INTEGER = (1 << 32) -1 +_AVAILABLE_KEY_CHARS = range(0x21, 0x2f + 1) + range(0x3a, 0x7e + 1) +_MAX_CHAR_BYTE = (1<<8) -1 + +# ref. Websocket gets an update, and it breaks stuff. +# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html + +def _create_sec_websocket_key(): + uid = uuid.uuid4() + return base64.encodestring(uid.bytes).strip() + +_HEADERS_TO_CHECK = { + "upgrade": "websocket", + "connection": "upgrade", + } + +class _SSLSocketWrapper(object): + def __init__(self, sock): + self.ssl = socket.ssl(sock) + + def recv(self, bufsize): + return self.ssl.read(bufsize) + + def send(self, payload): + return self.ssl.write(payload) + +_BOOL_VALUES = (0, 1) +def _is_bool(*values): + for v in values: + if v not in _BOOL_VALUES: + return False + + return True + +class ABNF(object): + """ + ABNF frame class. + see http://tools.ietf.org/html/rfc5234 + and http://tools.ietf.org/html/rfc6455#section-5.2 + """ + + # operation code values. + OPCODE_TEXT = 0x1 + OPCODE_BINARY = 0x2 + OPCODE_CLOSE = 0x8 + OPCODE_PING = 0x9 + OPCODE_PONG = 0xa + + # available operation code value tuple + OPCODES = (OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, + OPCODE_PING, OPCODE_PONG) + + # opcode human readable string + OPCODE_MAP = { + OPCODE_TEXT: "text", + OPCODE_BINARY: "binary", + OPCODE_CLOSE: "close", + OPCODE_PING: "ping", + OPCODE_PONG: "pong" + } + + # data length threashold. + LENGTH_7 = 0x7d + LENGTH_16 = 1 << 16 + LENGTH_63 = 1 << 63 + + def __init__(self, fin = 0, rsv1 = 0, rsv2 = 0, rsv3 = 0, + opcode = OPCODE_TEXT, mask = 1, data = ""): + """ + Constructor for ABNF. + please check RFC for arguments. + """ + self.fin = fin + self.rsv1 = rsv1 + self.rsv2 = rsv2 + self.rsv3 = rsv3 + self.opcode = opcode + self.mask = mask + self.data = data + self.get_mask_key = os.urandom + + @staticmethod + def create_frame(data, opcode): + """ + create frame to send text, binary and other data. + + data: data to send. This is string value(byte array). + if opcode is OPCODE_TEXT and this value is uniocde, + data value is conveted into unicode string, automatically. + + opcode: operation code. please see OPCODE_XXX. + """ + if opcode == ABNF.OPCODE_TEXT and isinstance(data, unicode): + data = data.encode("utf-8") + # mask must be set if send data from client + return ABNF(1, 0, 0, 0, opcode, 1, data) + + def format(self): + """ + format this object to string(byte array) to send data to server. + """ + if not _is_bool(self.fin, self.rsv1, self.rsv2, self.rsv3): + raise ValueError("not 0 or 1") + if self.opcode not in ABNF.OPCODES: + raise ValueError("Invalid OPCODE") + length = len(self.data) + if length >= ABNF.LENGTH_63: + raise ValueError("data is too long") + + frame_header = chr(self.fin << 7 + | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 + | self.opcode) + if length < ABNF.LENGTH_7: + frame_header += chr(self.mask << 7 | length) + elif length < ABNF.LENGTH_16: + frame_header += chr(self.mask << 7 | 0x7e) + frame_header += struct.pack("!H", length) + else: + frame_header += chr(self.mask << 7 | 0x7f) + frame_header += struct.pack("!Q", length) + + if not self.mask: + return frame_header + self.data + else: + mask_key = self.get_mask_key(4) + return frame_header + self._get_masked(mask_key) + + def _get_masked(self, mask_key): + s = ABNF.mask(mask_key, self.data) + return mask_key + "".join(s) + + @staticmethod + def mask(mask_key, data): + """ + mask or unmask data. Just do xor for each byte + + mask_key: 4 byte string(byte). + + data: data to mask/unmask. + """ + _m = map(ord, mask_key) + _d = map(ord, data) + for i in range(len(_d)): + _d[i] ^= _m[i % 4] + s = map(chr, _d) + return "".join(s) + +class WebSocket(object): + """ + Low level WebSocket interface. + This class is based on + The WebSocket protocol draft-hixie-thewebsocketprotocol-76 + http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 + + We can connect to the websocket server and send/recieve data. + The following example is a echo client. + + >>> import websocket + >>> ws = websocket.WebSocket() + >>> ws.connect("ws://echo.websocket.org") + >>> ws.send("Hello, Server") + >>> ws.recv() + 'Hello, Server' + >>> ws.close() + + get_mask_key: a callable to produce new mask keys, see the set_mask_key + function's docstring for more details + """ + def __init__(self, get_mask_key = None): + """ + Initalize WebSocket object. + """ + self.connected = False + self.io_sock = self.sock = socket.socket() + self.get_mask_key = get_mask_key + + def set_mask_key(self, func): + """ + set function to create musk key. You can custumize mask key generator. + Mainly, this is for testing purpose. + + func: callable object. the fuct must 1 argument as integer. + The argument means length of mask key. + This func must be return string(byte array), + which length is argument specified. + """ + self.get_mask_key = func + + def settimeout(self, timeout): + """ + Set the timeout to the websocket. + + timeout: timeout time(second). + """ + self.sock.settimeout(timeout) + + def gettimeout(self): + """ + Get the websocket timeout(second). + """ + return self.sock.gettimeout() + + def connect(self, url, **options): + """ + Connect to url. url is websocket url scheme. ie. ws://host:port/resource + You can customize using 'options'. + If you set "headers" dict object, you can set your own custom header. + + >>> ws = WebSocket() + >>> ws.connect("ws://echo.websocket.org/", + ... headers={"User-Agent": "MyProgram"}) + + timeout: socket timeout time. This value is integer. + if you set None for this value, + it means "use default_timeout value" + + options: current support option is only "header". + if you set header as dict value, + the custom HTTP headers are added. + + """ + hostname, port, resource, is_secure = _parse_url(url) + # TODO: we need to support proxy + self.sock.connect((hostname, port)) + if is_secure: + self.io_sock = _SSLSocketWrapper(self.sock) + self._handshake(hostname, port, resource, **options) + + def _handshake(self, host, port, resource, **options): + sock = self.io_sock + headers = [] + headers.append("GET %s HTTP/1.1" % resource) + headers.append("Upgrade: websocket") + headers.append("Connection: Upgrade") + if port == 80: + hostport = host + else: + hostport = "%s:%d" % (host, port) + headers.append("Host: %s" % hostport) + headers.append("Origin: %s" % hostport) + + key = _create_sec_websocket_key() + headers.append("Sec-WebSocket-Key: %s" % key) + headers.append("Sec-WebSocket-Protocol: chat, superchat") + headers.append("Sec-WebSocket-Version: %s" % VERSION) + if "header" in options: + headers.extend(options["header"]) + + headers.append("") + headers.append("") + + header_str = "\r\n".join(headers) + sock.send(header_str) + if traceEnabled: + logger.debug( "--- request header ---") + logger.debug( header_str) + logger.debug("-----------------------") + + status, resp_headers = self._read_headers() + if status != 101: + self.close() + raise WebSocketException("Handshake Status %d" % status) + + success = self._validate_header(resp_headers, key) + if not success: + self.close() + raise WebSocketException("Invalid WebSocket Header") + + self.connected = True + + def _validate_header(self, headers, key): + for k, v in _HEADERS_TO_CHECK.iteritems(): + r = headers.get(k, None) + if not r: + return False + r = r.lower() + if v != r: + return False + + result = headers.get("sec-websocket-accept", None) + if not result: + return False + result = result.lower() + + value = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + hashed = base64.encodestring(sha.sha(value).digest()).strip().lower() + return hashed == result + + def _read_headers(self): + status = None + headers = {} + if traceEnabled: + logger.debug("--- response header ---") + + while True: + line = self._recv_line() + if line == "\r\n": + break + line = line.strip() + if traceEnabled: + logger.debug(line) + if not status: + status_info = line.split(" ", 2) + status = int(status_info[1]) + else: + kv = line.split(":", 1) + if len(kv) == 2: + key, value = kv + headers[key.lower()] = value.strip().lower() + else: + raise WebSocketException("Invalid header") + + if traceEnabled: + logger.debug("-----------------------") + + return status, headers + + def send(self, payload, opcode = ABNF.OPCODE_TEXT): + """ + Send the data as string. + + payload: Payload must be utf-8 string or unicoce, + if the opcode is OPCODE_TEXT. + Otherwise, it must be string(byte array) + + opcode: operation code to send. Please see OPCODE_XXX. + """ + frame = ABNF.create_frame(payload, opcode) + if self.get_mask_key: + frame.get_mask_key = self.get_mask_key + data = frame.format() + self.io_sock.send(data) + if traceEnabled: + logger.debug("send: " + repr(data)) + + def ping(self, payload = ""): + """ + send ping data. + + payload: data payload to send server. + """ + self.send(payload, ABNF.OPCODE_PING) + + def pong(self, payload): + """ + send pong data. + + payload: data payload to send server. + """ + self.send(payload, ABNF.OPCODE_PONG) + + def recv(self): + """ + Receive string data(byte array) from the server. + + return value: string(byte array) value. + """ + opcode, data = self.recv_data() + return data + + def recv_data(self): + """ + Recieve data with operation code. + + return value: tuple of operation code and string(byte array) value. + """ + while True: + frame = self.recv_frame() + if not frame: + # handle error: + # 'NoneType' object has no attribute 'opcode' + raise WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY): + return (frame.opcode, frame.data) + elif frame.opcode == ABNF.OPCODE_CLOSE: + self.send_close() + return (frame.opcode, None) + elif frame.opcode == ABNF.OPCODE_PING: + self.pong("Hi!") + + + def recv_frame(self): + """ + recieve data as frame from server. + + return value: ABNF frame object. + """ + header_bytes = self._recv(2) + if not header_bytes: + return None + b1 = ord(header_bytes[0]) + fin = b1 >> 7 & 1 + rsv1 = b1 >> 6 & 1 + rsv2 = b1 >> 5 & 1 + rsv3 = b1 >> 4 & 1 + opcode = b1 & 0xf + b2 = ord(header_bytes[1]) + mask = b2 >> 7 & 1 + length = b2 & 0x7f + + length_data = "" + if length == 0x7e: + length_data = self._recv(2) + length = struct.unpack("!H", length_data)[0] + elif length == 0x7f: + length_data = self._recv(8) + length = struct.unpack("!Q", length_data)[0] + + mask_key = "" + if mask: + mask_key = self._recv(4) + data = self._recv_strict(length) + if traceEnabled: + recieved = header_bytes + length_data + mask_key + data + logger.debug("recv: " + repr(recieved)) + + if mask: + data = ABNF.mask(mask_key, data) + + frame = ABNF(fin, rsv1, rsv2, rsv3, opcode, mask, data) + return frame + + def send_close(self, status = STATUS_NORMAL, reason = ""): + """ + send close data to the server. + + status: status code to send. see STATUS_XXX. + + reason: the reason to close. This must be string. + """ + if status < 0 or status >= ABNF.LENGTH_16: + raise ValueError("code is invalid range") + self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) + + + + def close(self, status = STATUS_NORMAL, reason = ""): + """ + Close Websocket object + + status: status code to send. see STATUS_XXX. + + reason: the reason to close. This must be string. + """ + if self.connected: + if status < 0 or status >= ABNF.LENGTH_16: + raise ValueError("code is invalid range") + + try: + self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) + timeout = self.sock.gettimeout() + self.sock.settimeout(3) + try: + frame = self.recv_frame() + if logger.isEnabledFor(logging.DEBUG): + logger.error("close status: " + repr(frame.data)) + except: + pass + self.sock.settimeout(timeout) + self.sock.shutdown(socket.SHUT_RDWR) + except: + pass + self._closeInternal() + + def _closeInternal(self): + self.connected = False + self.sock.close() + self.io_sock = self.sock + + def _recv(self, bufsize): + bytes = self.io_sock.recv(bufsize) + return bytes + + def _recv_strict(self, bufsize): + remaining = bufsize + bytes = "" + while remaining: + bytes += self._recv(remaining) + remaining = bufsize - len(bytes) + + return bytes + + def _recv_line(self): + line = [] + while True: + c = self._recv(1) + line.append(c) + if c == "\n": + break + return "".join(line) + +class WebSocketApp(object): + """ + Higher level of APIs are provided. + The interface is like JavaScript WebSocket object. + """ + def __init__(self, url, + on_open = None, on_message = None, on_error = None, + on_close = None, keep_running = True, get_mask_key = None): + """ + url: websocket url. + on_open: callable object which is called at opening websocket. + this function has one argument. The arugment is this class object. + on_message: callbale object which is called when recieved data. + on_message has 2 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is utf-8 string which we get from the server. + on_error: callable object which is called when we get error. + on_error has 2 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is exception object. + on_close: callable object which is called when closed the connection. + this function has one argument. The arugment is this class object. + keep_running: a boolean flag indicating whether the app's main loop should + keep running, defaults to True + get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's + docstring for more information + """ + self.url = url + self.on_open = on_open + self.on_message = on_message + self.on_error = on_error + self.on_close = on_close + self.keep_running = keep_running + self.get_mask_key = get_mask_key + self.sock = None + + def send(self, data): + """ + send message. data must be utf-8 string or unicode. + """ + self.sock.send(data) + + def close(self): + """ + close websocket connection. + """ + self.keep_running = False + self.sock.close() + + def run_forever(self): + """ + run event loop for WebSocket framework. + This loop is infinite loop and is alive during websocket is available. + """ + if self.sock: + raise WebSocketException("socket is already opened") + try: + self.sock = WebSocket(self.get_mask_key) + self.sock.connect(self.url) + self._run_with_no_err(self.on_open) + while self.keep_running: + data = self.sock.recv() + if data is None: + break + self._run_with_no_err(self.on_message, data) + except Exception, e: + self._run_with_no_err(self.on_error, e) + finally: + self.sock.close() + self._run_with_no_err(self.on_close) + self.sock = None + + def _run_with_no_err(self, callback, *args): + if callback: + try: + callback(self, *args) + except Exception, e: + if logger.isEnabledFor(logging.DEBUG): + logger.error(e) + + +if __name__ == "__main__": + enableTrace(True) + ws = create_connection("ws://echo.websocket.org/") + print "Sending 'Hello, World'..." + ws.send("Hello, World") + print "Sent" + print "Receiving..." + result = ws.recv() + print "Received '%s'" % result + ws.close() |