diff options
Diffstat (limited to 'pybot/usb/backend/libusb1.py')
-rw-r--r-- | pybot/usb/backend/libusb1.py | 369 |
1 files changed, 300 insertions, 69 deletions
diff --git a/pybot/usb/backend/libusb1.py b/pybot/usb/backend/libusb1.py index bd7d16f..5396092 100644 --- a/pybot/usb/backend/libusb1.py +++ b/pybot/usb/backend/libusb1.py @@ -1,8 +1,8 @@ -# Copyright (C) 2009-2011 Wander Lairson Costa -# +# Copyright (C) 2009-2013 Wander Lairson Costa +# # The following terms apply to all files associated # with the software unless explicitly disclaimed in individual files. -# +# # The authors hereby grant permission to use, copy, modify, distribute, # and license this software and its documentation for any purpose, provided # that existing copyright notices are retained in all copies and that this @@ -12,13 +12,13 @@ # and need not follow the licensing terms described here, provided that # the new terms are clearly indicated on the first page of each file where # they apply. -# +# # IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY # FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES # ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY # DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -# +# # THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, # INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE @@ -34,6 +34,8 @@ import logging from usb._debug import methodtrace import usb._interop as _interop import errno +import math +from usb.core import USBError __author__ = 'Wander Lairson Costa' @@ -53,12 +55,29 @@ __all__ = [ 'LIBUSB_ERROR_NO_MEM', 'LIBUSB_ERROR_NOT_SUPPORTED', 'LIBUSB_ERROR_OTHER' + 'LIBUSB_TRANSFER_COMPLETED', + 'LIBUSB_TRANSFER_ERROR', + 'LIBUSB_TRANSFER_TIMED_OUT', + 'LIBUSB_TRANSFER_CANCELLED', + 'LIBUSB_TRANSFER_STALL', + 'LIBUSB_TRANSFER_NO_DEVICE', + 'LIBUSB_TRANSFER_OVERFLOW' ] _logger = logging.getLogger('usb.backend.libusb1') # libusb.h +# transfer_type codes +# Control endpoint +_LIBUSB_TRANSFER_TYPE_CONTROL = 0, +# Isochronous endpoint +_LIBUSB_TRANSFER_TYPE_ISOCHRONOUS = 1 +# Bulk endpoint +_LIBUSB_TRANSFER_TYPE_BULK = 2 +# Interrupt endpoint +_LIBUSB_TRANSFER_TYPE_INTERRUPT = 3 + # return codes LIBUSB_SUCCESS = 0 @@ -96,7 +115,7 @@ _str_error = { # map return code to errno values _libusb_errno = { - LIBUSB_SUCCESS:None, + 0:None, LIBUSB_ERROR_IO:errno.__dict__.get('EIO', None), LIBUSB_ERROR_INVALID_PARAM:errno.__dict__.get('EINVAL', None), LIBUSB_ERROR_ACCESS:errno.__dict__.get('EACCES', None), @@ -112,6 +131,41 @@ _libusb_errno = { LIBUSB_ERROR_OTHER:None } +# Transfer status codes: +# Note that this does not indicate +# that the entire amount of requested data was transferred. +LIBUSB_TRANSFER_COMPLETED = 0 +LIBUSB_TRANSFER_ERROR = 1 +LIBUSB_TRANSFER_TIMED_OUT = 2 +LIBUSB_TRANSFER_CANCELLED = 3 +LIBUSB_TRANSFER_STALL = 4 +LIBUSB_TRANSFER_NO_DEVICE = 5 +LIBUSB_TRANSFER_OVERFLOW = 6 + +# map return codes to strings +_str_transfer_error = { + LIBUSB_TRANSFER_COMPLETED:'Success (no error)', + LIBUSB_TRANSFER_ERROR:'Transfer failed', + LIBUSB_TRANSFER_TIMED_OUT:'Transfer timed out', + LIBUSB_TRANSFER_CANCELLED:'Transfer was cancelled', + LIBUSB_TRANSFER_STALL:'For bulk/interrupt endpoints: halt condition '\ + 'detected (endpoint stalled). For control '\ + 'endpoints: control request not supported.', + LIBUSB_TRANSFER_NO_DEVICE:'Device was disconnected', + LIBUSB_TRANSFER_OVERFLOW:'Device sent more data than requested' +} + +# map transfer codes to errno codes +_transfer_errno = { + LIBUSB_TRANSFER_COMPLETED:0, + LIBUSB_TRANSFER_ERROR:errno.__dict__.get('EIO', None), + LIBUSB_TRANSFER_TIMED_OUT:errno.__dict__.get('ETIMEDOUT', None), + LIBUSB_TRANSFER_CANCELLED:errno.__dict__.get('EAGAIN', None), + LIBUSB_TRANSFER_STALL:errno.__dict__.get('EIO', None), + LIBUSB_TRANSFER_NO_DEVICE:errno.__dict__.get('ENODEV', None), + LIBUSB_TRANSFER_OVERFLOW:errno.__dict__.get('EOVERFLOW', None) +} + # Data structures class _libusb_endpoint_descriptor(Structure): @@ -173,15 +227,49 @@ class _libusb_device_descriptor(Structure): ('iSerialNumber', c_uint8), ('bNumConfigurations', c_uint8)] -_lib = None -_init = None + +# Isochronous packet descriptor. +class _libusb_iso_packet_descriptor(Structure): + _fields_ = [('length', c_uint), + ('actual_length', c_uint), + ('status', c_int)] # enum libusb_transfer_status _libusb_device_handle = c_void_p +class _libusb_transfer(Structure): + pass +_libusb_transfer_p = POINTER(_libusb_transfer) + +_libusb_transfer_cb_fn_p = CFUNCTYPE(None, _libusb_transfer_p) + +_libusb_transfer._fields_ = [('dev_handle', _libusb_device_handle), + ('flags', c_uint8), + ('endpoint', c_uint8), + ('type', c_uint8), + ('timeout', c_uint), + ('status', c_int), # enum libusb_transfer_status + ('length', c_int), + ('actual_length', c_int), + ('callback', _libusb_transfer_cb_fn_p), + ('user_data', py_object), + ('buffer', c_void_p), + ('num_iso_packets', c_int), + ('iso_packet_desc', _libusb_iso_packet_descriptor) +] + +def _get_iso_packet_list(transfer): + list_type = _libusb_iso_packet_descriptor * transfer.num_iso_packets + return list_type.from_address(addressof(transfer.iso_packet_desc)) + +_lib = None + def _load_library(): if sys.platform != 'cygwin': candidates = ('usb-1.0', 'libusb-1.0', 'usb') for candidate in candidates: + if sys.platform == 'win32': + candidate = candidate + '.dll' + libname = ctypes.util.find_library(candidate) if libname is not None: break else: @@ -247,7 +335,7 @@ def _setup_prototypes(lib): lib.libusb_set_configuration.argtypes = [_libusb_device_handle, c_int] # int libusb_get_configuration(libusb_device_handle *dev, - # int *config) + # int *config) lib.libusb_get_configuration.argtypes = [_libusb_device_handle, POINTER(c_int)] # int libusb_claim_interface(libusb_device_handle *dev, @@ -340,7 +428,7 @@ def _setup_prototypes(lib): lib.libusb_control_transfer.argtypes = [ _libusb_device_handle, c_uint8, - c_uint8, + c_uint8, c_uint16, c_uint16, POINTER(c_ubyte), @@ -382,6 +470,79 @@ def _setup_prototypes(lib): c_uint ] + # libusb_transfer* libusb_alloc_transfer(int iso_packets); + lib.libusb_alloc_transfer.argtypes = [c_int] + lib.libusb_alloc_transfer.restype = POINTER(_libusb_transfer) + + # void libusb_free_transfer(struct libusb_transfer *transfer) + lib.libusb_free_transfer.argtypes = [POINTER(_libusb_transfer)] + + # int libusb_submit_transfer(struct libusb_transfer *transfer); + lib.libusb_submit_transfer.argtypes = [POINTER(_libusb_transfer)] + + # void libusb_set_iso_packet_lengths( + # libusb_transfer* transfer, + # unsigned int length + # ); + def libusb_set_iso_packet_lengths(transfer_p, length): + r"""This function is inline in the libusb.h file, so we must implement + it. + + lib.libusb_set_iso_packet_lengths.argtypes = [ + POINTER(_libusb_transfer), + c_int + ] + """ + transfer = transfer_p.contents + for iso_packet_desc in _get_iso_packet_list(transfer): + iso_packet_desc.length = length + lib.libusb_set_iso_packet_lengths = libusb_set_iso_packet_lengths + + #int libusb_get_max_iso_packet_size(libusb_device* dev, + # unsigned char endpoint); + lib.libusb_get_max_iso_packet_size.argtypes = [c_void_p, + c_ubyte] + + # void libusb_fill_iso_transfer( + # struct libusb_transfer* transfer, + # libusb_device_handle* dev_handle, + # unsigned char endpoint, + # unsigned char* buffer, + # int length, + # int num_iso_packets, + # libusb_transfer_cb_fn callback, + # void * user_data, + # unsigned int timeout + # ); + def libusb_fill_iso_transfer(_libusb_transfer_p, dev_handle, endpoint, buffer, length, + num_iso_packets, callback, user_data, timeout): + r"""This function is inline in the libusb.h file, so we must implement + it. + + lib.libusb_fill_iso_transfer.argtypes = [ + _libusb_transfer, + _libusb_device_handle, + c_ubyte, + POINTER(c_ubyte), + c_int, + c_int, + _libusb_transfer_cb_fn_p, + c_void_p, + c_uint + ] + """ + transfer = _libusb_transfer_p.contents + transfer.dev_handle = dev_handle + transfer.endpoint = endpoint + transfer.type = _LIBUSB_TRANSFER_TYPE_ISOCHRONOUS + transfer.timeout = timeout + transfer.buffer = cast(buffer, c_void_p) + transfer.length = length + transfer.num_iso_packets = num_iso_packets + transfer.user_data = user_data + transfer.callback = callback + lib.libusb_fill_iso_transfer = libusb_fill_iso_transfer + # uint8_t libusb_get_bus_number(libusb_device *dev) lib.libusb_get_bus_number.argtypes = [c_void_p] lib.libusb_get_bus_number.restype = c_uint8 @@ -389,6 +550,7 @@ def _setup_prototypes(lib): # uint8_t libusb_get_device_address(libusb_device *dev) lib.libusb_get_device_address.argtypes = [c_void_p] lib.libusb_get_device_address.restype = c_uint8 + try: # uint8_t libusb_get_port_number(libusb_device *dev) lib.libusb_get_port_number.argtypes = [c_void_p] @@ -396,13 +558,15 @@ def _setup_prototypes(lib): except AttributeError: pass + #int libusb_handle_events(libusb_context *ctx); + lib.libusb_handle_events.argtypes = [c_void_p] + # check a libusb function call def _check(retval): if isinstance(retval, int): retval = c_int(retval) if isinstance(retval, c_int): if retval.value < 0: - from usb.core import USBError ret = retval.value raise USBError(_str_error[ret], ret, _libusb_errno[ret]) return retval @@ -432,20 +596,13 @@ class _ConfigDescriptor(object): def __getattr__(self, name): return getattr(self.desc.contents, name) -# initialize and finalize the library -class _Initializer(object): - def __init__(self): - _check(_lib.libusb_init(None)) - def __del__(self): - _lib.libusb_exit(None) - # iterator for libusb devices class _DevIterator(object): - def __init__(self): + def __init__(self, ctx): self.dev_list = POINTER(c_void_p)() self.num_devs = _check(_lib.libusb_get_device_list( - None, + ctx, byref(self.dev_list)) ).value def __iter__(self): @@ -454,30 +611,104 @@ class _DevIterator(object): def __del__(self): _lib.libusb_free_device_list(self.dev_list, 1) +class _DeviceHandle(object): + def __init__(self, dev): + self.handle = _libusb_device_handle() + self.devid = dev.devid + _check(_lib.libusb_open(self.devid, byref(self.handle))) + +class _IsoTransferHandler(object): + def __init__(self, dev_handle, ep, buff, timeout): + address, length = buff.buffer_info() + + packet_length = _lib.libusb_get_max_iso_packet_size(dev_handle.devid, ep) + packet_count = int(math.ceil(float(length) / packet_length)) + + self.transfer = _lib.libusb_alloc_transfer(packet_count) + + _lib.libusb_fill_iso_transfer(self.transfer, + dev_handle.handle, + ep, + cast(address, POINTER(c_ubyte)), + length, + packet_count, + _libusb_transfer_cb_fn_p(self.__callback), + None, + timeout) + + self.__set_packets_length(length, packet_length) + + def __del__(self): + _lib.libusb_free_transfer(self.transfer) + + def submit(self, ctx = None): + self.__callback_done = 0 + _check(_lib.libusb_submit_transfer(self.transfer)) + + while not self.__callback_done: + _check(_lib.libusb_handle_events(ctx)) + + return self.__compute_size_transf_data() + + def __compute_size_transf_data(self): + return sum([t.actual_length for t in + _get_iso_packet_list(self.transfer.contents)]) + + def __set_packets_length(self, n, packet_length): + _lib.libusb_set_iso_packet_lengths(self.transfer, packet_length) + r = n % packet_length + if r: + iso_packets = _get_iso_packet_list(self.transfer.contents) + iso_packets[-1].length = r + + def __callback(self, transfer): + if transfer.contents.status == LIBUSB_TRANSFER_COMPLETED: + self.__callback_done = 1 + else: + status = int(transfer.contents.status) + raise usb.USBError(_str_transfer_error[status], + status, + _transfer_errno[status]) + # implementation of libusb 1.0 backend class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) + def __init__(self, lib): + usb.backend.IBackend.__init__(self) + self.lib = lib + self.ctx = c_void_p() + _check(self.lib.libusb_init(byref(self.ctx))) + + @methodtrace(_logger) + def __del__(self): + self.lib.libusb_exit(self.ctx) + + + @methodtrace(_logger) def enumerate_devices(self): - return _DevIterator() + return _DevIterator(self.ctx) @methodtrace(_logger) def get_device_descriptor(self, dev): dev_desc = _libusb_device_descriptor() - _check(_lib.libusb_get_device_descriptor(dev.devid, byref(dev_desc))) - dev_desc.bus = _lib.libusb_get_bus_number(dev.devid) - dev_desc.address = _lib.libusb_get_device_address(dev.devid) + _check(self.lib.libusb_get_device_descriptor(dev.devid, byref(dev_desc))) + dev_desc.bus = self.lib.libusb_get_bus_number(dev.devid) + dev_desc.address = self.lib.libusb_get_device_address(dev.devid) + #Only available i newer versions of libusb try: - dev_desc.port_number = _lib.libusb_get_port_number(dev.devid) + dev_desc.port_number = self.lib.libusb_get_port_number(dev.devid) except AttributeError: dev_desc.port_number = None + return dev_desc @methodtrace(_logger) def get_configuration_descriptor(self, dev, config): cfg = POINTER(_libusb_config_descriptor)() - _check(_lib.libusb_get_config_descriptor(dev.devid, - config, byref(cfg))) + _check(self.lib.libusb_get_config_descriptor( + dev.devid, + config, byref(cfg))) return _ConfigDescriptor(cfg) @methodtrace(_logger) @@ -499,41 +730,40 @@ class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) def open_device(self, dev): - handle = _libusb_device_handle() - _check(_lib.libusb_open(dev.devid, byref(handle))) - return handle + return _DeviceHandle(dev) @methodtrace(_logger) def close_device(self, dev_handle): - _lib.libusb_close(dev_handle) + self.lib.libusb_close(dev_handle.handle) @methodtrace(_logger) def set_configuration(self, dev_handle, config_value): - _check(_lib.libusb_set_configuration(dev_handle, config_value)) + _check(self.lib.libusb_set_configuration(dev_handle.handle, config_value)) @methodtrace(_logger) def get_configuration(self, dev_handle): config = c_int() - _check(_lib.libusb_get_configuration(dev_handle, byref(config))) + _check(self.lib.libusb_get_configuration(dev_handle.handle, byref(config))) return config.value @methodtrace(_logger) def set_interface_altsetting(self, dev_handle, intf, altsetting): - _check(_lib.libusb_set_interface_alt_setting(dev_handle, - intf, - altsetting)) + _check(self.lib.libusb_set_interface_alt_setting( + dev_handle.handle, + intf, + altsetting)) @methodtrace(_logger) def claim_interface(self, dev_handle, intf): - _check(_lib.libusb_claim_interface(dev_handle, intf)) + _check(self.lib.libusb_claim_interface(dev_handle.handle, intf)) @methodtrace(_logger) def release_interface(self, dev_handle, intf): - _check(_lib.libusb_release_interface(dev_handle, intf)) + _check(self.lib.libusb_release_interface(dev_handle.handle, intf)) @methodtrace(_logger) def bulk_write(self, dev_handle, ep, intf, data, timeout): - return self.__write(_lib.libusb_bulk_transfer, + return self.__write(self.lib.libusb_bulk_transfer, dev_handle, ep, intf, @@ -542,7 +772,7 @@ class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) def bulk_read(self, dev_handle, ep, intf, size, timeout): - return self.__read(_lib.libusb_bulk_transfer, + return self.__read(self.lib.libusb_bulk_transfer, dev_handle, ep, intf, @@ -551,7 +781,7 @@ class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) def intr_write(self, dev_handle, ep, intf, data, timeout): - return self.__write(_lib.libusb_interrupt_transfer, + return self.__write(self.lib.libusb_interrupt_transfer, dev_handle, ep, intf, @@ -560,22 +790,23 @@ class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) def intr_read(self, dev_handle, ep, intf, size, timeout): - return self.__read(_lib.libusb_interrupt_transfer, + return self.__read(self.lib.libusb_interrupt_transfer, dev_handle, ep, intf, size, timeout) -# TODO: implement isochronous -# @methodtrace(_logger) -# def iso_write(self, dev_handle, ep, intf, data, timeout): -# pass - + @methodtrace(_logger) + def iso_write(self, dev_handle, ep, intf, data, timeout): + handler = _IsoTransferHandler(dev_handle, ep, data, timeout) + return handler.submit(self.ctx) -# @methodtrace(_logger) -# def iso_read(self, dev_handle, ep, intf, size, timeout): -# pass + @methodtrace(_logger) + def iso_read(self, dev_handle, ep, intf, size, timeout): + data = _interop.as_array('\x00' * size) + handler = _IsoTransferHandler(dev_handle, ep, data, timeout) + return data[:handler.submit(self.ctx)] @methodtrace(_logger) def ctrl_transfer(self, @@ -594,15 +825,15 @@ class _LibUSB(usb.backend.IBackend): addr, length = buff.buffer_info() length *= buff.itemsize - ret = _check(_lib.libusb_control_transfer(dev_handle, - bmRequestType, - bRequest, - wValue, - wIndex, - cast(addr, - POINTER(c_ubyte)), - length, - timeout)) + ret = _check(self.lib.libusb_control_transfer( + dev_handle.handle, + bmRequestType, + bRequest, + wValue, + wIndex, + cast(addr, POINTER(c_ubyte)), + length, + timeout)) if usb.util.ctrl_direction(bmRequestType) == usb.util.CTRL_OUT: return ret.value @@ -611,25 +842,26 @@ class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) def reset_device(self, dev_handle): - _check(_lib.libusb_reset_device(dev_handle)) + _check(self.lib.libusb_reset_device(dev_handle.handle)) @methodtrace(_logger) def is_kernel_driver_active(self, dev_handle, intf): - return bool(_check(_lib.libusb_kernel_driver_active(dev_handle, intf))) + return bool(_check(self.lib.libusb_kernel_driver_active(dev_handle.handle, + intf))) @methodtrace(_logger) def detach_kernel_driver(self, dev_handle, intf): - _check(_lib.libusb_detach_kernel_driver(dev_handle, intf)) + _check(self.lib.libusb_detach_kernel_driver(dev_handle.handle, intf)) @methodtrace(_logger) def attach_kernel_driver(self, dev_handle, intf): - _check(_lib.libusb_attach_kernel_driver(dev_handle, intf)) + _check(self.lib.libusb_attach_kernel_driver(dev_handle.handle, intf)) def __write(self, fn, dev_handle, ep, intf, data, timeout): address, length = data.buffer_info() length *= data.itemsize transferred = c_int() - retval = fn(dev_handle, + retval = fn(dev_handle.handle, ep, cast(address, POINTER(c_ubyte)), length, @@ -642,11 +874,11 @@ class _LibUSB(usb.backend.IBackend): return transferred.value def __read(self, fn, dev_handle, ep, intf, size, timeout): - data = _interop.as_array((0,) * size) + data = _interop.as_array('\x00' * size) address, length = data.buffer_info() length *= data.itemsize transferred = c_int() - retval = fn(dev_handle, + retval = fn(dev_handle.handle, ep, cast(address, POINTER(c_ubyte)), length, @@ -658,13 +890,12 @@ class _LibUSB(usb.backend.IBackend): return data[:transferred.value] def get_backend(): - global _lib, _init + global _lib try: if _lib is None: _lib = _load_library() _setup_prototypes(_lib) - _init = _Initializer() - return _LibUSB() + return _LibUSB(_lib) except Exception: _logger.error('Error loading libusb 1.0 backend', exc_info=True) return None |