Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGonzalo Odiard <godiard@gmail.com>2013-05-06 21:01:13 (GMT)
committer Gonzalo Odiard <godiard@gmail.com>2013-05-07 11:56:37 (GMT)
commit71aeb426148dabc39dd910cdb473f908ab4e63aa (patch)
treec357e96ed4024b4c33e475b6345639d32990c731
parenta6b7c2768f30d71d4ca1c2cafd2da8e6e03fb812 (diff)
Implement upload of files from the client using websockets
Now the upload from all sides is done with the add button in the toolbar. The ui is not blocked, only a watch cursor is shown, while the file is uploaded. Signed-off-by: Gonzalo Odiard <gonzalo@laptop.org>
-rw-r--r--activity.py48
-rw-r--r--server.py38
-rw-r--r--utils.py61
-rw-r--r--web/index.html17
-rw-r--r--websocket.py742
5 files changed, 875 insertions, 31 deletions
diff --git a/activity.py b/activity.py
index dd19b64..69ea814 100644
--- a/activity.py
+++ b/activity.py
@@ -20,6 +20,7 @@ from gettext import gettext as _
from gi.repository import GObject
GObject.threads_init()
from gi.repository import Gtk
+from gi.repository import Gdk
from gi.repository import WebKit
import telepathy
@@ -64,7 +65,9 @@ class JournalShare(activity.Activity):
self._activity_root = activity.get_activity_root()
self._jm = JournalManager(self._activity_root)
- self.server_proc = None
+ # master is the activity in the activity who started the communication
+ self._master = False
+ self._ip = '0.0.0.0'
if not self.shared_activity:
# Get a free socket
@@ -78,6 +81,7 @@ class JournalShare(activity.Activity):
#TODO: check available port
server.run_server(self._activity_path, self._activity_root,
self._jm, self.port)
+ self._master = True
toolbar_box = ToolbarBox()
@@ -91,12 +95,13 @@ class JournalShare(activity.Activity):
add_button.connect('clicked', self.__add_clicked_cb)
toolbar_box.toolbar.insert(add_button, -1)
- add_favorites_button = ToolButton('emblem-favorite')
- add_favorites_button.set_tooltip(_('Add favorite items to share'))
- add_favorites_button.show()
- add_favorites_button.connect('clicked',
- self.__add_favorites_clicked_cb)
- toolbar_box.toolbar.insert(add_favorites_button, -1)
+ if self._master:
+ add_favorites_button = ToolButton('emblem-favorite')
+ add_favorites_button.set_tooltip(_('Add favorite items to share'))
+ add_favorites_button.show()
+ add_favorites_button.connect('clicked',
+ self.__add_favorites_clicked_cb)
+ toolbar_box.toolbar.insert(add_favorites_button, -1)
separator = Gtk.SeparatorToolItem()
separator.props.draw = False
@@ -165,11 +170,28 @@ class JournalShare(activity.Activity):
chooser.get_selected_object())
jobject = chooser.get_selected_object()
if jobject and jobject.file_path:
- self._jm.append_to_shared_items(jobject.object_id)
+ if self._master:
+ self._jm.append_to_shared_items(jobject.object_id)
+ else:
+ tmp_path = os.path.join(self._activity_root,
+ 'instance')
+ logging.error('temp_path %s', tmp_path)
+ packaged_file_path = utils.package_ds_object(
+ jobject, tmp_path)
+ url = 'ws://%s:%d/websocket/upload' % (self._ip,
+ self.port)
+ uploader = utils.Uploader(packaged_file_path, url)
+ uploader.connect('uploaded', self.__uploaded_cb)
+ cursor = Gdk.Cursor.new(Gdk.CursorType.WATCH)
+ self.get_window().set_cursor(cursor)
+ uploader.start()
finally:
chooser.destroy()
del chooser
+ def __uploaded_cb(self, uploader):
+ self.get_window().set_cursor(None)
+
def __add_favorites_clicked_cb(self, button):
self._jm.set_shared_items(['*'])
@@ -200,9 +222,11 @@ class JournalShare(activity.Activity):
assert isinstance(addr[0], str)
assert isinstance(addr[1], (int, long))
assert addr[1] > 0 and addr[1] < 65536
- port = int(addr[1])
+ self._ip = addr[0]
+ self.port = int(addr[1])
- self.view.load_uri('http://%s:%d/web/index.html' % (addr[0], port))
+ self.view.load_uri('http://%s:%d/web/index.html' %
+ (self._ip, self.port))
return False
def _start_sharing(self):
@@ -219,7 +243,7 @@ class JournalShare(activity.Activity):
def watch_for_tubes(self):
"""Watch for new tubes."""
- if self.server_proc is not None:
+ if self._master:
# I am sharing, then, don't try to connect to the tubes
return
@@ -308,8 +332,6 @@ class JournalShare(activity.Activity):
f.close()
def can_close(self):
- if self.server_proc is not None:
- self.server_proc.kill()
self._allow_suspend()
# remove temporary files
instance_path = self._activity_root + '/instance/'
diff --git a/server.py b/server.py
index 43f3a32..d715631 100644
--- a/server.py
+++ b/server.py
@@ -25,6 +25,8 @@ from tornado import websocket
from gi.repository import GLib
import utils
+import tempfile
+import base64
class UploaderHandler(web.RequestHandler):
@@ -96,6 +98,40 @@ class JournalWebSocketHandler(websocket.WebSocketHandler):
logging.error("WebSocket closed")
+class WebSocketUploadHandler(websocket.WebSocketHandler):
+
+ def initialize(self, instance_path, journal_manager):
+ self._instance_path = instance_path
+ self._jm = journal_manager
+
+ def open(self):
+ self._tmp_file = tempfile.NamedTemporaryFile(
+ mode='r+', dir=self._instance_path)
+
+ def on_message(self, message):
+ self._tmp_file.write(message)
+ self._tmp_file.flush()
+ self.write_message('NEXT')
+
+ def on_close(self):
+ # save to the journal
+ # decode the file
+ self._decoded_tmp_file = tempfile.NamedTemporaryFile(
+ mode='r+', dir=self._instance_path)
+ self._tmp_file.seek(0)
+ base64.decode(self._tmp_file, self._decoded_tmp_file)
+ self._decoded_tmp_file.flush()
+
+ metadata, preview_data, file_path = \
+ utils.unpackage_ds_object(self._decoded_tmp_file.name, None)
+ logging.error('METADATA %s', metadata)
+
+ GLib.idle_add(self._jm.create_object, file_path, metadata,
+ preview_data)
+ self._tmp_file.close()
+ self._decoded_tmp_file.close()
+
+
def run_server(activity_path, activity_root, jm, port):
from threading import Thread
@@ -110,6 +146,8 @@ def run_server(activity_path, activity_root, jm, port):
(r"/datastore/(.*)", DatastoreHandler, {"path": instance_path}),
(r"/websocket", JournalWebSocketHandler,
{"instance_path": instance_path, "journal_manager": jm}),
+ (r"/websocket/upload", WebSocketUploadHandler,
+ {"instance_path": instance_path, "journal_manager": jm}),
(r"/upload", UploaderHandler, {"instance_path": instance_path,
"static_path": static_path,
"journal_manager": jm
diff --git a/utils.py b/utils.py
index 5d4239d..65fa98f 100644
--- a/utils.py
+++ b/utils.py
@@ -15,11 +15,65 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-import os
+from gi.repository import GObject
import base64
+import os
import json
import dbus
from zipfile import ZipFile
+import logging
+
+import websocket
+import tempfile
+
+CHUNK_SIZE = 2048
+
+
+class Uploader(GObject.GObject):
+
+ __gsignals__ = {'uploaded': (GObject.SignalFlags.RUN_FIRST, None, ([]))}
+
+ def __init__(self, file_path, url):
+ GObject.GObject.__init__(self)
+ logging.error('websocket url %s', url)
+ # base64 encode the file
+ self._file = tempfile.TemporaryFile(mode='r+')
+ base64.encode(open(file_path, 'r'), self._file)
+ self._file.seek(0)
+
+ self._ws = websocket.WebSocketApp(url,
+ on_open=self._on_open,
+ on_message=self._on_message,
+ on_error=self._on_error,
+ on_close=self._on_close)
+ self._chunk = str(self._file.read(CHUNK_SIZE))
+
+ def start(self):
+ from threading import Thread
+ upload_looop = Thread(target=self._ws.run_forever)
+ upload_looop.setDaemon(True)
+ upload_looop.start()
+
+ def _on_open(self, ws):
+ if self._chunk != '':
+ self._ws.send(self._chunk)
+ else:
+ self._ws.close()
+
+ def _on_message(self, ws, message):
+ self._chunk = self._file.read(CHUNK_SIZE)
+ if self._chunk != '':
+ self._ws.send(self._chunk)
+ else:
+ self._ws.close()
+
+ def _on_error(self, ws, error):
+ #self._ws.send(self._chunk)
+ pass
+
+ def _on_close(self, ws):
+ self._file.close()
+ GObject.idle_add(self.emit, 'uploaded')
def package_ds_object(dsobj, destination_path):
@@ -28,8 +82,10 @@ def package_ds_object(dsobj, destination_path):
the preview and the metadata
"""
object_id = dsobj.object_id
+ logging.error('id %s', object_id)
preview_path = None
+ logging.error('before preview')
if 'preview' in dsobj.metadata:
# TODO: copied from expandedentry.py
# is needed because record is saving the preview encoded
@@ -45,6 +101,7 @@ def package_ds_object(dsobj, destination_path):
preview_file.write(preview)
preview_file.close()
+ logging.error('before metadata')
# create file with the metadata
metadata_path = os.path.join(destination_path,
'metadata_id_' + object_id)
@@ -56,6 +113,8 @@ def package_ds_object(dsobj, destination_path):
metadata_file.write(json.dumps(metadata))
metadata_file.close()
+ logging.error('before create zip')
+
# create a zip fileincluding metadata and preview
# to be read from the web server
file_path = os.path.join(destination_path, 'id_' + object_id + '.journal')
diff --git a/web/index.html b/web/index.html
index ea212bc..a2f133c 100644
--- a/web/index.html
+++ b/web/index.html
@@ -31,11 +31,6 @@
$('#header').css('background-color', owner_info.fill_color);
});
-
- if (local) {
- $('#uploadarea').hide();
- }
-
$.getJSON("/datastore/selected.json", function(selected) {
for (var i = 0; i < selected.length; i++) {
id = selected[i].id;
@@ -75,18 +70,6 @@
<div id="header">
</div>
- <div id="uploadarea">
- <table>
- <tr><td>
- <form action="/upload" method="post" enctype="multipart/form-data">
- Add from my Journal:<br>
- <input type="file" name="journal_item" size="30">
- <input type="submit" value="Send">
- </form>
- </td></tr>
- </table>
- </div>
-
<table id="journaltable">
</table>
diff --git a/websocket.py b/websocket.py
new file mode 100644
index 0000000..71d8b7e
--- /dev/null
+++ b/websocket.py
@@ -0,0 +1,742 @@
+"""
+websocket - WebSocket client library for Python
+
+Copyright (C) 2010 Hiroki Ohtani(liris)
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+
+
+import socket
+from urlparse import urlparse
+import os
+import struct
+import uuid
+import sha
+import base64
+import logging
+
+"""
+websocket python client.
+=========================
+
+This version support only hybi-13.
+Please see http://tools.ietf.org/html/rfc6455 for protocol.
+"""
+
+
+# websocket supported version.
+VERSION = 13
+
+# closing frame status codes.
+STATUS_NORMAL = 1000
+STATUS_GOING_AWAY = 1001
+STATUS_PROTOCOL_ERROR = 1002
+STATUS_UNSUPPORTED_DATA_TYPE = 1003
+STATUS_STATUS_NOT_AVAILABLE = 1005
+STATUS_ABNORMAL_CLOSED = 1006
+STATUS_INVALID_PAYLOAD = 1007
+STATUS_POLICY_VIOLATION = 1008
+STATUS_MESSAGE_TOO_BIG = 1009
+STATUS_INVALID_EXTENSION = 1010
+STATUS_UNEXPECTED_CONDITION = 1011
+STATUS_TLS_HANDSHAKE_ERROR = 1015
+
+logger = logging.getLogger()
+
+class WebSocketException(Exception):
+ """
+ websocket exeception class.
+ """
+ pass
+
+default_timeout = None
+traceEnabled = False
+
+def enableTrace(tracable):
+ """
+ turn on/off the tracability.
+
+ tracable: boolean value. if set True, tracability is enabled.
+ """
+ global traceEnabled
+ traceEnabled = tracable
+ if tracable:
+ if not logger.handlers:
+ logger.addHandler(logging.StreamHandler())
+ logger.setLevel(logging.DEBUG)
+
+def setdefaulttimeout(timeout):
+ """
+ Set the global timeout setting to connect.
+
+ timeout: default socket timeout time. This value is second.
+ """
+ global default_timeout
+ default_timeout = timeout
+
+def getdefaulttimeout():
+ """
+ Return the global timeout setting(second) to connect.
+ """
+ return default_timeout
+
+def _parse_url(url):
+ """
+ parse url and the result is tuple of
+ (hostname, port, resource path and the flag of secure mode)
+
+ url: url string.
+ """
+ if ":" not in url:
+ raise ValueError("url is invalid")
+
+ scheme, url = url.split(":", 1)
+ url = url.rstrip("/")
+
+ parsed = urlparse(url, scheme="http")
+ if parsed.hostname:
+ hostname = parsed.hostname
+ else:
+ raise ValueError("hostname is invalid")
+ port = 0
+ if parsed.port:
+ port = parsed.port
+
+ is_secure = False
+ if scheme == "ws":
+ if not port:
+ port = 80
+ elif scheme == "wss":
+ is_secure = True
+ if not port:
+ port = 443
+ else:
+ raise ValueError("scheme %s is invalid" % scheme)
+
+ if parsed.path:
+ resource = parsed.path
+ else:
+ resource = "/"
+
+ return (hostname, port, resource, is_secure)
+
+def create_connection(url, timeout=None, **options):
+ """
+ connect to url and return websocket object.
+
+ Connect to url and return the WebSocket object.
+ Passing optional timeout parameter will set the timeout on the socket.
+ If no timeout is supplied, the global default timeout setting returned by getdefauttimeout() is used.
+ You can customize using 'options'.
+ If you set "headers" dict object, you can set your own custom header.
+
+ >>> conn = create_connection("ws://echo.websocket.org/",
+ ... headers={"User-Agent": "MyProgram"})
+
+ timeout: socket timeout time. This value is integer.
+ if you set None for this value, it means "use default_timeout value"
+
+ options: current support option is only "header".
+ if you set header as dict value, the custom HTTP headers are added.
+ """
+ websock = WebSocket()
+ websock.settimeout(timeout != None and timeout or default_timeout)
+ websock.connect(url, **options)
+ return websock
+
+_MAX_INTEGER = (1 << 32) -1
+_AVAILABLE_KEY_CHARS = range(0x21, 0x2f + 1) + range(0x3a, 0x7e + 1)
+_MAX_CHAR_BYTE = (1<<8) -1
+
+# ref. Websocket gets an update, and it breaks stuff.
+# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html
+
+def _create_sec_websocket_key():
+ uid = uuid.uuid4()
+ return base64.encodestring(uid.bytes).strip()
+
+_HEADERS_TO_CHECK = {
+ "upgrade": "websocket",
+ "connection": "upgrade",
+ }
+
+class _SSLSocketWrapper(object):
+ def __init__(self, sock):
+ self.ssl = socket.ssl(sock)
+
+ def recv(self, bufsize):
+ return self.ssl.read(bufsize)
+
+ def send(self, payload):
+ return self.ssl.write(payload)
+
+_BOOL_VALUES = (0, 1)
+def _is_bool(*values):
+ for v in values:
+ if v not in _BOOL_VALUES:
+ return False
+
+ return True
+
+class ABNF(object):
+ """
+ ABNF frame class.
+ see http://tools.ietf.org/html/rfc5234
+ and http://tools.ietf.org/html/rfc6455#section-5.2
+ """
+
+ # operation code values.
+ OPCODE_TEXT = 0x1
+ OPCODE_BINARY = 0x2
+ OPCODE_CLOSE = 0x8
+ OPCODE_PING = 0x9
+ OPCODE_PONG = 0xa
+
+ # available operation code value tuple
+ OPCODES = (OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
+ OPCODE_PING, OPCODE_PONG)
+
+ # opcode human readable string
+ OPCODE_MAP = {
+ OPCODE_TEXT: "text",
+ OPCODE_BINARY: "binary",
+ OPCODE_CLOSE: "close",
+ OPCODE_PING: "ping",
+ OPCODE_PONG: "pong"
+ }
+
+ # data length threashold.
+ LENGTH_7 = 0x7d
+ LENGTH_16 = 1 << 16
+ LENGTH_63 = 1 << 63
+
+ def __init__(self, fin = 0, rsv1 = 0, rsv2 = 0, rsv3 = 0,
+ opcode = OPCODE_TEXT, mask = 1, data = ""):
+ """
+ Constructor for ABNF.
+ please check RFC for arguments.
+ """
+ self.fin = fin
+ self.rsv1 = rsv1
+ self.rsv2 = rsv2
+ self.rsv3 = rsv3
+ self.opcode = opcode
+ self.mask = mask
+ self.data = data
+ self.get_mask_key = os.urandom
+
+ @staticmethod
+ def create_frame(data, opcode):
+ """
+ create frame to send text, binary and other data.
+
+ data: data to send. This is string value(byte array).
+ if opcode is OPCODE_TEXT and this value is uniocde,
+ data value is conveted into unicode string, automatically.
+
+ opcode: operation code. please see OPCODE_XXX.
+ """
+ if opcode == ABNF.OPCODE_TEXT and isinstance(data, unicode):
+ data = data.encode("utf-8")
+ # mask must be set if send data from client
+ return ABNF(1, 0, 0, 0, opcode, 1, data)
+
+ def format(self):
+ """
+ format this object to string(byte array) to send data to server.
+ """
+ if not _is_bool(self.fin, self.rsv1, self.rsv2, self.rsv3):
+ raise ValueError("not 0 or 1")
+ if self.opcode not in ABNF.OPCODES:
+ raise ValueError("Invalid OPCODE")
+ length = len(self.data)
+ if length >= ABNF.LENGTH_63:
+ raise ValueError("data is too long")
+
+ frame_header = chr(self.fin << 7
+ | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4
+ | self.opcode)
+ if length < ABNF.LENGTH_7:
+ frame_header += chr(self.mask << 7 | length)
+ elif length < ABNF.LENGTH_16:
+ frame_header += chr(self.mask << 7 | 0x7e)
+ frame_header += struct.pack("!H", length)
+ else:
+ frame_header += chr(self.mask << 7 | 0x7f)
+ frame_header += struct.pack("!Q", length)
+
+ if not self.mask:
+ return frame_header + self.data
+ else:
+ mask_key = self.get_mask_key(4)
+ return frame_header + self._get_masked(mask_key)
+
+ def _get_masked(self, mask_key):
+ s = ABNF.mask(mask_key, self.data)
+ return mask_key + "".join(s)
+
+ @staticmethod
+ def mask(mask_key, data):
+ """
+ mask or unmask data. Just do xor for each byte
+
+ mask_key: 4 byte string(byte).
+
+ data: data to mask/unmask.
+ """
+ _m = map(ord, mask_key)
+ _d = map(ord, data)
+ for i in range(len(_d)):
+ _d[i] ^= _m[i % 4]
+ s = map(chr, _d)
+ return "".join(s)
+
+class WebSocket(object):
+ """
+ Low level WebSocket interface.
+ This class is based on
+ The WebSocket protocol draft-hixie-thewebsocketprotocol-76
+ http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
+
+ We can connect to the websocket server and send/recieve data.
+ The following example is a echo client.
+
+ >>> import websocket
+ >>> ws = websocket.WebSocket()
+ >>> ws.connect("ws://echo.websocket.org")
+ >>> ws.send("Hello, Server")
+ >>> ws.recv()
+ 'Hello, Server'
+ >>> ws.close()
+
+ get_mask_key: a callable to produce new mask keys, see the set_mask_key
+ function's docstring for more details
+ """
+ def __init__(self, get_mask_key = None):
+ """
+ Initalize WebSocket object.
+ """
+ self.connected = False
+ self.io_sock = self.sock = socket.socket()
+ self.get_mask_key = get_mask_key
+
+ def set_mask_key(self, func):
+ """
+ set function to create musk key. You can custumize mask key generator.
+ Mainly, this is for testing purpose.
+
+ func: callable object. the fuct must 1 argument as integer.
+ The argument means length of mask key.
+ This func must be return string(byte array),
+ which length is argument specified.
+ """
+ self.get_mask_key = func
+
+ def settimeout(self, timeout):
+ """
+ Set the timeout to the websocket.
+
+ timeout: timeout time(second).
+ """
+ self.sock.settimeout(timeout)
+
+ def gettimeout(self):
+ """
+ Get the websocket timeout(second).
+ """
+ return self.sock.gettimeout()
+
+ def connect(self, url, **options):
+ """
+ Connect to url. url is websocket url scheme. ie. ws://host:port/resource
+ You can customize using 'options'.
+ If you set "headers" dict object, you can set your own custom header.
+
+ >>> ws = WebSocket()
+ >>> ws.connect("ws://echo.websocket.org/",
+ ... headers={"User-Agent": "MyProgram"})
+
+ timeout: socket timeout time. This value is integer.
+ if you set None for this value,
+ it means "use default_timeout value"
+
+ options: current support option is only "header".
+ if you set header as dict value,
+ the custom HTTP headers are added.
+
+ """
+ hostname, port, resource, is_secure = _parse_url(url)
+ # TODO: we need to support proxy
+ self.sock.connect((hostname, port))
+ if is_secure:
+ self.io_sock = _SSLSocketWrapper(self.sock)
+ self._handshake(hostname, port, resource, **options)
+
+ def _handshake(self, host, port, resource, **options):
+ sock = self.io_sock
+ headers = []
+ headers.append("GET %s HTTP/1.1" % resource)
+ headers.append("Upgrade: websocket")
+ headers.append("Connection: Upgrade")
+ if port == 80:
+ hostport = host
+ else:
+ hostport = "%s:%d" % (host, port)
+ headers.append("Host: %s" % hostport)
+ headers.append("Origin: %s" % hostport)
+
+ key = _create_sec_websocket_key()
+ headers.append("Sec-WebSocket-Key: %s" % key)
+ headers.append("Sec-WebSocket-Protocol: chat, superchat")
+ headers.append("Sec-WebSocket-Version: %s" % VERSION)
+ if "header" in options:
+ headers.extend(options["header"])
+
+ headers.append("")
+ headers.append("")
+
+ header_str = "\r\n".join(headers)
+ sock.send(header_str)
+ if traceEnabled:
+ logger.debug( "--- request header ---")
+ logger.debug( header_str)
+ logger.debug("-----------------------")
+
+ status, resp_headers = self._read_headers()
+ if status != 101:
+ self.close()
+ raise WebSocketException("Handshake Status %d" % status)
+
+ success = self._validate_header(resp_headers, key)
+ if not success:
+ self.close()
+ raise WebSocketException("Invalid WebSocket Header")
+
+ self.connected = True
+
+ def _validate_header(self, headers, key):
+ for k, v in _HEADERS_TO_CHECK.iteritems():
+ r = headers.get(k, None)
+ if not r:
+ return False
+ r = r.lower()
+ if v != r:
+ return False
+
+ result = headers.get("sec-websocket-accept", None)
+ if not result:
+ return False
+ result = result.lower()
+
+ value = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+ hashed = base64.encodestring(sha.sha(value).digest()).strip().lower()
+ return hashed == result
+
+ def _read_headers(self):
+ status = None
+ headers = {}
+ if traceEnabled:
+ logger.debug("--- response header ---")
+
+ while True:
+ line = self._recv_line()
+ if line == "\r\n":
+ break
+ line = line.strip()
+ if traceEnabled:
+ logger.debug(line)
+ if not status:
+ status_info = line.split(" ", 2)
+ status = int(status_info[1])
+ else:
+ kv = line.split(":", 1)
+ if len(kv) == 2:
+ key, value = kv
+ headers[key.lower()] = value.strip().lower()
+ else:
+ raise WebSocketException("Invalid header")
+
+ if traceEnabled:
+ logger.debug("-----------------------")
+
+ return status, headers
+
+ def send(self, payload, opcode = ABNF.OPCODE_TEXT):
+ """
+ Send the data as string.
+
+ payload: Payload must be utf-8 string or unicoce,
+ if the opcode is OPCODE_TEXT.
+ Otherwise, it must be string(byte array)
+
+ opcode: operation code to send. Please see OPCODE_XXX.
+ """
+ frame = ABNF.create_frame(payload, opcode)
+ if self.get_mask_key:
+ frame.get_mask_key = self.get_mask_key
+ data = frame.format()
+ self.io_sock.send(data)
+ if traceEnabled:
+ logger.debug("send: " + repr(data))
+
+ def ping(self, payload = ""):
+ """
+ send ping data.
+
+ payload: data payload to send server.
+ """
+ self.send(payload, ABNF.OPCODE_PING)
+
+ def pong(self, payload):
+ """
+ send pong data.
+
+ payload: data payload to send server.
+ """
+ self.send(payload, ABNF.OPCODE_PONG)
+
+ def recv(self):
+ """
+ Receive string data(byte array) from the server.
+
+ return value: string(byte array) value.
+ """
+ opcode, data = self.recv_data()
+ return data
+
+ def recv_data(self):
+ """
+ Recieve data with operation code.
+
+ return value: tuple of operation code and string(byte array) value.
+ """
+ while True:
+ frame = self.recv_frame()
+ if not frame:
+ # handle error:
+ # 'NoneType' object has no attribute 'opcode'
+ raise WebSocketException("Not a valid frame %s" % frame)
+ elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY):
+ return (frame.opcode, frame.data)
+ elif frame.opcode == ABNF.OPCODE_CLOSE:
+ self.send_close()
+ return (frame.opcode, None)
+ elif frame.opcode == ABNF.OPCODE_PING:
+ self.pong("Hi!")
+
+
+ def recv_frame(self):
+ """
+ recieve data as frame from server.
+
+ return value: ABNF frame object.
+ """
+ header_bytes = self._recv(2)
+ if not header_bytes:
+ return None
+ b1 = ord(header_bytes[0])
+ fin = b1 >> 7 & 1
+ rsv1 = b1 >> 6 & 1
+ rsv2 = b1 >> 5 & 1
+ rsv3 = b1 >> 4 & 1
+ opcode = b1 & 0xf
+ b2 = ord(header_bytes[1])
+ mask = b2 >> 7 & 1
+ length = b2 & 0x7f
+
+ length_data = ""
+ if length == 0x7e:
+ length_data = self._recv(2)
+ length = struct.unpack("!H", length_data)[0]
+ elif length == 0x7f:
+ length_data = self._recv(8)
+ length = struct.unpack("!Q", length_data)[0]
+
+ mask_key = ""
+ if mask:
+ mask_key = self._recv(4)
+ data = self._recv_strict(length)
+ if traceEnabled:
+ recieved = header_bytes + length_data + mask_key + data
+ logger.debug("recv: " + repr(recieved))
+
+ if mask:
+ data = ABNF.mask(mask_key, data)
+
+ frame = ABNF(fin, rsv1, rsv2, rsv3, opcode, mask, data)
+ return frame
+
+ def send_close(self, status = STATUS_NORMAL, reason = ""):
+ """
+ send close data to the server.
+
+ status: status code to send. see STATUS_XXX.
+
+ reason: the reason to close. This must be string.
+ """
+ if status < 0 or status >= ABNF.LENGTH_16:
+ raise ValueError("code is invalid range")
+ self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
+
+
+
+ def close(self, status = STATUS_NORMAL, reason = ""):
+ """
+ Close Websocket object
+
+ status: status code to send. see STATUS_XXX.
+
+ reason: the reason to close. This must be string.
+ """
+ if self.connected:
+ if status < 0 or status >= ABNF.LENGTH_16:
+ raise ValueError("code is invalid range")
+
+ try:
+ self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
+ timeout = self.sock.gettimeout()
+ self.sock.settimeout(3)
+ try:
+ frame = self.recv_frame()
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.error("close status: " + repr(frame.data))
+ except:
+ pass
+ self.sock.settimeout(timeout)
+ self.sock.shutdown(socket.SHUT_RDWR)
+ except:
+ pass
+ self._closeInternal()
+
+ def _closeInternal(self):
+ self.connected = False
+ self.sock.close()
+ self.io_sock = self.sock
+
+ def _recv(self, bufsize):
+ bytes = self.io_sock.recv(bufsize)
+ return bytes
+
+ def _recv_strict(self, bufsize):
+ remaining = bufsize
+ bytes = ""
+ while remaining:
+ bytes += self._recv(remaining)
+ remaining = bufsize - len(bytes)
+
+ return bytes
+
+ def _recv_line(self):
+ line = []
+ while True:
+ c = self._recv(1)
+ line.append(c)
+ if c == "\n":
+ break
+ return "".join(line)
+
+class WebSocketApp(object):
+ """
+ Higher level of APIs are provided.
+ The interface is like JavaScript WebSocket object.
+ """
+ def __init__(self, url,
+ on_open = None, on_message = None, on_error = None,
+ on_close = None, keep_running = True, get_mask_key = None):
+ """
+ url: websocket url.
+ on_open: callable object which is called at opening websocket.
+ this function has one argument. The arugment is this class object.
+ on_message: callbale object which is called when recieved data.
+ on_message has 2 arguments.
+ The 1st arugment is this class object.
+ The passing 2nd arugment is utf-8 string which we get from the server.
+ on_error: callable object which is called when we get error.
+ on_error has 2 arguments.
+ The 1st arugment is this class object.
+ The passing 2nd arugment is exception object.
+ on_close: callable object which is called when closed the connection.
+ this function has one argument. The arugment is this class object.
+ keep_running: a boolean flag indicating whether the app's main loop should
+ keep running, defaults to True
+ get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's
+ docstring for more information
+ """
+ self.url = url
+ self.on_open = on_open
+ self.on_message = on_message
+ self.on_error = on_error
+ self.on_close = on_close
+ self.keep_running = keep_running
+ self.get_mask_key = get_mask_key
+ self.sock = None
+
+ def send(self, data):
+ """
+ send message. data must be utf-8 string or unicode.
+ """
+ self.sock.send(data)
+
+ def close(self):
+ """
+ close websocket connection.
+ """
+ self.keep_running = False
+ self.sock.close()
+
+ def run_forever(self):
+ """
+ run event loop for WebSocket framework.
+ This loop is infinite loop and is alive during websocket is available.
+ """
+ if self.sock:
+ raise WebSocketException("socket is already opened")
+ try:
+ self.sock = WebSocket(self.get_mask_key)
+ self.sock.connect(self.url)
+ self._run_with_no_err(self.on_open)
+ while self.keep_running:
+ data = self.sock.recv()
+ if data is None:
+ break
+ self._run_with_no_err(self.on_message, data)
+ except Exception, e:
+ self._run_with_no_err(self.on_error, e)
+ finally:
+ self.sock.close()
+ self._run_with_no_err(self.on_close)
+ self.sock = None
+
+ def _run_with_no_err(self, callback, *args):
+ if callback:
+ try:
+ callback(self, *args)
+ except Exception, e:
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.error(e)
+
+
+if __name__ == "__main__":
+ enableTrace(True)
+ ws = create_connection("ws://echo.websocket.org/")
+ print "Sending 'Hello, World'..."
+ ws.send("Hello, World")
+ print "Sent"
+ print "Receiving..."
+ result = ws.recv()
+ print "Received '%s'" % result
+ ws.close()