Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/plugins/wedo_plugin/wedo/__init__.py
blob: 8f77bb5d40b0799f614ea7f3fc7aa5c238f8cb3c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from functools import wraps

from wedo.distance import interpolate_distance_data
from wedo.motor import processMotorValues
from wedo.tilt import process_tilt
from wedo.tilt import FLAT, TILT_BACK, TILT_FORWARD, TILT_LEFT, TILT_RIGHT

import usb.core
import logging

logger = logging.getLogger('wedo')

ID_VENDOR, ID_PRODUCT = 0x0694, 0x0003
UNAVAILABLE = None
TILTSENSOR = (38, 39)
DISTANCESENSOR = (176, 177, 178, 179)
MOTOR = (0, 1, 2, 3, 238, 239)

# limit the visibility to simplify the usage
__all__ = ["scan_for_devices", "WeDo", "FLAT", "TILT_BACK", "TILT_FORWARD", "TILT_LEFT", "TILT_RIGHT"]

def scan_for_devices():
    """ Find all available devices """
    return usb.core.find(find_all=True, idVendor=ID_VENDOR, idProduct=ID_PRODUCT)


class WeDo(object):
    """
        Each instance of this class represents a physical WeDo device.

        Usage :

        >>> from wedo import WeDo
        >>> wd = WeDo()

        Activating the first motor full forward:
        >>> wd.motor_a = 100

        Activating the second motor half speed/force backward:
        >>> wd.motor_b = -50

        Current value of the tilt sensor:
        >>> wd.tilt

        Current distance value in meters of the distance sensor:
        >>> wd.distance
    """

    def __init__(self, device=None):
        """
        If a device is not given, it will attach this instance to the first one found.
        Otherwise you can pass a specific one from the list returned by scan_for_devices.
        """
        self.number = 0
        self.dev = device
        if self.dev is None:
            devices = scan_for_devices()
            if not devices:
                raise OSError("Could not find a connected WeDo device")
            self.dev = devices[0]
        self.init_device()
        self.valMotorA = 0
        self.valMotorB = 0

    def init_device(self):
        """
        Reinit device associated with the WeDo instance
        """
        if self.dev is None:
            raise ValueError("No device attached to this instance")
        try:
            if self.dev.is_kernel_driver_active(0):
                try:
                    self.dev.detach_kernel_driver(0)
                except usb.core.USBError as e:
                    logger.error(
                        "Could not detatch kernel driver: %s" % (str(e)))
            self.endpoint = self.dev[0][(0, 0)][0]
        except usb.core.USBError as e:
            logger.error("Could not talk to WeDo device: %s" % (str(e)))

    def getRawData(self):
        """Read 64 bytes from the WeDo's endpoint, but only
        return the last eight."""
        try:
            return self.endpoint.read(64)[-8:]
        except usb.core.USBError as e:
            logger.exception("Could not read from WeDo device")
        return None

    def setMotors(self):
        """
        Arguments should be in form of a number between 0
        and 100, positive or negative. Magic numbers used for
        the ctrl_transfer derived from sniffing USB coms.
        """
        data = [64, processMotorValues(self.valMotorA) & 0xFF, processMotorValues(self.valMotorB) & 0xFF,
                0x00, 0x00, 0x00, 0x00, 0x00]
        try:
            self.dev.ctrl_transfer(bmRequestType=0x21, bRequest=0x09, wValue=0x0200, wIndex=0, data_or_wLength=data)
        except usb.core.USBError as e:
            logger.exception("Could not write to driver")

    def getData(self):
        """
        Sensor data is contained in the 2nd and 4th byte, with
        sensor IDs being contained in the 3rd and 5th byte
        respectively.
        """
        rawData = self.getRawData()
        if rawData is not None:
            sensorData = {rawData[3]: rawData[2], rawData[5]: rawData[4]}
        else:
            sensorData = {}
        return sensorData

    @property
    def raw_tilt(self):
        """
        Returns the raw tilt direction (arbitrary units)
        """
        data = self.getData()
        for num in data:
            if num in TILTSENSOR:
                return data[num]
        return UNAVAILABLE

    @property
    def tilt(self):
        """
        Returns the tilt direction (one of the FLAT, TILT_FORWARD, TILT_LEFT, TILT_RIGHT, TILT_BACK constants)
        """
        raw_data = self.raw_tilt
        if raw_data is UNAVAILABLE:
            return UNAVAILABLE
        return process_tilt(raw_data)

    @property
    def raw_distance(self):
        """
        Return the raw evaluated distance from the distance meter (arbitrary units)
        """
        data = self.getData()
        for num in data:
            if num in DISTANCESENSOR:
                return data[num]
        return UNAVAILABLE

    @property
    def distance(self):
        """
        Return the evaluated distance in meters from the distance meter.
        (Note: this is the ideal distance without any objets on the side, you might have to adapt it depending on your construction)
        """

        raw_data = self.raw_distance
        if raw_data is UNAVAILABLE:
            return UNAVAILABLE
        return interpolate_distance_data(raw_data)

    @property
    def motor_a(self):
        """ Get back the last speed/force set for motor A
        """
        return self.valMotorA

    @property
    def motor_b(self):
        """ Get back the last speed/force set for motor A
        """
        return self.valMotorB

    @motor_a.setter
    def motor_a(self, value):
        """ Sets the speed/force of the motor A, expects a value between -100 and 100
        """
        if value > 100 or value < -100:
            raise ValueError("A motor can only be between -100 and 100")
        self.valMotorA = value
        self.setMotors()

    @motor_b.setter
    def motor_b(self, value):
        """ Sets the speed/force of the motor B, expects a value between -100 and 100
        """
        if value > 100 or value < -100:
            raise ValueError("A motor can only be between -100 and 100")
        self.valMotorB = value
        self.setMotors()