Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/pybot/usb4butia.py
diff options
context:
space:
mode:
Diffstat (limited to 'pybot/usb4butia.py')
-rw-r--r--[-rwxr-xr-x]pybot/usb4butia.py439
1 files changed, 191 insertions, 248 deletions
diff --git a/pybot/usb4butia.py b/pybot/usb4butia.py
index eb964d7..2e2e60a 100755..100644
--- a/pybot/usb4butia.py
+++ b/pybot/usb4butia.py
@@ -3,6 +3,7 @@
#
# USB4Butia main
#
+# Copyright (c) 2012-2013 Alan Aguiar alanjas@hotmail.com
# Copyright (c) 2012-2013 ButiĆ” Team butia@fing.edu.uy
# Butia is a free and open robotic platform
# www.fing.edu.uy/inco/proyectos/butia
@@ -24,97 +25,76 @@
import os
import imp
+import inspect
import com_usb
from baseboard import Baseboard
from device import Device
+from functions import ButiaFunctions
ERROR = -1
-class USB4Butia():
+class USB4Butia(ButiaFunctions):
def __init__(self, debug=False, get_modules=True):
- self._debug = debug
+ self._debug_flag = debug
self._hotplug = []
self._openables = []
self._drivers_loaded = {}
self._bb = []
- self._modules = []
+ self._b_ports = []
self._get_all_drivers()
- self.find_butias(get_modules)
+ self.refresh()
+ if get_modules:
+ self.getModulesList(refresh=False)
+
+ def _debug(self, message, err=''):
+ if self._debug_flag:
+ print message, err
- def get_butia_count(self):
+ def getButiaCount(self):
"""
Gets the number of boards detected
"""
return len(self._bb)
- def find_butias(self, get_modules=True):
- """
- Search for connected USB4Butia boards and open it
- """
- devices = com_usb.find()
- for dev in devices:
- b = Baseboard(dev)
- try:
- b.open_baseboard()
- self._bb.append(b)
- except:
- if self._debug:
- print 'error open baseboard'
- if get_modules:
- self.get_modules_list()
-
- def get_modules_list(self, normal=True):
+ def getModulesList(self, refresh=True):
"""
Get the list of modules loaded in the board
"""
- self._modules = []
- n_boards = self.get_butia_count()
-
- if self._debug:
- print '=Listing Devices'
-
+ self._debug('=Listing Devices')
+ modules = []
+ if refresh:
+ self.refresh()
+ n_boards = self.getButiaCount()
for i, b in enumerate(self._bb):
try:
listi = b.get_listi()
s = b.get_handler_size()
-
- if self._debug:
- print '===board', i
-
+ self._debug('===board', i)
for m in range(0, s + 1):
- module_name = listi[b.get_handler_type(m)]
- if n_boards > 1:
- complete_name = module_name + '@' + str(i) + ':' + str(m)
+ t = b.get_handler_type(m)
+ if (t == 255):
+ self._debug('unknow module in port:%s' % m)
else:
- complete_name = module_name + ':' + str(m)
-
- if self._debug:
- print '=====module', module_name, (8 - len(module_name)) * ' ', complete_name
-
- if not(module_name == 'port'):
-
- if normal:
- self._modules.append(complete_name)
+ module_name = listi[t]
+ if n_boards > 1:
+ complete_name = module_name + '@' + str(i) + ':' + str(m)
else:
- self._modules.append((str(m), module_name, str(i)))
-
- if not(b.devices.has_key(m) and (b.devices[m].name == module_name)):
- d = Device(b, module_name, m)
- d.add_functions(self._drivers_loaded[module_name])
- b.add_device(m, d)
-
- if module_name in self._openables:
- b.add_openable_loaded(module_name)
- else:
- if b.devices.has_key(m):
- b.devices.pop(m)
-
+ complete_name = module_name + ':' + str(m)
+ self._debug('=====module ' + module_name + (9 - len(module_name)) * ' ' + complete_name)
+ if not(module_name == 'port'):
+ modules.append(complete_name)
+ if not(b.devices.has_key(m) and (b.devices[m].name == module_name)):
+ d = Device(b, module_name, m, self._drivers_loaded[module_name])
+ b.add_device(m, d)
+ if module_name in self._openables:
+ b.add_openable_loaded(module_name)
+ else:
+ if b.devices.has_key(m):
+ b.devices.pop(m)
except Exception, err:
- if self._debug:
- print 'error module list', err
-
- return self._modules
+ self._debug('ERROR:usb4butia:get_modules_list', err)
+ return modules
def _get_all_drivers(self):
"""
@@ -122,8 +102,7 @@ class USB4Butia():
"""
# current folder
path_drivers = os.path.join(os.path.dirname(__file__), 'drivers')
- if self._debug:
- print 'Searching drivers in: ', path_drivers
+ self._debug('Searching drivers in: ', str(path_drivers))
# normal drivers
tmp = os.listdir(path_drivers)
tmp.sort()
@@ -146,230 +125,194 @@ class USB4Butia():
"""
Get a specify driver
"""
- if self._debug:
- print 'Loading driver %s...' % driver
+ self._debug('Loading driver %s...' % driver)
abs_path = os.path.abspath(os.path.join(path, driver + '.py'))
- f = None
try:
- f = imp.load_source(driver, abs_path)
+ self._drivers_loaded[driver] = imp.load_source(driver, abs_path)
except:
- if self._debug:
- print 'Cannot load %s' % driver, abs_path
- if f and hasattr(f, 'FUNCTIONS'):
- self._drivers_loaded[driver] = f.FUNCTIONS
- else:
- if self._debug:
- print 'Driver not have FUNCTIONS'
-
- def callModule(self, modulename, board_number, number, function, params = []):
+ self._debug('ERROR:usb4butia:_get_driver cannot load %s' % driver, abs_path)
+
+ def callModule(self, modulename, board_number, number, function, params = [], ret_type = int):
"""
Call one function: function for module: modulename in board: board_name
with handler: number (only if the module is pnp, else, the parameter is
None) with parameteres: params
"""
try:
+ number = int(number)
+ board_number = int(board_number)
+ if len(self._bb) < (board_number + 1):
+ return ERROR
board = self._bb[board_number]
if board.devices.has_key(number) and (board.devices[number].name == modulename):
return board.devices[number].call_function(function, params)
else:
- if modulename in self._openables:
- if modulename in board.get_openables_loaded():
- number = board.get_device_handler(modulename)
- else:
- board.add_openable_loaded(modulename)
- dev = Device(board, modulename)
- number = dev.module_open()
- dev.add_functions(self._drivers_loaded[modulename])
- board.add_device(number, dev)
- return board.devices[number].call_function(function, params)
- else:
- if self._debug:
- print 'no open and no openable'
+ number = self._open_or_validate(modulename, board)
+ if number == ERROR:
return ERROR
+ return board.devices[number].call_function(function, params)
except Exception, err:
- if self._debug:
- print 'error call module', err
+ if hasattr(err, 'errno'):
+ if (err.errno == 5) or (err.errno == 19):
+ self.closeB(board)
+ self._debug('ERROR:usb4butia:callModule', err)
return ERROR
- def reconnect(self):
- """
- Not implemented
- """
- pass
-
def refresh(self):
"""
- Refresh: if no boards presents, search for them.. else, check if
- the boards continues present
+ Search for connected USB4Butia boards and open it
"""
- if self._bb == []:
- self.find_butias(False)
- else:
- for b in self._bb:
- info = ERROR
- try:
- info = b.get_info()
- except:
- if self._debug:
- print 'error refresh getinfo'
-
- if info == ERROR:
- self._bb.remove(b)
+ devices_ports = []
+ devices = com_usb.find()
+ for dev in devices:
+ n = dev.get_address()
+ if not(n == None):
+ devices_ports.append(n)
+ if not(n in self._b_ports):
+ b = Baseboard(dev)
try:
- b.close_baseboard()
- except:
- pass
+ b.open_baseboard()
+ self._bb.append(b)
+ self._b_ports.append(n)
+ except Exception, err:
+ self._debug('ERROR:usb4butia:refresh', err)
+
+ for b in self._bb:
+ n = b.dev.get_address()
+ if not(n in devices_ports):
+ self.closeB(b)
+
+ def closeB(self, b):
+ try:
+ n = b.dev.get_address()
+ self._bb.remove(b)
+ b.close_baseboard()
+ if n in self._b_ports:
+ self._b_ports.remove(n)
+ except:
+ pass
def close(self):
"""
Closes all open baseboards
"""
for b in self._bb:
- try:
- b.close_baseboard()
- except:
- if self._debug:
- print 'error in close baseboard'
+ self.closeB(b)
self._bb = []
+ self._b_ports = []
- def isPresent(self, module_name):
+ def moduleOpen(self, mod):
"""
- Check if module: module_name is present
- """
- module_list = self.get_modules_list()
- return (module_name in module_list)
-
- def loopBack(self, data, board=0):
- """
- LoopBack command: send data to the board and get the result. If all is ok
- the return must be exactly of the data parameter
- """
- return self.callModule('lback', board, 0, 'send', [data])
-
- ################################ Movement calls ################################
-
- def set2MotorSpeed(self, leftSense = 0, leftSpeed = 0, rightSense = 0, rightSpeed = 0, board = 0):
+ Open the module mod
"""
- Set the speed of 2 motors. The sense is 0 or 1, and the speed is
- between 0 and 1023
- """
- msg = [int(leftSense), int(leftSpeed / 256.0), leftSpeed % 256, int(rightSense), int(rightSpeed / 256.0) , rightSpeed % 256]
- return self.callModule('motors', board, 0, 'setvel2mtr', msg)
-
- def setMotorSpeed(self, idMotor = 0, sense = 0, speed = 0, board = 0):
- """
- Set the speed of one motor. idMotor = 0 for left motor and 1 for the
- right motor. The sense is 0 or 1, and the speed is between 0 and 1023
- """
- msg = [idMotor, sense, int(speed / 256.0), speed % 256]
- return self.callModule('motors', board, 0, 'setvelmtr', msg)
-
- ############################### General calls ###############################
-
- def getBatteryCharge(self, board=0):
- """
- Gets the battery level charge
- """
- return self.callModule('butia', board, 0, 'get_volt')
-
- def getVersion(self, board=0):
- """
- Gets the version of ButiĆ” module. 22 for new version
- """
- return self.callModule('butia', board, 0, 'read_ver')
-
- def getFirmwareVersion(self, board=0):
- """
- Gets the version of the Firmware
- """
- return self.callModule('admin', board, 0, 'getVersion')
-
- ############################### Sensors calls ###############################
+ split = self._split_module(mod)
+ modulename = split[1]
+ b = int(split[2])
+ if len(self._bb) < (b + 1):
+ return ERROR
+ board = self._bb[b]
+ return self._open_or_validate(modulename, board)
- def getButton(self, number, board=0):
- """
- Gets the value of the button connected in port: number
+ def _open_or_validate(self, modulename, board):
"""
- res = self.callModule('button', board, number, 'getValue')
- if res != ERROR:
- return (1 - res)
- else:
- return res
-
- def getLight(self, number, board=0):
- """
- Gets the value of the light sensor connected in port: number
+ Open o check if modulename module is open in board: board
"""
- m = 65535
- res = self.callModule('light', board, number, 'getValue')
- if res != ERROR:
- return (m - res)
+ if modulename in self._openables:
+ if modulename in board.get_openables_loaded():
+ return board.get_device_handler(modulename)
+ else:
+ dev = Device(board, modulename, func=self._drivers_loaded[modulename])
+ number = dev.module_open()
+ if number == 255:
+ self._debug('cannot open module', modulename)
+ return ERROR
+ else:
+ board.add_openable_loaded(modulename)
+ board.add_device(number, dev)
+ return number
+ return ERROR
+
+ def moduleClose(self, mod):
+ """
+ Close the module mod
+ """
+ split = self._split_module(mod)
+ modulename = split[1]
+ if modulename in self._openables:
+ b = int(split[2])
+ if len(self._bb) < (b + 1):
+ return ERROR
+ board = self._bb[b]
+ if modulename in board.get_openables_loaded():
+ number = board.get_device_handler(modulename)
+ try:
+ res = board.devices[number].moduleClose()
+ if res == 1:
+ board.remove_openable_loaded(modulename)
+ return res
+ except Exception, err:
+ self._debug('ERROR:usb4butia:moduleClose', err)
+ return ERROR
+ else:
+ self._debug('cannot close no opened module')
+ return ERROR
else:
- return res
-
- def getDistance(self, number, board=0):
- """
- Gets the value of the distance sensor connected in port: number
- """
- return self.callModule('distanc', board, number, 'getValue')
-
- def getGray(self, number, board=0):
- """
- Gets the value of the gray sensor connected in port: number
- """
- return self.callModule('grey', board, number, 'getValue')
-
- def getTemperature(self, number, board=0):
- """
- Gets the value of the temperature sensor connected in port: number
- """
- return self.callModule('temp', board, number, 'getValue')
-
- def getResistance(self, number, board=0):
- """
- Gets the value of the resistance sensor connected in port: number
- """
- vcc = 65535
- raw = self.callModule('res', board, number, 'getValue')
- if not(raw == ERROR):
- return raw * 6800 / (vcc - raw)
- return raw
-
- def getVoltage(self, number, board=0):
- """
- Gets the value of the voltage sensor connected in port: number
- """
- vcc = 65535
- raw = self.callModule('volt', board, number, 'getValue')
- if not(raw == ERROR):
- return raw * 5 / vcc
- return raw
-
- def setLed(self, number, on_off, board=0):
- """
- Sets on or off the LED connected in port: number (0 is off, 1 is on)
- """
- return self.callModule('led', board, number, 'turn', [int(on_off)])
-
- ################################ Extras ################################
+ self._debug('cannot close no openable module')
+ return ERROR
- def modeHack(self, pin, mode, board = 0):
+ def getListi(self, board_number=0):
"""
- Sets the mode of hack pin. If mode 0 = input, mode 1 = output
+ returns a list of instanciables modules
"""
- msg = [int(pin), int(mode)]
- return self.callModule('hackp', board, 0, 'setMode', msg)
+ board_number = int(board_number)
+ if len(self._bb) < (board_number + 1):
+ return []
+ board = self._bb[board_number]
+ listi = board.get_listi(True)
+ return listi.values()
- def setHack(self, pin, value, board = 0):
+ def _split_module(self, mbn):
"""
- Sets the value of hack pin configured as output. Value is 0 or 1
+ Split a modulename: module@board:port to (number, modulename, board)
"""
- msg = [int(pin), int(value)]
- return self.callModule('hackp', board, 0, 'write', msg)
-
- def getHack(self, pin, board = 0):
- """
- Gets the value of hack pin configured as input. Returns 0 or 1
- """
- return self.callModule('hackp', board, 0, 'read', [int(pin)])
+ board = '0'
+ number = '0'
+ if mbn.count('@') > 0:
+ modulename, bn = mbn.split('@')
+ if bn.count(':') > 0:
+ board, number = bn.split(':')
+ else:
+ board = bn
+ else:
+ if mbn.count(':') > 0:
+ modulename, number = mbn.split(':')
+ else:
+ modulename = mbn
+ return (number, modulename, board)
+
+ def describe(self, mod):
+ """
+ Describe the functions of a modulename
+ """
+ split = self._split_module(mod)
+ mod = split[1]
+ funcs = []
+ d = {}
+ if self._drivers_loaded.has_key(mod):
+ driver = self._drivers_loaded[mod]
+ a = dir(driver)
+ flag = False
+ for p in a:
+ if flag:
+ funcs.append(p)
+ if p == '__package__':
+ flag = True
+ for f in funcs:
+ h = getattr(driver, f)
+ i = inspect.getargspec(h)
+ parameters = i[0]
+ if 'dev' in parameters:
+ parameters.remove('dev')
+ d[f] = parameters
+ return d