From 6acdbc3db543f2692ee336a99722f5ab0b46c77e Mon Sep 17 00:00:00 2001 From: Walter Bender Date: Wed, 13 Nov 2013 22:42:18 +0000 Subject: convert to new primitive type --- (limited to 'plugins/audio_sensors') diff --git a/plugins/audio_sensors/audio_sensors.py b/plugins/audio_sensors/audio_sensors.py index 8d45395..d62ca65 100644 --- a/plugins/audio_sensors/audio_sensors.py +++ b/plugins/audio_sensors/audio_sensors.py @@ -18,7 +18,6 @@ from gettext import gettext as _ try: - from numpy import append from numpy.fft import rfft PITCH_AVAILABLE = True except: @@ -33,8 +32,9 @@ from plugins.audio_sensors.ringbuffer import RingBuffer1d from TurtleArt.tapalette import make_palette from TurtleArt.taconstants import XO1, XO15, XO175, XO30, XO4 -from TurtleArt.talogo import primitive_dictionary from TurtleArt.tautils import debug_output +from TurtleArt.taprimitive import (ConstantArg, Primitive) +from TurtleArt.tatype import TYPE_NUMBER import logging _logger = logging.getLogger('turtleart-activity audio sensors plugin') @@ -57,7 +57,9 @@ def _avg(array, abs_value=False): class Audio_sensors(Plugin): def __init__(self, parent): + Plugin.__init__(self) self._parent = parent + self.audio_started = False self._status = True # TODO: test for audio device # These flags are referenced by audiograb self.hw = self._parent.hw @@ -65,152 +67,145 @@ class Audio_sensors(Plugin): def setup(self): ''' set up audio-sensor-specific blocks ''' + self._sound = [0, 0] + self._volume = [0, 0] + self._pitch = [0, 0] + self._resistance = [0, 0] + self._voltage = [0, 0] self.max_samples = 1500 self.input_step = 1 - self.ringbuffer = [] palette = make_palette('sensor', colors=["#FF6060", "#A06060"], help_string=_('Palette of sensor blocks'), position=6) - - primitive_dictionary['sound'] = self.prim_sound - primitive_dictionary['volume'] = self.prim_volume + hidden = True if self._status: - palette.add_block('sound', - style='box-style', - label=_('sound'), - help_string=_('raw microphone input signal'), - value_block=True, - prim_name='sound') - - palette.add_block('volume', - style='box-style', - label=_('loudness'), - help_string=_('microphone input volume'), - value_block=True, - prim_name='volume') - else: - palette.add_block('sound', - hidden=True, - style='box-style', - label=_('sound'), - help_string=_('raw microphone input signal'), - value_block=True, - prim_name='sound') - palette.add_block('volume', - hidden=True, - style='box-style', - label=_('loudness'), - help_string=_('microphone input volume'), - value_block=True, - prim_name='volume') + hidden = False + + palette.add_block('sound', + hidden=hidden, + style='box-style', + label=_('sound'), + help_string=_('raw microphone input signal'), + value_block=True, + prim_name='sound') + palette.add_block('volume', + hidden=hidden, + style='box-style', + label=_('loudness'), + help_string=_('microphone input volume'), + value_block=True, + prim_name='volume') self._parent.lc.def_prim( - 'sound', 0, lambda self: primitive_dictionary['sound'](0)) + 'sound', 0, + Primitive(self.prim_sound, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(0)}, + call_afterwards=self.after_sound)) + self._parent.lc.def_prim( - 'volume', 0, lambda self: primitive_dictionary['volume'](0)) + 'volume', 0, + Primitive(self.prim_volume, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(0)}, + call_afterwards=self.after_volume)) - primitive_dictionary['pitch'] = self.prim_pitch + hidden = True if PITCH_AVAILABLE and self._status: - palette.add_block('pitch', - style='box-style', - label=_('pitch'), - help_string=_('microphone input pitch'), - value_block=True, - prim_name='pitch') - else: - palette.add_block('pitch', - hidden=True, - style='box-style', - label=_('pitch'), - help_string=_('microphone input pitch'), - value_block=True, - prim_name='pitch') - self._parent.lc.def_prim('pitch', 0, - lambda self: primitive_dictionary['pitch'](0)) - - primitive_dictionary['resistance'] = self.prim_resistance - primitive_dictionary['voltage'] = self.prim_voltage + hidden = False + + palette.add_block('pitch', + hidden=hidden, + style='box-style', + label=_('pitch'), + help_string=_('microphone input pitch'), + value_block=True, + prim_name='pitch') + self._parent.lc.def_prim( + 'pitch', 0, + Primitive(self.prim_pitch, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(0)}, + call_afterwards=self.after_pitch)) + + hidden = True if self.hw in [XO1, XO15, XO175, XO4, XO30] and self._status: + # Calibration based on http://bugs.sugarlabs.org/ticket/4649 if self.hw == XO1: self.voltage_gain = 0.000022 self.voltage_bias = 1.14 elif self.hw == XO15: self.voltage_gain = -0.00015 self.voltage_bias = 1.70 - elif self.hw in [XO175, XO4]: # recalibrate in light of #3675? - self.voltage_gain = 0.000071 - self.voltage_bias = 0.55 + elif self.hw == XO175: # Range 0.01V to 3.01V + self.voltage_gain = 0.0000516 + self.voltage_bias = 1.3598 + elif self.hw == XO4: # Range 0.17V to 3.08V + self.voltage_gain = 0.0004073 + self.voltage_bias = 1.6289 else: # XO 3.0 self.voltage_gain = 0.000077 self.voltage_bias = 0.72 - palette.add_block('resistance', - style='box-style', - label=_('resistance'), - help_string=_('microphone input resistance'), - value_block=True, - prim_name='resistance') - palette.add_block('voltage', - style='box-style', - label=_('voltage'), - help_string=_('microphone input voltage'), - value_block=True, - prim_name='voltage') - else: - palette.add_block('resistance', - hidden=True, - style='box-style', - label=_('resistance'), - help_string=_('microphone input resistance'), - prim_name='resistance') - palette.add_block('voltage', - hidden=True, - style='box-style', - label=_('voltage'), - help_string=_('microphone input voltage'), - prim_name='voltage') - - # FIXME: Only add stereo capture for XO15 (broken on ARM #3675) + hidden = False + + palette.add_block('resistance', + hidden=hidden, + style='box-style', + label=_('resistance'), + help_string=_('microphone input resistance'), + prim_name='resistance') + palette.add_block('voltage', + hidden=hidden, + style='box-style', + label=_('voltage'), + help_string=_('microphone input voltage'), + prim_name='voltage') + + hidden = True + # Only add stereo capture for XO15 (broken on ARM #3675) if self.hw in [XO15] and self._status: - palette.add_block('resistance2', - style='box-style', - label=_('resistance') + '2', - help_string=_('microphone input resistance'), - value_block=True, - prim_name='resistance2') - palette.add_block('voltage2', - style='box-style', - label=_('voltage') + '2', - help_string=_('microphone input voltage'), - value_block=True, - prim_name='voltage2') - else: - palette.add_block('resistance2', - hidden=True, - style='box-style', - label=_('resistance') + '2', - help_string=_('microphone input resistance'), - prim_name='resistance2') - palette.add_block('voltage2', - hidden=True, - style='box-style', - label=_('voltage') + '2', - help_string=_('microphone input voltage'), - prim_name='voltage2') + hidden = False + + palette.add_block('resistance2', + hidden=hidden, + style='box-style', + label=_('resistance') + '2', + help_string=_('microphone input resistance'), + prim_name='resistance2') + palette.add_block('voltage2', + hidden=hidden, + style='box-style', + label=_('voltage') + '2', + help_string=_('microphone input voltage'), + prim_name='voltage2') self._parent.lc.def_prim( 'resistance', 0, - lambda self: primitive_dictionary['resistance'](0)) + Primitive(self.prim_resistance, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(0)}, + call_afterwards=self.after_resistance)) self._parent.lc.def_prim( - 'voltage', 0, lambda self: primitive_dictionary['voltage'](0)) + 'voltage', 0, + Primitive(self.prim_voltage, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(0)}, + call_afterwards=self.after_voltage)) self._parent.lc.def_prim( 'resistance2', 0, - lambda self: primitive_dictionary['resistance'](1)) + Primitive(self.prim_resistance, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(1)}, + call_afterwards=self.after_resistance)) self._parent.lc.def_prim( - 'voltage2', 0, lambda self: primitive_dictionary['voltage'](1)) + 'voltage2', 0, + Primitive(self.prim_voltage, + return_type=TYPE_NUMBER, + kwarg_descs={'channel': ConstantArg(1)}, + call_afterwards=self.after_voltage)) - self.audio_started = False if self.hw in [XO175, XO30, XO4]: self.PARAMETERS = { SENSOR_AC_BIAS: (False, True, 80, True), @@ -240,6 +235,11 @@ class Audio_sensors(Plugin): ''' Start grabbing audio if there is an audio block in use ''' if not self._status: return + self._sound = [0, 0] + self._volume = [0, 0] + self._pitch = [0, 0] + self._resistance = [0, 0] + self._voltage = [0, 0] if self.audio_started: self.audiograb.stop_grabbing() if len(self._parent.block_list.get_similar_blocks( @@ -292,177 +292,181 @@ class Audio_sensors(Plugin): self._parent.running_sugar) return self._status - # Block primitives used in talogo + # Block primitives - def prim_volume(self, channel): + def prim_sound(self, channel=0): if not self._status: return 0 + self._prim_sound(0) # Return average of both channels if sampling in stereo if self._channels == 2: - chan0 = self._prim_volume(0) - chan1 = self._prim_volume(1) - return (chan0 + chan1) / 2 + self._prim_sound(1) + return (self._sound[0] + self._sound[1]) / 2.0 else: - return self._prim_volume(0) + return self._sound[0] - def _prim_volume(self, channel): - ''' return mic in value ''' + def _prim_sound(self, channel): + ''' return raw mic in value ''' buf = self.ringbuffer[channel].read(None, self.input_step) if len(buf) > 0: - volume = float(_avg(buf, abs_value=True)) - self._parent.lc.update_label_value('volume', volume) - return volume + self._sound[channel] = float(buf[0]) else: - return 0 + self._sound[channel] = 0 + + def after_sound(self, channel=0): + if self._parent.lc.update_values: + self._parent.lc.update_label_value('sound', self._sound[channel]) - def prim_sound(self, channel): + def prim_volume(self, channel=0): if not self._status: return 0 + self._prim_volume(0) # Return average of both channels if sampling in stereo if self._channels == 2: - chan0 = self._prim_sound(0) - chan1 = self._prim_sound(1) - return (chan0 + chan1) / 2 + self._prim_volume(1) + return (self._volume[0] + self._volume[1]) / 2.0 else: - return self._prim_sound(0) + return self._volume[0] - def _prim_sound(self, channel): + def _prim_volume(self, channel): ''' return raw mic in value ''' buf = self.ringbuffer[channel].read(None, self.input_step) if len(buf) > 0: - sound = float(buf[0]) - if self._parent.lc.update_values: - self._parent.lc.update_label_value('sound', sound) - return sound + self._volume[channel] = float(_avg(buf, abs_value=True)) else: - return 0 + self._volume[channel] = 0 + + def after_volume(self, channel=0): + if self._parent.lc.update_values: + self._parent.lc.update_label_value('volume', self._volume[channel]) - def prim_pitch(self, channel): - if not PITCH_AVAILABLE or not self._status: + def prim_pitch(self, channel=0): + if not self._status: return 0 + self._prim_pitch(0) # Return average of both channels if sampling in stereo if self._channels == 2: - chan0 = self._prim_pitch(0) - chan1 = self._prim_pitch(1) - return (chan0 + chan1) / 2 + self._prim_pitch(1) + return (self._pitch[0] + self._pitch[1]) / 2.0 else: - return self._prim_pitch(0) + return self._pitch[0] def _prim_pitch(self, channel): - ''' return index of max value in fft of mic in values ''' + ''' return raw mic in value ''' buf = self.ringbuffer[channel].read(None, self.input_step) if len(buf) > 0: buf = rfft(buf) buf = abs(buf) maxi = buf.argmax() if maxi == 0: - pitch = 0 + self._pitch[channel] = 0 else: # Simple interpolation a, b, c = buf[maxi - 1], buf[maxi], buf[maxi + 1] maxi -= a / float(a + b + c) maxi += c / float(a + b + c) - pitch = maxi * 48000 / (len(buf) * 2) - - if self._parent.lc.update_values: - self._parent.lc.update_label_value('pitch', pitch) - return pitch + self._pitch[channel] = maxi * 48000 / (len(buf) * 2) else: - return 0 + self._pitch[channel] = 0 + + def after_pitch(self, channel=0): + if self._parent.lc.update_values: + self._parent.lc.update_label_value('pitch', self._pitch[channel]) - def prim_resistance(self, channel): + def prim_resistance(self, channel=0): if not self.hw in [XO1, XO15, XO175, XO30, XO4] or not self._status: return 0 if self.hw in [XO1, XO4]: - resistance = self._prim_resistance(0) - if self._parent.lc.update_values: - self._update_resistance_labels(0, resistance) - return resistance + self._prim_resistance(0) + return self._resistance[0] elif self.hw == XO15: - resistance = self._prim_resistance(channel) - if self._parent.lc.update_values: - self._update_resistance_labels(channel, resistance) - return resistance - # FIXME: For XO175: channel assignment is seemingly random - # (#3675), so sum both channels (one of them will be 0) + self._prim_resistance(channel) + return self._resistance[channel] + # For XO175: channel assignment is seemingly random + # (#3675), one of them will be 0 else: - chan0 = self._prim_resistance(0) - chan1 = self._prim_resistance(1) - resistance = chan0 + chan1 - if self._parent.lc.update_values: - self._update_resistance_labels(0, resistance) - return resistance + self._prim_resistance(0) + if self._resistance[0] != 999999999: + return self._resistance[0] + else: + self._prim_resistance(1) + return self._resistance[1] def _prim_resistance(self, channel): ''' return resistance sensor value ''' buf = self.ringbuffer[channel].read(None, self.input_step) if len(buf) > 0: - # See - # TODO: test this calibration on XO 1.5, XO 1.75 + # See http://bugs.sugarlabs.org/ticket/552#comment:7 + # and http://bugs.sugarlabs.org/ticket/4649 avg_buf = float(_avg(buf)) if self.hw == XO1: - resistance = 2.718 ** ((avg_buf * 0.000045788) + 8.0531) + self._resistance[channel] = \ + 2.718 ** ((avg_buf * 0.000045788) + 8.0531) elif self.hw == XO15: if avg_buf > 0: - resistance = (420000000 / avg_buf) - 13500 + self._resistance[channel] = (420000000 / avg_buf) - 13500 else: - resistance = 420000000 - elif self.hw in [XO175, XO4]: - if avg_buf < 30700: - resistance = .12 * ((180000000 / (30700 - avg_buf)) - 3150) + self._resistance[channel] = 420000000 + elif self.hw == XO175: # Range 0 to inf ohms + if avg_buf < 30519: + self._resistance[channel] = \ + (92000000. / (30519 - avg_buf)) - 1620 else: - resistance = 999999999 + self._resistance[channel] = 999999999 + elif self.hw == XO4: # Range 0 to inf ohms + if avg_buf < 6629: + self._resistance[channel] = \ + (50000000. / (6629 - avg_buf)) - 3175 + else: + self._resistance[channel] = 999999999 else: # XO 3.0 if avg_buf < 30514: - resistance = (46000000 / (30514 - avg_buf)) - 1150 + self._resistance[channel] = \ + (46000000. / (30514 - avg_buf)) - 1150 else: - resistance = 999999999 - if resistance < 0: - resistance = 0 - return resistance + self._resistance[channel] = 999999999 + if self._resistance[channel] < 0: + self._resistance[channel] = 0 else: - return 0 + self._resistance[channel] = 0 - def _update_resistance_labels(self, channel, resistance): - if channel == 0: - self._parent.lc.update_label_value('resistance', resistance) - else: - self._parent.lc.update_label_value('resistance2', resistance) + def after_resistance(self, channel=0): + if self._parent.lc.update_values: + self._parent.lc.update_label_value( + ['resistance', 'resistance2'][channel], + self._resistance[channel]) - def prim_voltage(self, channel): + def prim_voltage(self, channel=0): if not self.hw in [XO1, XO15, XO175, XO30, XO4] or not self._status: return 0 if self.hw in [XO1, XO4]: - voltage = self._prim_voltage(0) - if self._parent.lc.update_values: - self._update_voltage_labels(0, voltage) - return voltage + self._prim_voltage(0) + return self._voltage[0] elif self.hw == XO15: - voltage = self._prim_voltage(channel) - if self._parent.lc.update_values: - self._update_voltage_labels(channel, voltage) - return voltage + self._prim_voltage(channel) + return self._voltage[channel] # FIXME: For XO175: channel assignment is seemingly random - # (#3675), so sum both channels (one of them will be 0) + # (#3675), one of them will be 0 else: - chan0 = self._prim_voltage(0) - chan1 = self._prim_voltage(1) - voltage = chan0 + chan1 - if self._parent.lc.update_values: - self._update_voltage_labels(0, voltage) - return voltage + self._prim_voltage(0) + if self._voltage[0] != 0: + return self._voltage[0] + else: + self._prim_voltage(1) + return self._voltage[1] def _prim_voltage(self, channel): ''' return voltage sensor value ''' buf = self.ringbuffer[channel].read(None, self.input_step) + buf = self.ringbuffer[channel].read(None, self.input_step) if len(buf) > 0: # See - voltage = float(_avg(buf)) * self.voltage_gain + self.voltage_bias - return voltage + self._voltage[channel] = \ + float(_avg(buf)) * self.voltage_gain + self.voltage_bias else: - return 0 + self._voltage[channel] = 0 - def _update_voltage_labels(self, channel, voltage): - if channel == 0: - self._parent.lc.update_label_value('voltage', voltage) - else: - self._parent.lc.update_label_value('voltage2', voltage) + def after_voltage(self, channel=0): + if self._parent.lc.update_values: + self._parent.lc.update_label_value( + ['voltage', 'voltage2'][channel], + self._voltage[channel]) -- cgit v0.9.1