Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/plugins/audio_sensors/audio_sensors.py
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/audio_sensors/audio_sensors.py')
-rw-r--r--plugins/audio_sensors/audio_sensors.py432
1 files changed, 218 insertions, 214 deletions
diff --git a/plugins/audio_sensors/audio_sensors.py b/plugins/audio_sensors/audio_sensors.py
index 8d45395..d62ca65 100644
--- a/plugins/audio_sensors/audio_sensors.py
+++ b/plugins/audio_sensors/audio_sensors.py
@@ -18,7 +18,6 @@
from gettext import gettext as _
try:
- from numpy import append
from numpy.fft import rfft
PITCH_AVAILABLE = True
except:
@@ -33,8 +32,9 @@ from plugins.audio_sensors.ringbuffer import RingBuffer1d
from TurtleArt.tapalette import make_palette
from TurtleArt.taconstants import XO1, XO15, XO175, XO30, XO4
-from TurtleArt.talogo import primitive_dictionary
from TurtleArt.tautils import debug_output
+from TurtleArt.taprimitive import (ConstantArg, Primitive)
+from TurtleArt.tatype import TYPE_NUMBER
import logging
_logger = logging.getLogger('turtleart-activity audio sensors plugin')
@@ -57,7 +57,9 @@ def _avg(array, abs_value=False):
class Audio_sensors(Plugin):
def __init__(self, parent):
+ Plugin.__init__(self)
self._parent = parent
+ self.audio_started = False
self._status = True # TODO: test for audio device
# These flags are referenced by audiograb
self.hw = self._parent.hw
@@ -65,152 +67,145 @@ class Audio_sensors(Plugin):
def setup(self):
''' set up audio-sensor-specific blocks '''
+ self._sound = [0, 0]
+ self._volume = [0, 0]
+ self._pitch = [0, 0]
+ self._resistance = [0, 0]
+ self._voltage = [0, 0]
self.max_samples = 1500
self.input_step = 1
-
self.ringbuffer = []
palette = make_palette('sensor',
colors=["#FF6060", "#A06060"],
help_string=_('Palette of sensor blocks'),
position=6)
-
- primitive_dictionary['sound'] = self.prim_sound
- primitive_dictionary['volume'] = self.prim_volume
+ hidden = True
if self._status:
- palette.add_block('sound',
- style='box-style',
- label=_('sound'),
- help_string=_('raw microphone input signal'),
- value_block=True,
- prim_name='sound')
-
- palette.add_block('volume',
- style='box-style',
- label=_('loudness'),
- help_string=_('microphone input volume'),
- value_block=True,
- prim_name='volume')
- else:
- palette.add_block('sound',
- hidden=True,
- style='box-style',
- label=_('sound'),
- help_string=_('raw microphone input signal'),
- value_block=True,
- prim_name='sound')
- palette.add_block('volume',
- hidden=True,
- style='box-style',
- label=_('loudness'),
- help_string=_('microphone input volume'),
- value_block=True,
- prim_name='volume')
+ hidden = False
+
+ palette.add_block('sound',
+ hidden=hidden,
+ style='box-style',
+ label=_('sound'),
+ help_string=_('raw microphone input signal'),
+ value_block=True,
+ prim_name='sound')
+ palette.add_block('volume',
+ hidden=hidden,
+ style='box-style',
+ label=_('loudness'),
+ help_string=_('microphone input volume'),
+ value_block=True,
+ prim_name='volume')
self._parent.lc.def_prim(
- 'sound', 0, lambda self: primitive_dictionary['sound'](0))
+ 'sound', 0,
+ Primitive(self.prim_sound,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(0)},
+ call_afterwards=self.after_sound))
+
self._parent.lc.def_prim(
- 'volume', 0, lambda self: primitive_dictionary['volume'](0))
+ 'volume', 0,
+ Primitive(self.prim_volume,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(0)},
+ call_afterwards=self.after_volume))
- primitive_dictionary['pitch'] = self.prim_pitch
+ hidden = True
if PITCH_AVAILABLE and self._status:
- palette.add_block('pitch',
- style='box-style',
- label=_('pitch'),
- help_string=_('microphone input pitch'),
- value_block=True,
- prim_name='pitch')
- else:
- palette.add_block('pitch',
- hidden=True,
- style='box-style',
- label=_('pitch'),
- help_string=_('microphone input pitch'),
- value_block=True,
- prim_name='pitch')
- self._parent.lc.def_prim('pitch', 0,
- lambda self: primitive_dictionary['pitch'](0))
-
- primitive_dictionary['resistance'] = self.prim_resistance
- primitive_dictionary['voltage'] = self.prim_voltage
+ hidden = False
+
+ palette.add_block('pitch',
+ hidden=hidden,
+ style='box-style',
+ label=_('pitch'),
+ help_string=_('microphone input pitch'),
+ value_block=True,
+ prim_name='pitch')
+ self._parent.lc.def_prim(
+ 'pitch', 0,
+ Primitive(self.prim_pitch,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(0)},
+ call_afterwards=self.after_pitch))
+
+ hidden = True
if self.hw in [XO1, XO15, XO175, XO4, XO30] and self._status:
+ # Calibration based on http://bugs.sugarlabs.org/ticket/4649
if self.hw == XO1:
self.voltage_gain = 0.000022
self.voltage_bias = 1.14
elif self.hw == XO15:
self.voltage_gain = -0.00015
self.voltage_bias = 1.70
- elif self.hw in [XO175, XO4]: # recalibrate in light of #3675?
- self.voltage_gain = 0.000071
- self.voltage_bias = 0.55
+ elif self.hw == XO175: # Range 0.01V to 3.01V
+ self.voltage_gain = 0.0000516
+ self.voltage_bias = 1.3598
+ elif self.hw == XO4: # Range 0.17V to 3.08V
+ self.voltage_gain = 0.0004073
+ self.voltage_bias = 1.6289
else: # XO 3.0
self.voltage_gain = 0.000077
self.voltage_bias = 0.72
- palette.add_block('resistance',
- style='box-style',
- label=_('resistance'),
- help_string=_('microphone input resistance'),
- value_block=True,
- prim_name='resistance')
- palette.add_block('voltage',
- style='box-style',
- label=_('voltage'),
- help_string=_('microphone input voltage'),
- value_block=True,
- prim_name='voltage')
- else:
- palette.add_block('resistance',
- hidden=True,
- style='box-style',
- label=_('resistance'),
- help_string=_('microphone input resistance'),
- prim_name='resistance')
- palette.add_block('voltage',
- hidden=True,
- style='box-style',
- label=_('voltage'),
- help_string=_('microphone input voltage'),
- prim_name='voltage')
-
- # FIXME: Only add stereo capture for XO15 (broken on ARM #3675)
+ hidden = False
+
+ palette.add_block('resistance',
+ hidden=hidden,
+ style='box-style',
+ label=_('resistance'),
+ help_string=_('microphone input resistance'),
+ prim_name='resistance')
+ palette.add_block('voltage',
+ hidden=hidden,
+ style='box-style',
+ label=_('voltage'),
+ help_string=_('microphone input voltage'),
+ prim_name='voltage')
+
+ hidden = True
+ # Only add stereo capture for XO15 (broken on ARM #3675)
if self.hw in [XO15] and self._status:
- palette.add_block('resistance2',
- style='box-style',
- label=_('resistance') + '2',
- help_string=_('microphone input resistance'),
- value_block=True,
- prim_name='resistance2')
- palette.add_block('voltage2',
- style='box-style',
- label=_('voltage') + '2',
- help_string=_('microphone input voltage'),
- value_block=True,
- prim_name='voltage2')
- else:
- palette.add_block('resistance2',
- hidden=True,
- style='box-style',
- label=_('resistance') + '2',
- help_string=_('microphone input resistance'),
- prim_name='resistance2')
- palette.add_block('voltage2',
- hidden=True,
- style='box-style',
- label=_('voltage') + '2',
- help_string=_('microphone input voltage'),
- prim_name='voltage2')
+ hidden = False
+
+ palette.add_block('resistance2',
+ hidden=hidden,
+ style='box-style',
+ label=_('resistance') + '2',
+ help_string=_('microphone input resistance'),
+ prim_name='resistance2')
+ palette.add_block('voltage2',
+ hidden=hidden,
+ style='box-style',
+ label=_('voltage') + '2',
+ help_string=_('microphone input voltage'),
+ prim_name='voltage2')
self._parent.lc.def_prim(
'resistance', 0,
- lambda self: primitive_dictionary['resistance'](0))
+ Primitive(self.prim_resistance,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(0)},
+ call_afterwards=self.after_resistance))
self._parent.lc.def_prim(
- 'voltage', 0, lambda self: primitive_dictionary['voltage'](0))
+ 'voltage', 0,
+ Primitive(self.prim_voltage,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(0)},
+ call_afterwards=self.after_voltage))
self._parent.lc.def_prim(
'resistance2', 0,
- lambda self: primitive_dictionary['resistance'](1))
+ Primitive(self.prim_resistance,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(1)},
+ call_afterwards=self.after_resistance))
self._parent.lc.def_prim(
- 'voltage2', 0, lambda self: primitive_dictionary['voltage'](1))
+ 'voltage2', 0,
+ Primitive(self.prim_voltage,
+ return_type=TYPE_NUMBER,
+ kwarg_descs={'channel': ConstantArg(1)},
+ call_afterwards=self.after_voltage))
- self.audio_started = False
if self.hw in [XO175, XO30, XO4]:
self.PARAMETERS = {
SENSOR_AC_BIAS: (False, True, 80, True),
@@ -240,6 +235,11 @@ class Audio_sensors(Plugin):
''' Start grabbing audio if there is an audio block in use '''
if not self._status:
return
+ self._sound = [0, 0]
+ self._volume = [0, 0]
+ self._pitch = [0, 0]
+ self._resistance = [0, 0]
+ self._voltage = [0, 0]
if self.audio_started:
self.audiograb.stop_grabbing()
if len(self._parent.block_list.get_similar_blocks(
@@ -292,177 +292,181 @@ class Audio_sensors(Plugin):
self._parent.running_sugar)
return self._status
- # Block primitives used in talogo
+ # Block primitives
- def prim_volume(self, channel):
+ def prim_sound(self, channel=0):
if not self._status:
return 0
+ self._prim_sound(0)
# Return average of both channels if sampling in stereo
if self._channels == 2:
- chan0 = self._prim_volume(0)
- chan1 = self._prim_volume(1)
- return (chan0 + chan1) / 2
+ self._prim_sound(1)
+ return (self._sound[0] + self._sound[1]) / 2.0
else:
- return self._prim_volume(0)
+ return self._sound[0]
- def _prim_volume(self, channel):
- ''' return mic in value '''
+ def _prim_sound(self, channel):
+ ''' return raw mic in value '''
buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
- volume = float(_avg(buf, abs_value=True))
- self._parent.lc.update_label_value('volume', volume)
- return volume
+ self._sound[channel] = float(buf[0])
else:
- return 0
+ self._sound[channel] = 0
+
+ def after_sound(self, channel=0):
+ if self._parent.lc.update_values:
+ self._parent.lc.update_label_value('sound', self._sound[channel])
- def prim_sound(self, channel):
+ def prim_volume(self, channel=0):
if not self._status:
return 0
+ self._prim_volume(0)
# Return average of both channels if sampling in stereo
if self._channels == 2:
- chan0 = self._prim_sound(0)
- chan1 = self._prim_sound(1)
- return (chan0 + chan1) / 2
+ self._prim_volume(1)
+ return (self._volume[0] + self._volume[1]) / 2.0
else:
- return self._prim_sound(0)
+ return self._volume[0]
- def _prim_sound(self, channel):
+ def _prim_volume(self, channel):
''' return raw mic in value '''
buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
- sound = float(buf[0])
- if self._parent.lc.update_values:
- self._parent.lc.update_label_value('sound', sound)
- return sound
+ self._volume[channel] = float(_avg(buf, abs_value=True))
else:
- return 0
+ self._volume[channel] = 0
+
+ def after_volume(self, channel=0):
+ if self._parent.lc.update_values:
+ self._parent.lc.update_label_value('volume', self._volume[channel])
- def prim_pitch(self, channel):
- if not PITCH_AVAILABLE or not self._status:
+ def prim_pitch(self, channel=0):
+ if not self._status:
return 0
+ self._prim_pitch(0)
# Return average of both channels if sampling in stereo
if self._channels == 2:
- chan0 = self._prim_pitch(0)
- chan1 = self._prim_pitch(1)
- return (chan0 + chan1) / 2
+ self._prim_pitch(1)
+ return (self._pitch[0] + self._pitch[1]) / 2.0
else:
- return self._prim_pitch(0)
+ return self._pitch[0]
def _prim_pitch(self, channel):
- ''' return index of max value in fft of mic in values '''
+ ''' return raw mic in value '''
buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
buf = rfft(buf)
buf = abs(buf)
maxi = buf.argmax()
if maxi == 0:
- pitch = 0
+ self._pitch[channel] = 0
else: # Simple interpolation
a, b, c = buf[maxi - 1], buf[maxi], buf[maxi + 1]
maxi -= a / float(a + b + c)
maxi += c / float(a + b + c)
- pitch = maxi * 48000 / (len(buf) * 2)
-
- if self._parent.lc.update_values:
- self._parent.lc.update_label_value('pitch', pitch)
- return pitch
+ self._pitch[channel] = maxi * 48000 / (len(buf) * 2)
else:
- return 0
+ self._pitch[channel] = 0
+
+ def after_pitch(self, channel=0):
+ if self._parent.lc.update_values:
+ self._parent.lc.update_label_value('pitch', self._pitch[channel])
- def prim_resistance(self, channel):
+ def prim_resistance(self, channel=0):
if not self.hw in [XO1, XO15, XO175, XO30, XO4] or not self._status:
return 0
if self.hw in [XO1, XO4]:
- resistance = self._prim_resistance(0)
- if self._parent.lc.update_values:
- self._update_resistance_labels(0, resistance)
- return resistance
+ self._prim_resistance(0)
+ return self._resistance[0]
elif self.hw == XO15:
- resistance = self._prim_resistance(channel)
- if self._parent.lc.update_values:
- self._update_resistance_labels(channel, resistance)
- return resistance
- # FIXME: For XO175: channel assignment is seemingly random
- # (#3675), so sum both channels (one of them will be 0)
+ self._prim_resistance(channel)
+ return self._resistance[channel]
+ # For XO175: channel assignment is seemingly random
+ # (#3675), one of them will be 0
else:
- chan0 = self._prim_resistance(0)
- chan1 = self._prim_resistance(1)
- resistance = chan0 + chan1
- if self._parent.lc.update_values:
- self._update_resistance_labels(0, resistance)
- return resistance
+ self._prim_resistance(0)
+ if self._resistance[0] != 999999999:
+ return self._resistance[0]
+ else:
+ self._prim_resistance(1)
+ return self._resistance[1]
def _prim_resistance(self, channel):
''' return resistance sensor value '''
buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
- # See <http://bugs.sugarlabs.org/ticket/552#comment:7>
- # TODO: test this calibration on XO 1.5, XO 1.75
+ # See http://bugs.sugarlabs.org/ticket/552#comment:7
+ # and http://bugs.sugarlabs.org/ticket/4649
avg_buf = float(_avg(buf))
if self.hw == XO1:
- resistance = 2.718 ** ((avg_buf * 0.000045788) + 8.0531)
+ self._resistance[channel] = \
+ 2.718 ** ((avg_buf * 0.000045788) + 8.0531)
elif self.hw == XO15:
if avg_buf > 0:
- resistance = (420000000 / avg_buf) - 13500
+ self._resistance[channel] = (420000000 / avg_buf) - 13500
else:
- resistance = 420000000
- elif self.hw in [XO175, XO4]:
- if avg_buf < 30700:
- resistance = .12 * ((180000000 / (30700 - avg_buf)) - 3150)
+ self._resistance[channel] = 420000000
+ elif self.hw == XO175: # Range 0 to inf ohms
+ if avg_buf < 30519:
+ self._resistance[channel] = \
+ (92000000. / (30519 - avg_buf)) - 1620
else:
- resistance = 999999999
+ self._resistance[channel] = 999999999
+ elif self.hw == XO4: # Range 0 to inf ohms
+ if avg_buf < 6629:
+ self._resistance[channel] = \
+ (50000000. / (6629 - avg_buf)) - 3175
+ else:
+ self._resistance[channel] = 999999999
else: # XO 3.0
if avg_buf < 30514:
- resistance = (46000000 / (30514 - avg_buf)) - 1150
+ self._resistance[channel] = \
+ (46000000. / (30514 - avg_buf)) - 1150
else:
- resistance = 999999999
- if resistance < 0:
- resistance = 0
- return resistance
+ self._resistance[channel] = 999999999
+ if self._resistance[channel] < 0:
+ self._resistance[channel] = 0
else:
- return 0
+ self._resistance[channel] = 0
- def _update_resistance_labels(self, channel, resistance):
- if channel == 0:
- self._parent.lc.update_label_value('resistance', resistance)
- else:
- self._parent.lc.update_label_value('resistance2', resistance)
+ def after_resistance(self, channel=0):
+ if self._parent.lc.update_values:
+ self._parent.lc.update_label_value(
+ ['resistance', 'resistance2'][channel],
+ self._resistance[channel])
- def prim_voltage(self, channel):
+ def prim_voltage(self, channel=0):
if not self.hw in [XO1, XO15, XO175, XO30, XO4] or not self._status:
return 0
if self.hw in [XO1, XO4]:
- voltage = self._prim_voltage(0)
- if self._parent.lc.update_values:
- self._update_voltage_labels(0, voltage)
- return voltage
+ self._prim_voltage(0)
+ return self._voltage[0]
elif self.hw == XO15:
- voltage = self._prim_voltage(channel)
- if self._parent.lc.update_values:
- self._update_voltage_labels(channel, voltage)
- return voltage
+ self._prim_voltage(channel)
+ return self._voltage[channel]
# FIXME: For XO175: channel assignment is seemingly random
- # (#3675), so sum both channels (one of them will be 0)
+ # (#3675), one of them will be 0
else:
- chan0 = self._prim_voltage(0)
- chan1 = self._prim_voltage(1)
- voltage = chan0 + chan1
- if self._parent.lc.update_values:
- self._update_voltage_labels(0, voltage)
- return voltage
+ self._prim_voltage(0)
+ if self._voltage[0] != 0:
+ return self._voltage[0]
+ else:
+ self._prim_voltage(1)
+ return self._voltage[1]
def _prim_voltage(self, channel):
''' return voltage sensor value '''
buf = self.ringbuffer[channel].read(None, self.input_step)
+ buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
# See <http://bugs.sugarlabs.org/ticket/552#comment:7>
- voltage = float(_avg(buf)) * self.voltage_gain + self.voltage_bias
- return voltage
+ self._voltage[channel] = \
+ float(_avg(buf)) * self.voltage_gain + self.voltage_bias
else:
- return 0
+ self._voltage[channel] = 0
- def _update_voltage_labels(self, channel, voltage):
- if channel == 0:
- self._parent.lc.update_label_value('voltage', voltage)
- else:
- self._parent.lc.update_label_value('voltage2', voltage)
+ def after_voltage(self, channel=0):
+ if self._parent.lc.update_values:
+ self._parent.lc.update_label_value(
+ ['voltage', 'voltage2'][channel],
+ self._voltage[channel])