Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/pybot/usb/backend/libusb1.py
diff options
context:
space:
mode:
Diffstat (limited to 'pybot/usb/backend/libusb1.py')
-rw-r--r--pybot/usb/backend/libusb1.py369
1 files changed, 300 insertions, 69 deletions
diff --git a/pybot/usb/backend/libusb1.py b/pybot/usb/backend/libusb1.py
index bd7d16f..5396092 100644
--- a/pybot/usb/backend/libusb1.py
+++ b/pybot/usb/backend/libusb1.py
@@ -1,8 +1,8 @@
-# Copyright (C) 2009-2011 Wander Lairson Costa
-#
+# Copyright (C) 2009-2013 Wander Lairson Costa
+#
# The following terms apply to all files associated
# with the software unless explicitly disclaimed in individual files.
-#
+#
# The authors hereby grant permission to use, copy, modify, distribute,
# and license this software and its documentation for any purpose, provided
# that existing copyright notices are retained in all copies and that this
@@ -12,13 +12,13 @@
# and need not follow the licensing terms described here, provided that
# the new terms are clearly indicated on the first page of each file where
# they apply.
-#
+#
# IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
# FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
# ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
# DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-#
+#
# THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE
@@ -34,6 +34,8 @@ import logging
from usb._debug import methodtrace
import usb._interop as _interop
import errno
+import math
+from usb.core import USBError
__author__ = 'Wander Lairson Costa'
@@ -53,12 +55,29 @@ __all__ = [
'LIBUSB_ERROR_NO_MEM',
'LIBUSB_ERROR_NOT_SUPPORTED',
'LIBUSB_ERROR_OTHER'
+ 'LIBUSB_TRANSFER_COMPLETED',
+ 'LIBUSB_TRANSFER_ERROR',
+ 'LIBUSB_TRANSFER_TIMED_OUT',
+ 'LIBUSB_TRANSFER_CANCELLED',
+ 'LIBUSB_TRANSFER_STALL',
+ 'LIBUSB_TRANSFER_NO_DEVICE',
+ 'LIBUSB_TRANSFER_OVERFLOW'
]
_logger = logging.getLogger('usb.backend.libusb1')
# libusb.h
+# transfer_type codes
+# Control endpoint
+_LIBUSB_TRANSFER_TYPE_CONTROL = 0,
+# Isochronous endpoint
+_LIBUSB_TRANSFER_TYPE_ISOCHRONOUS = 1
+# Bulk endpoint
+_LIBUSB_TRANSFER_TYPE_BULK = 2
+# Interrupt endpoint
+_LIBUSB_TRANSFER_TYPE_INTERRUPT = 3
+
# return codes
LIBUSB_SUCCESS = 0
@@ -96,7 +115,7 @@ _str_error = {
# map return code to errno values
_libusb_errno = {
- LIBUSB_SUCCESS:None,
+ 0:None,
LIBUSB_ERROR_IO:errno.__dict__.get('EIO', None),
LIBUSB_ERROR_INVALID_PARAM:errno.__dict__.get('EINVAL', None),
LIBUSB_ERROR_ACCESS:errno.__dict__.get('EACCES', None),
@@ -112,6 +131,41 @@ _libusb_errno = {
LIBUSB_ERROR_OTHER:None
}
+# Transfer status codes:
+# Note that this does not indicate
+# that the entire amount of requested data was transferred.
+LIBUSB_TRANSFER_COMPLETED = 0
+LIBUSB_TRANSFER_ERROR = 1
+LIBUSB_TRANSFER_TIMED_OUT = 2
+LIBUSB_TRANSFER_CANCELLED = 3
+LIBUSB_TRANSFER_STALL = 4
+LIBUSB_TRANSFER_NO_DEVICE = 5
+LIBUSB_TRANSFER_OVERFLOW = 6
+
+# map return codes to strings
+_str_transfer_error = {
+ LIBUSB_TRANSFER_COMPLETED:'Success (no error)',
+ LIBUSB_TRANSFER_ERROR:'Transfer failed',
+ LIBUSB_TRANSFER_TIMED_OUT:'Transfer timed out',
+ LIBUSB_TRANSFER_CANCELLED:'Transfer was cancelled',
+ LIBUSB_TRANSFER_STALL:'For bulk/interrupt endpoints: halt condition '\
+ 'detected (endpoint stalled). For control '\
+ 'endpoints: control request not supported.',
+ LIBUSB_TRANSFER_NO_DEVICE:'Device was disconnected',
+ LIBUSB_TRANSFER_OVERFLOW:'Device sent more data than requested'
+}
+
+# map transfer codes to errno codes
+_transfer_errno = {
+ LIBUSB_TRANSFER_COMPLETED:0,
+ LIBUSB_TRANSFER_ERROR:errno.__dict__.get('EIO', None),
+ LIBUSB_TRANSFER_TIMED_OUT:errno.__dict__.get('ETIMEDOUT', None),
+ LIBUSB_TRANSFER_CANCELLED:errno.__dict__.get('EAGAIN', None),
+ LIBUSB_TRANSFER_STALL:errno.__dict__.get('EIO', None),
+ LIBUSB_TRANSFER_NO_DEVICE:errno.__dict__.get('ENODEV', None),
+ LIBUSB_TRANSFER_OVERFLOW:errno.__dict__.get('EOVERFLOW', None)
+}
+
# Data structures
class _libusb_endpoint_descriptor(Structure):
@@ -173,15 +227,49 @@ class _libusb_device_descriptor(Structure):
('iSerialNumber', c_uint8),
('bNumConfigurations', c_uint8)]
-_lib = None
-_init = None
+
+# Isochronous packet descriptor.
+class _libusb_iso_packet_descriptor(Structure):
+ _fields_ = [('length', c_uint),
+ ('actual_length', c_uint),
+ ('status', c_int)] # enum libusb_transfer_status
_libusb_device_handle = c_void_p
+class _libusb_transfer(Structure):
+ pass
+_libusb_transfer_p = POINTER(_libusb_transfer)
+
+_libusb_transfer_cb_fn_p = CFUNCTYPE(None, _libusb_transfer_p)
+
+_libusb_transfer._fields_ = [('dev_handle', _libusb_device_handle),
+ ('flags', c_uint8),
+ ('endpoint', c_uint8),
+ ('type', c_uint8),
+ ('timeout', c_uint),
+ ('status', c_int), # enum libusb_transfer_status
+ ('length', c_int),
+ ('actual_length', c_int),
+ ('callback', _libusb_transfer_cb_fn_p),
+ ('user_data', py_object),
+ ('buffer', c_void_p),
+ ('num_iso_packets', c_int),
+ ('iso_packet_desc', _libusb_iso_packet_descriptor)
+]
+
+def _get_iso_packet_list(transfer):
+ list_type = _libusb_iso_packet_descriptor * transfer.num_iso_packets
+ return list_type.from_address(addressof(transfer.iso_packet_desc))
+
+_lib = None
+
def _load_library():
if sys.platform != 'cygwin':
candidates = ('usb-1.0', 'libusb-1.0', 'usb')
for candidate in candidates:
+ if sys.platform == 'win32':
+ candidate = candidate + '.dll'
+
libname = ctypes.util.find_library(candidate)
if libname is not None: break
else:
@@ -247,7 +335,7 @@ def _setup_prototypes(lib):
lib.libusb_set_configuration.argtypes = [_libusb_device_handle, c_int]
# int libusb_get_configuration(libusb_device_handle *dev,
- # int *config)
+ # int *config)
lib.libusb_get_configuration.argtypes = [_libusb_device_handle, POINTER(c_int)]
# int libusb_claim_interface(libusb_device_handle *dev,
@@ -340,7 +428,7 @@ def _setup_prototypes(lib):
lib.libusb_control_transfer.argtypes = [
_libusb_device_handle,
c_uint8,
- c_uint8,
+ c_uint8,
c_uint16,
c_uint16,
POINTER(c_ubyte),
@@ -382,6 +470,79 @@ def _setup_prototypes(lib):
c_uint
]
+ # libusb_transfer* libusb_alloc_transfer(int iso_packets);
+ lib.libusb_alloc_transfer.argtypes = [c_int]
+ lib.libusb_alloc_transfer.restype = POINTER(_libusb_transfer)
+
+ # void libusb_free_transfer(struct libusb_transfer *transfer)
+ lib.libusb_free_transfer.argtypes = [POINTER(_libusb_transfer)]
+
+ # int libusb_submit_transfer(struct libusb_transfer *transfer);
+ lib.libusb_submit_transfer.argtypes = [POINTER(_libusb_transfer)]
+
+ # void libusb_set_iso_packet_lengths(
+ # libusb_transfer* transfer,
+ # unsigned int length
+ # );
+ def libusb_set_iso_packet_lengths(transfer_p, length):
+ r"""This function is inline in the libusb.h file, so we must implement
+ it.
+
+ lib.libusb_set_iso_packet_lengths.argtypes = [
+ POINTER(_libusb_transfer),
+ c_int
+ ]
+ """
+ transfer = transfer_p.contents
+ for iso_packet_desc in _get_iso_packet_list(transfer):
+ iso_packet_desc.length = length
+ lib.libusb_set_iso_packet_lengths = libusb_set_iso_packet_lengths
+
+ #int libusb_get_max_iso_packet_size(libusb_device* dev,
+ # unsigned char endpoint);
+ lib.libusb_get_max_iso_packet_size.argtypes = [c_void_p,
+ c_ubyte]
+
+ # void libusb_fill_iso_transfer(
+ # struct libusb_transfer* transfer,
+ # libusb_device_handle* dev_handle,
+ # unsigned char endpoint,
+ # unsigned char* buffer,
+ # int length,
+ # int num_iso_packets,
+ # libusb_transfer_cb_fn callback,
+ # void * user_data,
+ # unsigned int timeout
+ # );
+ def libusb_fill_iso_transfer(_libusb_transfer_p, dev_handle, endpoint, buffer, length,
+ num_iso_packets, callback, user_data, timeout):
+ r"""This function is inline in the libusb.h file, so we must implement
+ it.
+
+ lib.libusb_fill_iso_transfer.argtypes = [
+ _libusb_transfer,
+ _libusb_device_handle,
+ c_ubyte,
+ POINTER(c_ubyte),
+ c_int,
+ c_int,
+ _libusb_transfer_cb_fn_p,
+ c_void_p,
+ c_uint
+ ]
+ """
+ transfer = _libusb_transfer_p.contents
+ transfer.dev_handle = dev_handle
+ transfer.endpoint = endpoint
+ transfer.type = _LIBUSB_TRANSFER_TYPE_ISOCHRONOUS
+ transfer.timeout = timeout
+ transfer.buffer = cast(buffer, c_void_p)
+ transfer.length = length
+ transfer.num_iso_packets = num_iso_packets
+ transfer.user_data = user_data
+ transfer.callback = callback
+ lib.libusb_fill_iso_transfer = libusb_fill_iso_transfer
+
# uint8_t libusb_get_bus_number(libusb_device *dev)
lib.libusb_get_bus_number.argtypes = [c_void_p]
lib.libusb_get_bus_number.restype = c_uint8
@@ -389,6 +550,7 @@ def _setup_prototypes(lib):
# uint8_t libusb_get_device_address(libusb_device *dev)
lib.libusb_get_device_address.argtypes = [c_void_p]
lib.libusb_get_device_address.restype = c_uint8
+
try:
# uint8_t libusb_get_port_number(libusb_device *dev)
lib.libusb_get_port_number.argtypes = [c_void_p]
@@ -396,13 +558,15 @@ def _setup_prototypes(lib):
except AttributeError:
pass
+ #int libusb_handle_events(libusb_context *ctx);
+ lib.libusb_handle_events.argtypes = [c_void_p]
+
# check a libusb function call
def _check(retval):
if isinstance(retval, int):
retval = c_int(retval)
if isinstance(retval, c_int):
if retval.value < 0:
- from usb.core import USBError
ret = retval.value
raise USBError(_str_error[ret], ret, _libusb_errno[ret])
return retval
@@ -432,20 +596,13 @@ class _ConfigDescriptor(object):
def __getattr__(self, name):
return getattr(self.desc.contents, name)
-# initialize and finalize the library
-class _Initializer(object):
- def __init__(self):
- _check(_lib.libusb_init(None))
- def __del__(self):
- _lib.libusb_exit(None)
-
# iterator for libusb devices
class _DevIterator(object):
- def __init__(self):
+ def __init__(self, ctx):
self.dev_list = POINTER(c_void_p)()
self.num_devs = _check(_lib.libusb_get_device_list(
- None,
+ ctx,
byref(self.dev_list))
).value
def __iter__(self):
@@ -454,30 +611,104 @@ class _DevIterator(object):
def __del__(self):
_lib.libusb_free_device_list(self.dev_list, 1)
+class _DeviceHandle(object):
+ def __init__(self, dev):
+ self.handle = _libusb_device_handle()
+ self.devid = dev.devid
+ _check(_lib.libusb_open(self.devid, byref(self.handle)))
+
+class _IsoTransferHandler(object):
+ def __init__(self, dev_handle, ep, buff, timeout):
+ address, length = buff.buffer_info()
+
+ packet_length = _lib.libusb_get_max_iso_packet_size(dev_handle.devid, ep)
+ packet_count = int(math.ceil(float(length) / packet_length))
+
+ self.transfer = _lib.libusb_alloc_transfer(packet_count)
+
+ _lib.libusb_fill_iso_transfer(self.transfer,
+ dev_handle.handle,
+ ep,
+ cast(address, POINTER(c_ubyte)),
+ length,
+ packet_count,
+ _libusb_transfer_cb_fn_p(self.__callback),
+ None,
+ timeout)
+
+ self.__set_packets_length(length, packet_length)
+
+ def __del__(self):
+ _lib.libusb_free_transfer(self.transfer)
+
+ def submit(self, ctx = None):
+ self.__callback_done = 0
+ _check(_lib.libusb_submit_transfer(self.transfer))
+
+ while not self.__callback_done:
+ _check(_lib.libusb_handle_events(ctx))
+
+ return self.__compute_size_transf_data()
+
+ def __compute_size_transf_data(self):
+ return sum([t.actual_length for t in
+ _get_iso_packet_list(self.transfer.contents)])
+
+ def __set_packets_length(self, n, packet_length):
+ _lib.libusb_set_iso_packet_lengths(self.transfer, packet_length)
+ r = n % packet_length
+ if r:
+ iso_packets = _get_iso_packet_list(self.transfer.contents)
+ iso_packets[-1].length = r
+
+ def __callback(self, transfer):
+ if transfer.contents.status == LIBUSB_TRANSFER_COMPLETED:
+ self.__callback_done = 1
+ else:
+ status = int(transfer.contents.status)
+ raise usb.USBError(_str_transfer_error[status],
+ status,
+ _transfer_errno[status])
+
# implementation of libusb 1.0 backend
class _LibUSB(usb.backend.IBackend):
@methodtrace(_logger)
+ def __init__(self, lib):
+ usb.backend.IBackend.__init__(self)
+ self.lib = lib
+ self.ctx = c_void_p()
+ _check(self.lib.libusb_init(byref(self.ctx)))
+
+ @methodtrace(_logger)
+ def __del__(self):
+ self.lib.libusb_exit(self.ctx)
+
+
+ @methodtrace(_logger)
def enumerate_devices(self):
- return _DevIterator()
+ return _DevIterator(self.ctx)
@methodtrace(_logger)
def get_device_descriptor(self, dev):
dev_desc = _libusb_device_descriptor()
- _check(_lib.libusb_get_device_descriptor(dev.devid, byref(dev_desc)))
- dev_desc.bus = _lib.libusb_get_bus_number(dev.devid)
- dev_desc.address = _lib.libusb_get_device_address(dev.devid)
+ _check(self.lib.libusb_get_device_descriptor(dev.devid, byref(dev_desc)))
+ dev_desc.bus = self.lib.libusb_get_bus_number(dev.devid)
+ dev_desc.address = self.lib.libusb_get_device_address(dev.devid)
+
#Only available i newer versions of libusb
try:
- dev_desc.port_number = _lib.libusb_get_port_number(dev.devid)
+ dev_desc.port_number = self.lib.libusb_get_port_number(dev.devid)
except AttributeError:
dev_desc.port_number = None
+
return dev_desc
@methodtrace(_logger)
def get_configuration_descriptor(self, dev, config):
cfg = POINTER(_libusb_config_descriptor)()
- _check(_lib.libusb_get_config_descriptor(dev.devid,
- config, byref(cfg)))
+ _check(self.lib.libusb_get_config_descriptor(
+ dev.devid,
+ config, byref(cfg)))
return _ConfigDescriptor(cfg)
@methodtrace(_logger)
@@ -499,41 +730,40 @@ class _LibUSB(usb.backend.IBackend):
@methodtrace(_logger)
def open_device(self, dev):
- handle = _libusb_device_handle()
- _check(_lib.libusb_open(dev.devid, byref(handle)))
- return handle
+ return _DeviceHandle(dev)
@methodtrace(_logger)
def close_device(self, dev_handle):
- _lib.libusb_close(dev_handle)
+ self.lib.libusb_close(dev_handle.handle)
@methodtrace(_logger)
def set_configuration(self, dev_handle, config_value):
- _check(_lib.libusb_set_configuration(dev_handle, config_value))
+ _check(self.lib.libusb_set_configuration(dev_handle.handle, config_value))
@methodtrace(_logger)
def get_configuration(self, dev_handle):
config = c_int()
- _check(_lib.libusb_get_configuration(dev_handle, byref(config)))
+ _check(self.lib.libusb_get_configuration(dev_handle.handle, byref(config)))
return config.value
@methodtrace(_logger)
def set_interface_altsetting(self, dev_handle, intf, altsetting):
- _check(_lib.libusb_set_interface_alt_setting(dev_handle,
- intf,
- altsetting))
+ _check(self.lib.libusb_set_interface_alt_setting(
+ dev_handle.handle,
+ intf,
+ altsetting))
@methodtrace(_logger)
def claim_interface(self, dev_handle, intf):
- _check(_lib.libusb_claim_interface(dev_handle, intf))
+ _check(self.lib.libusb_claim_interface(dev_handle.handle, intf))
@methodtrace(_logger)
def release_interface(self, dev_handle, intf):
- _check(_lib.libusb_release_interface(dev_handle, intf))
+ _check(self.lib.libusb_release_interface(dev_handle.handle, intf))
@methodtrace(_logger)
def bulk_write(self, dev_handle, ep, intf, data, timeout):
- return self.__write(_lib.libusb_bulk_transfer,
+ return self.__write(self.lib.libusb_bulk_transfer,
dev_handle,
ep,
intf,
@@ -542,7 +772,7 @@ class _LibUSB(usb.backend.IBackend):
@methodtrace(_logger)
def bulk_read(self, dev_handle, ep, intf, size, timeout):
- return self.__read(_lib.libusb_bulk_transfer,
+ return self.__read(self.lib.libusb_bulk_transfer,
dev_handle,
ep,
intf,
@@ -551,7 +781,7 @@ class _LibUSB(usb.backend.IBackend):
@methodtrace(_logger)
def intr_write(self, dev_handle, ep, intf, data, timeout):
- return self.__write(_lib.libusb_interrupt_transfer,
+ return self.__write(self.lib.libusb_interrupt_transfer,
dev_handle,
ep,
intf,
@@ -560,22 +790,23 @@ class _LibUSB(usb.backend.IBackend):
@methodtrace(_logger)
def intr_read(self, dev_handle, ep, intf, size, timeout):
- return self.__read(_lib.libusb_interrupt_transfer,
+ return self.__read(self.lib.libusb_interrupt_transfer,
dev_handle,
ep,
intf,
size,
timeout)
-# TODO: implement isochronous
-# @methodtrace(_logger)
-# def iso_write(self, dev_handle, ep, intf, data, timeout):
-# pass
-
+ @methodtrace(_logger)
+ def iso_write(self, dev_handle, ep, intf, data, timeout):
+ handler = _IsoTransferHandler(dev_handle, ep, data, timeout)
+ return handler.submit(self.ctx)
-# @methodtrace(_logger)
-# def iso_read(self, dev_handle, ep, intf, size, timeout):
-# pass
+ @methodtrace(_logger)
+ def iso_read(self, dev_handle, ep, intf, size, timeout):
+ data = _interop.as_array('\x00' * size)
+ handler = _IsoTransferHandler(dev_handle, ep, data, timeout)
+ return data[:handler.submit(self.ctx)]
@methodtrace(_logger)
def ctrl_transfer(self,
@@ -594,15 +825,15 @@ class _LibUSB(usb.backend.IBackend):
addr, length = buff.buffer_info()
length *= buff.itemsize
- ret = _check(_lib.libusb_control_transfer(dev_handle,
- bmRequestType,
- bRequest,
- wValue,
- wIndex,
- cast(addr,
- POINTER(c_ubyte)),
- length,
- timeout))
+ ret = _check(self.lib.libusb_control_transfer(
+ dev_handle.handle,
+ bmRequestType,
+ bRequest,
+ wValue,
+ wIndex,
+ cast(addr, POINTER(c_ubyte)),
+ length,
+ timeout))
if usb.util.ctrl_direction(bmRequestType) == usb.util.CTRL_OUT:
return ret.value
@@ -611,25 +842,26 @@ class _LibUSB(usb.backend.IBackend):
@methodtrace(_logger)
def reset_device(self, dev_handle):
- _check(_lib.libusb_reset_device(dev_handle))
+ _check(self.lib.libusb_reset_device(dev_handle.handle))
@methodtrace(_logger)
def is_kernel_driver_active(self, dev_handle, intf):
- return bool(_check(_lib.libusb_kernel_driver_active(dev_handle, intf)))
+ return bool(_check(self.lib.libusb_kernel_driver_active(dev_handle.handle,
+ intf)))
@methodtrace(_logger)
def detach_kernel_driver(self, dev_handle, intf):
- _check(_lib.libusb_detach_kernel_driver(dev_handle, intf))
+ _check(self.lib.libusb_detach_kernel_driver(dev_handle.handle, intf))
@methodtrace(_logger)
def attach_kernel_driver(self, dev_handle, intf):
- _check(_lib.libusb_attach_kernel_driver(dev_handle, intf))
+ _check(self.lib.libusb_attach_kernel_driver(dev_handle.handle, intf))
def __write(self, fn, dev_handle, ep, intf, data, timeout):
address, length = data.buffer_info()
length *= data.itemsize
transferred = c_int()
- retval = fn(dev_handle,
+ retval = fn(dev_handle.handle,
ep,
cast(address, POINTER(c_ubyte)),
length,
@@ -642,11 +874,11 @@ class _LibUSB(usb.backend.IBackend):
return transferred.value
def __read(self, fn, dev_handle, ep, intf, size, timeout):
- data = _interop.as_array((0,) * size)
+ data = _interop.as_array('\x00' * size)
address, length = data.buffer_info()
length *= data.itemsize
transferred = c_int()
- retval = fn(dev_handle,
+ retval = fn(dev_handle.handle,
ep,
cast(address, POINTER(c_ubyte)),
length,
@@ -658,13 +890,12 @@ class _LibUSB(usb.backend.IBackend):
return data[:transferred.value]
def get_backend():
- global _lib, _init
+ global _lib
try:
if _lib is None:
_lib = _load_library()
_setup_prototypes(_lib)
- _init = _Initializer()
- return _LibUSB()
+ return _LibUSB(_lib)
except Exception:
_logger.error('Error loading libusb 1.0 backend', exc_info=True)
return None