diff options
Diffstat (limited to 'pybot/usb4butia.py')
-rw-r--r--[-rwxr-xr-x] | pybot/usb4butia.py | 439 |
1 files changed, 191 insertions, 248 deletions
diff --git a/pybot/usb4butia.py b/pybot/usb4butia.py index eb964d7..2e2e60a 100755..100644 --- a/pybot/usb4butia.py +++ b/pybot/usb4butia.py @@ -3,6 +3,7 @@ # # USB4Butia main # +# Copyright (c) 2012-2013 Alan Aguiar alanjas@hotmail.com # Copyright (c) 2012-2013 ButiĆ” Team butia@fing.edu.uy # Butia is a free and open robotic platform # www.fing.edu.uy/inco/proyectos/butia @@ -24,97 +25,76 @@ import os import imp +import inspect import com_usb from baseboard import Baseboard from device import Device +from functions import ButiaFunctions ERROR = -1 -class USB4Butia(): +class USB4Butia(ButiaFunctions): def __init__(self, debug=False, get_modules=True): - self._debug = debug + self._debug_flag = debug self._hotplug = [] self._openables = [] self._drivers_loaded = {} self._bb = [] - self._modules = [] + self._b_ports = [] self._get_all_drivers() - self.find_butias(get_modules) + self.refresh() + if get_modules: + self.getModulesList(refresh=False) + + def _debug(self, message, err=''): + if self._debug_flag: + print message, err - def get_butia_count(self): + def getButiaCount(self): """ Gets the number of boards detected """ return len(self._bb) - def find_butias(self, get_modules=True): - """ - Search for connected USB4Butia boards and open it - """ - devices = com_usb.find() - for dev in devices: - b = Baseboard(dev) - try: - b.open_baseboard() - self._bb.append(b) - except: - if self._debug: - print 'error open baseboard' - if get_modules: - self.get_modules_list() - - def get_modules_list(self, normal=True): + def getModulesList(self, refresh=True): """ Get the list of modules loaded in the board """ - self._modules = [] - n_boards = self.get_butia_count() - - if self._debug: - print '=Listing Devices' - + self._debug('=Listing Devices') + modules = [] + if refresh: + self.refresh() + n_boards = self.getButiaCount() for i, b in enumerate(self._bb): try: listi = b.get_listi() s = b.get_handler_size() - - if self._debug: - print '===board', i - + self._debug('===board', i) for m in range(0, s + 1): - module_name = listi[b.get_handler_type(m)] - if n_boards > 1: - complete_name = module_name + '@' + str(i) + ':' + str(m) + t = b.get_handler_type(m) + if (t == 255): + self._debug('unknow module in port:%s' % m) else: - complete_name = module_name + ':' + str(m) - - if self._debug: - print '=====module', module_name, (8 - len(module_name)) * ' ', complete_name - - if not(module_name == 'port'): - - if normal: - self._modules.append(complete_name) + module_name = listi[t] + if n_boards > 1: + complete_name = module_name + '@' + str(i) + ':' + str(m) else: - self._modules.append((str(m), module_name, str(i))) - - if not(b.devices.has_key(m) and (b.devices[m].name == module_name)): - d = Device(b, module_name, m) - d.add_functions(self._drivers_loaded[module_name]) - b.add_device(m, d) - - if module_name in self._openables: - b.add_openable_loaded(module_name) - else: - if b.devices.has_key(m): - b.devices.pop(m) - + complete_name = module_name + ':' + str(m) + self._debug('=====module ' + module_name + (9 - len(module_name)) * ' ' + complete_name) + if not(module_name == 'port'): + modules.append(complete_name) + if not(b.devices.has_key(m) and (b.devices[m].name == module_name)): + d = Device(b, module_name, m, self._drivers_loaded[module_name]) + b.add_device(m, d) + if module_name in self._openables: + b.add_openable_loaded(module_name) + else: + if b.devices.has_key(m): + b.devices.pop(m) except Exception, err: - if self._debug: - print 'error module list', err - - return self._modules + self._debug('ERROR:usb4butia:get_modules_list', err) + return modules def _get_all_drivers(self): """ @@ -122,8 +102,7 @@ class USB4Butia(): """ # current folder path_drivers = os.path.join(os.path.dirname(__file__), 'drivers') - if self._debug: - print 'Searching drivers in: ', path_drivers + self._debug('Searching drivers in: ', str(path_drivers)) # normal drivers tmp = os.listdir(path_drivers) tmp.sort() @@ -146,230 +125,194 @@ class USB4Butia(): """ Get a specify driver """ - if self._debug: - print 'Loading driver %s...' % driver + self._debug('Loading driver %s...' % driver) abs_path = os.path.abspath(os.path.join(path, driver + '.py')) - f = None try: - f = imp.load_source(driver, abs_path) + self._drivers_loaded[driver] = imp.load_source(driver, abs_path) except: - if self._debug: - print 'Cannot load %s' % driver, abs_path - if f and hasattr(f, 'FUNCTIONS'): - self._drivers_loaded[driver] = f.FUNCTIONS - else: - if self._debug: - print 'Driver not have FUNCTIONS' - - def callModule(self, modulename, board_number, number, function, params = []): + self._debug('ERROR:usb4butia:_get_driver cannot load %s' % driver, abs_path) + + def callModule(self, modulename, board_number, number, function, params = [], ret_type = int): """ Call one function: function for module: modulename in board: board_name with handler: number (only if the module is pnp, else, the parameter is None) with parameteres: params """ try: + number = int(number) + board_number = int(board_number) + if len(self._bb) < (board_number + 1): + return ERROR board = self._bb[board_number] if board.devices.has_key(number) and (board.devices[number].name == modulename): return board.devices[number].call_function(function, params) else: - if modulename in self._openables: - if modulename in board.get_openables_loaded(): - number = board.get_device_handler(modulename) - else: - board.add_openable_loaded(modulename) - dev = Device(board, modulename) - number = dev.module_open() - dev.add_functions(self._drivers_loaded[modulename]) - board.add_device(number, dev) - return board.devices[number].call_function(function, params) - else: - if self._debug: - print 'no open and no openable' + number = self._open_or_validate(modulename, board) + if number == ERROR: return ERROR + return board.devices[number].call_function(function, params) except Exception, err: - if self._debug: - print 'error call module', err + if hasattr(err, 'errno'): + if (err.errno == 5) or (err.errno == 19): + self.closeB(board) + self._debug('ERROR:usb4butia:callModule', err) return ERROR - def reconnect(self): - """ - Not implemented - """ - pass - def refresh(self): """ - Refresh: if no boards presents, search for them.. else, check if - the boards continues present + Search for connected USB4Butia boards and open it """ - if self._bb == []: - self.find_butias(False) - else: - for b in self._bb: - info = ERROR - try: - info = b.get_info() - except: - if self._debug: - print 'error refresh getinfo' - - if info == ERROR: - self._bb.remove(b) + devices_ports = [] + devices = com_usb.find() + for dev in devices: + n = dev.get_address() + if not(n == None): + devices_ports.append(n) + if not(n in self._b_ports): + b = Baseboard(dev) try: - b.close_baseboard() - except: - pass + b.open_baseboard() + self._bb.append(b) + self._b_ports.append(n) + except Exception, err: + self._debug('ERROR:usb4butia:refresh', err) + + for b in self._bb: + n = b.dev.get_address() + if not(n in devices_ports): + self.closeB(b) + + def closeB(self, b): + try: + n = b.dev.get_address() + self._bb.remove(b) + b.close_baseboard() + if n in self._b_ports: + self._b_ports.remove(n) + except: + pass def close(self): """ Closes all open baseboards """ for b in self._bb: - try: - b.close_baseboard() - except: - if self._debug: - print 'error in close baseboard' + self.closeB(b) self._bb = [] + self._b_ports = [] - def isPresent(self, module_name): + def moduleOpen(self, mod): """ - Check if module: module_name is present - """ - module_list = self.get_modules_list() - return (module_name in module_list) - - def loopBack(self, data, board=0): - """ - LoopBack command: send data to the board and get the result. If all is ok - the return must be exactly of the data parameter - """ - return self.callModule('lback', board, 0, 'send', [data]) - - ################################ Movement calls ################################ - - def set2MotorSpeed(self, leftSense = 0, leftSpeed = 0, rightSense = 0, rightSpeed = 0, board = 0): + Open the module mod """ - Set the speed of 2 motors. The sense is 0 or 1, and the speed is - between 0 and 1023 - """ - msg = [int(leftSense), int(leftSpeed / 256.0), leftSpeed % 256, int(rightSense), int(rightSpeed / 256.0) , rightSpeed % 256] - return self.callModule('motors', board, 0, 'setvel2mtr', msg) - - def setMotorSpeed(self, idMotor = 0, sense = 0, speed = 0, board = 0): - """ - Set the speed of one motor. idMotor = 0 for left motor and 1 for the - right motor. The sense is 0 or 1, and the speed is between 0 and 1023 - """ - msg = [idMotor, sense, int(speed / 256.0), speed % 256] - return self.callModule('motors', board, 0, 'setvelmtr', msg) - - ############################### General calls ############################### - - def getBatteryCharge(self, board=0): - """ - Gets the battery level charge - """ - return self.callModule('butia', board, 0, 'get_volt') - - def getVersion(self, board=0): - """ - Gets the version of ButiĆ” module. 22 for new version - """ - return self.callModule('butia', board, 0, 'read_ver') - - def getFirmwareVersion(self, board=0): - """ - Gets the version of the Firmware - """ - return self.callModule('admin', board, 0, 'getVersion') - - ############################### Sensors calls ############################### + split = self._split_module(mod) + modulename = split[1] + b = int(split[2]) + if len(self._bb) < (b + 1): + return ERROR + board = self._bb[b] + return self._open_or_validate(modulename, board) - def getButton(self, number, board=0): - """ - Gets the value of the button connected in port: number + def _open_or_validate(self, modulename, board): """ - res = self.callModule('button', board, number, 'getValue') - if res != ERROR: - return (1 - res) - else: - return res - - def getLight(self, number, board=0): - """ - Gets the value of the light sensor connected in port: number + Open o check if modulename module is open in board: board """ - m = 65535 - res = self.callModule('light', board, number, 'getValue') - if res != ERROR: - return (m - res) + if modulename in self._openables: + if modulename in board.get_openables_loaded(): + return board.get_device_handler(modulename) + else: + dev = Device(board, modulename, func=self._drivers_loaded[modulename]) + number = dev.module_open() + if number == 255: + self._debug('cannot open module', modulename) + return ERROR + else: + board.add_openable_loaded(modulename) + board.add_device(number, dev) + return number + return ERROR + + def moduleClose(self, mod): + """ + Close the module mod + """ + split = self._split_module(mod) + modulename = split[1] + if modulename in self._openables: + b = int(split[2]) + if len(self._bb) < (b + 1): + return ERROR + board = self._bb[b] + if modulename in board.get_openables_loaded(): + number = board.get_device_handler(modulename) + try: + res = board.devices[number].moduleClose() + if res == 1: + board.remove_openable_loaded(modulename) + return res + except Exception, err: + self._debug('ERROR:usb4butia:moduleClose', err) + return ERROR + else: + self._debug('cannot close no opened module') + return ERROR else: - return res - - def getDistance(self, number, board=0): - """ - Gets the value of the distance sensor connected in port: number - """ - return self.callModule('distanc', board, number, 'getValue') - - def getGray(self, number, board=0): - """ - Gets the value of the gray sensor connected in port: number - """ - return self.callModule('grey', board, number, 'getValue') - - def getTemperature(self, number, board=0): - """ - Gets the value of the temperature sensor connected in port: number - """ - return self.callModule('temp', board, number, 'getValue') - - def getResistance(self, number, board=0): - """ - Gets the value of the resistance sensor connected in port: number - """ - vcc = 65535 - raw = self.callModule('res', board, number, 'getValue') - if not(raw == ERROR): - return raw * 6800 / (vcc - raw) - return raw - - def getVoltage(self, number, board=0): - """ - Gets the value of the voltage sensor connected in port: number - """ - vcc = 65535 - raw = self.callModule('volt', board, number, 'getValue') - if not(raw == ERROR): - return raw * 5 / vcc - return raw - - def setLed(self, number, on_off, board=0): - """ - Sets on or off the LED connected in port: number (0 is off, 1 is on) - """ - return self.callModule('led', board, number, 'turn', [int(on_off)]) - - ################################ Extras ################################ + self._debug('cannot close no openable module') + return ERROR - def modeHack(self, pin, mode, board = 0): + def getListi(self, board_number=0): """ - Sets the mode of hack pin. If mode 0 = input, mode 1 = output + returns a list of instanciables modules """ - msg = [int(pin), int(mode)] - return self.callModule('hackp', board, 0, 'setMode', msg) + board_number = int(board_number) + if len(self._bb) < (board_number + 1): + return [] + board = self._bb[board_number] + listi = board.get_listi(True) + return listi.values() - def setHack(self, pin, value, board = 0): + def _split_module(self, mbn): """ - Sets the value of hack pin configured as output. Value is 0 or 1 + Split a modulename: module@board:port to (number, modulename, board) """ - msg = [int(pin), int(value)] - return self.callModule('hackp', board, 0, 'write', msg) - - def getHack(self, pin, board = 0): - """ - Gets the value of hack pin configured as input. Returns 0 or 1 - """ - return self.callModule('hackp', board, 0, 'read', [int(pin)]) + board = '0' + number = '0' + if mbn.count('@') > 0: + modulename, bn = mbn.split('@') + if bn.count(':') > 0: + board, number = bn.split(':') + else: + board = bn + else: + if mbn.count(':') > 0: + modulename, number = mbn.split(':') + else: + modulename = mbn + return (number, modulename, board) + + def describe(self, mod): + """ + Describe the functions of a modulename + """ + split = self._split_module(mod) + mod = split[1] + funcs = [] + d = {} + if self._drivers_loaded.has_key(mod): + driver = self._drivers_loaded[mod] + a = dir(driver) + flag = False + for p in a: + if flag: + funcs.append(p) + if p == '__package__': + flag = True + for f in funcs: + h = getattr(driver, f) + i = inspect.getargspec(h) + parameters = i[0] + if 'dev' in parameters: + parameters.remove('dev') + d[f] = parameters + return d |