Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/audio_sensors/audio_sensors.py116
-rw-r--r--plugins/audio_sensors/audiograb.py670
2 files changed, 461 insertions, 325 deletions
diff --git a/plugins/audio_sensors/audio_sensors.py b/plugins/audio_sensors/audio_sensors.py
index bf28776..7e2d5a9 100644
--- a/plugins/audio_sensors/audio_sensors.py
+++ b/plugins/audio_sensors/audio_sensors.py
@@ -27,7 +27,7 @@ except:
from plugins.plugin import Plugin
from plugins.audio_sensors.audiograb import AudioGrab_Unknown, AudioGrab_XO1, \
- AudioGrab_XO15, SENSOR_DC_NO_BIAS, SENSOR_DC_BIAS
+ AudioGrab_XO15, AudioGrab_XO175, SENSOR_DC_NO_BIAS, SENSOR_DC_BIAS
from plugins.audio_sensors.ringbuffer import RingBuffer1d
@@ -68,7 +68,7 @@ class Audio_sensors(Plugin):
self.max_samples = 1500
self.input_step = 1
- self.ringbuffer = RingBuffer1d(self.max_samples, dtype='int16')
+ self.ringbuffer = []
palette = make_palette('sensor',
colors=["#FF6060", "#A06060"],
@@ -107,9 +107,9 @@ class Audio_sensors(Plugin):
prim_name='volume')
self._parent.lc.def_prim(
- 'sound', 0, lambda self: primitive_dictionary['sound']())
+ 'sound', 0, lambda self: primitive_dictionary['sound'](0))
self._parent.lc.def_prim(
- 'volume', 0, lambda self: primitive_dictionary['volume']())
+ 'volume', 0, lambda self: primitive_dictionary['volume'](0))
primitive_dictionary['pitch'] = self.prim_pitch
if PITCH_AVAILABLE and self._status:
@@ -128,18 +128,20 @@ class Audio_sensors(Plugin):
value_block=True,
prim_name='pitch')
self._parent.lc.def_prim('pitch', 0,
- lambda self: primitive_dictionary['pitch']())
+ lambda self: primitive_dictionary['pitch'](0))
primitive_dictionary['resistance'] = self.prim_resistance
primitive_dictionary['voltage'] = self.prim_voltage
- # FIXME: XO175 drivers don't work yet
- if self.hw in [XO1, XO15] and self._status:
+ if self.hw in [XO1, XO15, XO175] and self._status:
if self.hw == XO1:
self.voltage_gain = 0.00002225
self.voltage_bias = 1.140
elif self.hw == XO15:
self.voltage_gain = -0.0001471
self.voltage_bias = 1.695
+ else: # FIXME: Calibrate 1.75
+ self.voltage_gain = -0.0001471
+ self.voltage_bias = 1.695
palette.add_block('resistance',
style='box-style',
label=_('resistance'),
@@ -152,6 +154,18 @@ class Audio_sensors(Plugin):
help_string=_('microphone input voltage'),
value_block=True,
prim_name='voltage')
+ palette.add_block('resistance',
+ style='box-style',
+ label=_('resistance') + '2',
+ help_string=_('microphone input resistance'),
+ value_block=True,
+ prim_name='resistance2')
+ palette.add_block('voltage',
+ style='box-style',
+ label=_('voltage') + '2',
+ help_string=_('microphone input voltage'),
+ value_block=True,
+ prim_name='voltage2')
else:
palette.add_block('resistance',
hidden=True,
@@ -166,11 +180,31 @@ class Audio_sensors(Plugin):
label=_('voltage'),
help_string=_('microphone input voltage'),
value_block=True,
- prim_name='resistance')
+ prim_name='voltage')
+ palette.add_block('resistance',
+ hidden=True,
+ style='box-style',
+ label=_('resistance') + '2',
+ help_string=_('microphone input resistance'),
+ value_block=True,
+ prim_name='resistance2')
+ palette.add_block('voltage',
+ hidden=True,
+ style='box-style',
+ label=_('voltage') + '2',
+ help_string=_('microphone input voltage'),
+ value_block=True,
+ prim_name='voltage2')
+ self._parent.lc.def_prim(
+ 'resistance', 0,
+ lambda self: primitive_dictionary['resistance'](0))
+ self._parent.lc.def_prim(
+ 'voltage', 0, lambda self: primitive_dictionary['voltage'](0))
self._parent.lc.def_prim(
- 'resistance', 0, lambda self: primitive_dictionary['resistance']())
+ 'resistance2', 0,
+ lambda self: primitive_dictionary['resistance'](1))
self._parent.lc.def_prim(
- 'voltage', 0, lambda self: primitive_dictionary['voltage']())
+ 'voltage2', 0, lambda self: primitive_dictionary['voltage'](1))
self.audio_started = False
@@ -184,7 +218,9 @@ class Audio_sensors(Plugin):
if self.audio_started:
self.audiograb.resume_grabbing()
else:
- if self.hw == XO15:
+ if self.hw == XO175:
+ self.audiograb = AudioGrab_XO175(self.new_buffer, self)
+ elif self.hw == XO15:
self.audiograb = AudioGrab_XO15(self.new_buffer, self)
elif self.hw == XO1:
self.audiograb = AudioGrab_XO1(self.new_buffer, self)
@@ -192,11 +228,17 @@ class Audio_sensors(Plugin):
self.audiograb = AudioGrab_Unknown(self.new_buffer, self)
self.audiograb.start_grabbing()
self.audio_started = True
+
+ self._channels = self.audiograb.channels
+ for i in range(self._channels):
+ self.ringbuffer.append(RingBuffer1d(self.max_samples,
+ dtype='int16'))
+
self._update_audio_mode()
- def new_buffer(self, buf):
- ''' Append a new buffer to the ringbuffer '''
- self.ringbuffer.append(buf)
+ def new_buffer(self, buf, channel=0):
+ """ Append a new buffer to the ringbuffer """
+ self.ringbuffer[channel].append(buf)
return True
def _update_audio_mode(self):
@@ -208,14 +250,16 @@ class Audio_sensors(Plugin):
if len(self._parent.lc.value_blocks_to_update[name]) > 0:
self.audiograb.set_sensor_type()
return
- if 'resistance' in self._parent.lc.value_blocks_to_update:
- if len(self._parent.lc.value_blocks_to_update['resistance']) > 0:
- self.audiograb.set_sensor_type(SENSOR_DC_BIAS)
- return
- if 'voltage' in self._parent.lc.value_blocks_to_update:
- if len(self._parent.lc.value_blocks_to_update['voltage']) > 0:
- self.audiograb.set_sensor_type(SENSOR_DC_NO_BIAS)
- return
+ for name in ['resistance', 'resistance2']:
+ if name in self._parent.lc.value_blocks_to_update:
+ if len(self._parent.lc.value_blocks_to_update[name]) > 0:
+ self.audiograb.set_sensor_type(SENSOR_DC_BIAS)
+ return
+ for name in ['voltage', 'voltage2']:
+ if name in self._parent.lc.value_blocks_to_update:
+ if len(self._parent.lc.value_blocks_to_update[name]) > 0:
+ self.audiograb.set_sensor_type(SENSOR_DC_NO_BIAS)
+ return
def stop(self):
# This gets called by the stop button
@@ -243,12 +287,11 @@ class Audio_sensors(Plugin):
# Block primitives used in talogo
- def prim_volume(self):
+ def prim_volume(self, channel):
''' return mic in value '''
- #TODO: Adjust gain for different HW
if not self._status:
return 0
- buf = self.ringbuffer.read(None, self.input_step)
+ buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
volume = float(_avg(buf, abs_value=True))
self._parent.lc.update_label_value('volume', volume)
@@ -256,11 +299,11 @@ class Audio_sensors(Plugin):
else:
return 0
- def prim_sound(self):
+ def prim_sound(self, channel):
''' return raw mic in value '''
if not self._status:
return 0
- buf = self.ringbuffer.read(None, self.input_step)
+ buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
sound = float(buf[0])
self._parent.lc.update_label_value('sound', sound)
@@ -268,13 +311,14 @@ class Audio_sensors(Plugin):
else:
return 0
- def prim_pitch(self):
+ def prim_pitch(self, channel):
''' return index of max value in fft of mic in values '''
if not PITCH_AVAILABLE or not self._status:
return 0
buf = []
for i in range(4):
- buf = append(buf, self.ringbuffer.read(None, self.input_step))
+ buf = append(
+ buf, self.ringbuffer[channel].read(None, self.input_step))
if len(buf) > 0:
r = []
for j in rfft(buf):
@@ -286,14 +330,14 @@ class Audio_sensors(Plugin):
else:
return 0
- def prim_resistance(self):
+ def prim_resistance(self, channel):
''' return resistance sensor value '''
- if not self.hw in [XO1, XO15] or not self._status:
+ if not self.hw in [XO1, XO15, XO175] or not self._status:
return 0
- buf = self.ringbuffer.read(None, self.input_step)
+ buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
# See <http://bugs.sugarlabs.org/ticket/552#comment:7>
- # TODO: test this calibration on XO 1.5
+ # TODO: test this calibration on XO 1.5, XO 1.75
if self.hw == XO1:
resistance = 2.718 ** ((float(_avg(buf)) * 0.000045788) + \
8.0531)
@@ -308,11 +352,11 @@ class Audio_sensors(Plugin):
else:
return 0
- def prim_voltage(self):
+ def prim_voltage(self, channel):
''' return voltage sensor value '''
- if not self.hw in [XO1, XO15] or not self._status:
+ if not self.hw in [XO1, XO15, XO175] or not self._status:
return 0
- buf = self.ringbuffer.read(None, self.input_step)
+ buf = self.ringbuffer[channel].read(None, self.input_step)
if len(buf) > 0:
# See <http://bugs.sugarlabs.org/ticket/552#comment:7>
voltage = float(_avg(buf)) * self.voltage_gain + self.voltage_bias
diff --git a/plugins/audio_sensors/audiograb.py b/plugins/audio_sensors/audiograb.py
index 84e0c8d..5aa7780 100644
--- a/plugins/audio_sensors/audiograb.py
+++ b/plugins/audio_sensors/audiograb.py
@@ -1,40 +1,33 @@
#! /usr/bin/python
#
-# Author: Arjun Sarwal arjun@laptop.org
-# Copyright (C) 2007, Arjun Sarwal
-# Copyright (C) 2009,10 Walter Bender
-# Copyright (C) 2009, Benjamin Berg, Sebastian Berg
-# Copyright (C) 2009, Sayamindu Dasgupta
-# Copyright (C) 2010, Sascha Silbe
+# Author: Arjun Sarwal arjun@laptop.org
+# Copyright (C) 2007, Arjun Sarwal
+# Copyright (C) 2009-11 Walter Bender
+# Copyright (C) 2009, Benjamin Berg, Sebastian Berg
+# Copyright (C) 2009, Sayamindu Dasgupta
+# Copyright (C) 2010, Sascha Silbe
#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-SENSOR_AC_NO_BIAS = 'external'
-SENSOR_AC_BIAS = 'sound'
-SENSOR_DC_NO_BIAS = 'voltage'
-SENSOR_DC_BIAS = 'resistance'
+# You should have received a copy of the GNU General Public License
+# along with this library; if not, write to the Free Software
+# Foundation, 51 Franklin Street, Suite 500 Boston, MA 02110-1335 USA
import pygst
import gst
import gst.interfaces
from numpy import fromstring
-import os
-import subprocess
+import commands
+import traceback
from string import find
from threading import Timer
+from TurtleArt.taconstants import XO1
+from TurtleArt.tautils import debug_output
+
# Initial device settings
RATE = 48000
MIC_BOOST = True
@@ -48,11 +41,14 @@ QUIT_DC_MODE_ENABLE = False
QUIT_CAPTURE_GAIN = 100
QUIT_BIAS = True
-from TurtleArt.taconstants import XO1
-from TurtleArt.tautils import debug_output
+# Capture modes
+SENSOR_AC_NO_BIAS = 'external'
+SENSOR_AC_BIAS = 'sound'
+SENSOR_DC_NO_BIAS = 'voltage'
+SENSOR_DC_BIAS = 'resistance'
-class AudioGrab:
+class AudioGrab():
""" The interface between measure and the audio device """
def __init__(self, callable1, parent):
@@ -64,15 +60,14 @@ class AudioGrab:
self.sensor = None
self.temp_buffer = [0]
- self.picture_buffer = [] # place to hold screen grabs
- self.draw_graph_status = False
- self.screenshot = True
+ self.rate = RATE
+ if self.parent.hw == XO1:
+ self.channels = 1
+ else:
+ self.channels = None
- self.rate = 48000
- self.final_count = 0
- self.count_temp = 0
- self.entry_count = 0
+ self.waveform_id = 1
self.counter_buffer = 0
@@ -80,33 +75,72 @@ class AudioGrab:
self._mic_bias_control = None
self._capture_control = None
self._mic_boost_control = None
- self._hardwired = False # Query controls or use hardwired names
+ self._labels_available = True # Query controls for device names
+
+ self._query_mixer()
+ # If Channels was not found in the Capture controller, guess.
+ if self.channels is None:
+ debug_output('Guessing there are 2 channels',
+ self.parent.running_sugar)
+ self.channels = 2
+
+ # Variables for saving and resuming state of sound device
+ self.master = self.get_master()
+ self.bias = BIAS
+ self.dc_mode = DC_MODE_ENABLE
+ self.capture_gain = CAPTURE_GAIN
+ self.mic_boost = MIC_BOOST
+ self.mic = self.get_mic_gain()
- # Set up gst pipeline
- self.pipeline = gst.Pipeline("pipeline")
- self.alsasrc = gst.element_factory_make("alsasrc", "alsa-source")
+ # Set up gstreamer pipeline
+ self._pad_count = 0
+ self.pads = []
+ self.queue = []
+ self.fakesink = []
+ self.pipeline = gst.Pipeline('pipeline')
+ self.alsasrc = gst.element_factory_make('alsasrc', 'alsa-source')
self.pipeline.add(self.alsasrc)
- self.caps1 = gst.element_factory_make("capsfilter", "caps1")
+ self.caps1 = gst.element_factory_make('capsfilter', 'caps1')
self.pipeline.add(self.caps1)
- caps_str = "audio/x-raw-int,rate=%d,channels=1,depth=16" % (RATE)
- self.caps1.set_property("caps", gst.caps_from_string(caps_str))
- self.fakesink = gst.element_factory_make("fakesink", "fsink")
- self.pipeline.add(self.fakesink)
- self.fakesink.connect("handoff", self.on_buffer)
- self.fakesink.set_property("signal-handoffs", True)
- gst.element_link_many(self.alsasrc, self.caps1, self.fakesink)
+ caps_str = 'audio/x-raw-int,rate=%d,channels=%d,depth=16' % (
+ RATE, self.channels)
+ self.caps1.set_property('caps', gst.caps_from_string(caps_str))
+ if self.channels == 1:
+ self.fakesink.append(gst.element_factory_make('fakesink', 'fsink'))
+ self.pipeline.add(self.fakesink[0])
+ self.fakesink[0].connect('handoff', self.on_buffer, 0)
+ self.fakesink[0].set_property('signal-handoffs', True)
+ gst.element_link_many(self.alsasrc, self.caps1, self.fakesink[0])
+ else:
+ if not hasattr(self, 'splitter'):
+ self.splitter = gst.element_factory_make('deinterleave')
+ self.pipeline.add(self.splitter)
+ self.splitter.set_properties('keep-positions=true', 'name=d')
+ self.splitter.connect('pad-added', self._splitter_pad_added)
+ gst.element_link_many(self.alsasrc, self.caps1, self.splitter)
+ for i in range(self.channels):
+ self.queue.append(gst.element_factory_make('queue'))
+ self.pipeline.add(self.queue[i])
+ self.fakesink.append(gst.element_factory_make('fakesink'))
+ self.pipeline.add(self.fakesink[i])
+ self.fakesink[i].connect('handoff', self.on_buffer, i)
+ self.fakesink[i].set_property('signal-handoffs', True)
self.dont_queue_the_buffer = False
+ # Timer for interval sampling and switch to indicate when to capture
+ self.capture_timer = None
+ self.capture_interval_sample = False
+
+ def _query_mixer(self):
self._mixer = gst.element_factory_make('alsamixer')
rc = self._mixer.set_state(gst.STATE_PAUSED)
assert rc == gst.STATE_CHANGE_SUCCESS
# Query the available controls
- try: # F11+
- debug_output('controls: %r' % \
- ([t.props.untranslated_label for t in self._mixer.list_tracks()]),
- self.parent.running_sugar)
+ tracks_list = self._mixer.list_tracks()
+ if hasattr(tracks_list[0].props, 'untranslated_label'):
+ self._capture_control = self._find_control(['capture', 'axi'])
self._dc_control = self._find_control(['dc mode'])
self._mic_bias_control = self._find_control(['mic bias',
'dc input bias',
@@ -116,106 +150,139 @@ class AudioGrab:
'internal mic boost',
'analog mic boost'])
self._mic_gain_control = self._find_control(['mic'])
- self._capture_control = self._find_control(['capture'])
self._master_control = self._find_control(['master'])
- except AttributeError: # F9- (no untranslated_label attribute)
- self._hardwired = True
+ else: # Use hardwired values
+ self._labels_available = False
- # Variables for saving and resuming state of sound device
- self.master = self.get_master()
- self.bias = BIAS
- self.dcmode = DC_MODE_ENABLE
- self.capture_gain = CAPTURE_GAIN
- self.mic_boost = MIC_BOOST
- self.mic = self.get_mic_gain()
+ def _unlink_sink_queues(self):
+ ''' Build the sink pipelines '''
- # Timer for interval sampling and switch to indicate when to capture
- self.capture_timer = None
- self.capture_interval_sample = False
+ # If there were existing pipelines, unlink them
+ for i in range(self._pad_count):
+ try:
+ self.splitter.unlink(self.queue[i])
+ self.queue[i].unlink(self.fakesink[i])
+ except:
+ traceback.print_exc()
+
+ # Build the new pipelines
+ self._pad_count = 0
+ self.pads = []
+
+ def _splitter_pad_added(self, element, pad):
+ ''' Seems to be the case that ring is right channel 0,
+ tip is left channel 1'''
+ debug_output('splitter pad %d added' % (self._pad_count),
+ self.parent.running_sugar)
+ self.pads.append(pad)
+ if self._pad_count < self.channels:
+ pad.link(self.queue[self._pad_count].get_pad('sink'))
+ self.queue[self._pad_count].get_pad('src').link(
+ self.fakesink[self._pad_count].get_pad('sink'))
+ self._pad_count += 1
+ else:
+ debug_output('ignoring channels > %d' % (self.channels),
+ self.parent.running_sugar)
def set_handoff_signal(self, handoff_state):
- """Sets whether the handoff signal would generate an interrupt or not"""
- self.fakesink.set_property("signal-handoffs", handoff_state)
+ '''Sets whether the handoff signal would generate an interrupt or not'''
+ for i in range(len(self.fakesink)):
+ self.fakesink[i].set_property('signal-handoffs', handoff_state)
- def _new_buffer(self, buf):
- """ Use a new buffer """
+ def _new_buffer(self, buf, channel):
+ ''' Use a new buffer '''
if not self.dont_queue_the_buffer:
self.temp_buffer = buf
- self.callable1(buf)
+ self.callable1(buf, channel=channel)
else:
pass
- def on_buffer(self, element, buffer, pad):
- """The function that is called whenever new data is available
- This is the signal handler for the handoff signal"""
- if buffer is None:
- debug_output('audiograb buffer is None', self.parent.running_sugar)
- return False
-
+ def on_buffer(self, element, buffer, pad, channel):
+ '''The function that is called whenever new data is available
+ This is the signal handler for the handoff signal'''
temp_buffer = fromstring(buffer, 'int16')
if not self.dont_queue_the_buffer:
- self._new_buffer(temp_buffer)
+ self._new_buffer(temp_buffer, channel=channel)
return False
def set_sensor(self, sensor):
- """Keep a reference to the sensot toolbar for logging"""
+ '''Keep a reference to the sensot toolbar for logging'''
self.sensor = sensor
def start_sound_device(self):
- """Start or Restart grabbing data from the audio capture"""
+ '''Start or Restart grabbing data from the audio capture'''
gst.event_new_flush_start()
self.pipeline.set_state(gst.STATE_PLAYING)
def stop_sound_device(self):
- """Stop grabbing data from capture device"""
+ '''Stop grabbing data from capture device'''
gst.event_new_flush_stop()
self.pipeline.set_state(gst.STATE_NULL)
+ def sample_now(self):
+ ''' Log the current sample now. This method is called from the
+ capture_timer object when the interval expires. '''
+ self.capture_interval_sample = True
+ self.make_timer()
+
+ def set_buffer_interval_logging(self, interval=0):
+ '''Sets the number of buffers after which a buffer needs to be
+ emitted'''
+ self.buffer_interval_logging = interval
+
+ def reset_counter_buffer(self):
+ '''Resets the counter buffer used to keep track of after how many
+ buffers to emit a buffer for logging'''
+ self.counter_buffer = 0
+
def set_sampling_rate(self, sr):
- """Sets the sampling rate of the capture device
+ '''Sets the sampling rate of the capture device
Sampling rate must be given as an integer for example 16000 for
setting 16Khz sampling rate
- The sampling rate would be set in the device to the nearest available"""
+ The sampling rate would be set in the device to the nearest available'''
self.pause_grabbing()
- caps_str = "audio/x-raw-int,rate=%d,channels=1,depth=16" % (sr, )
- self.caps1.set_property("caps", gst.caps_from_string(caps_str))
+ caps_str = 'audio/x-raw-int,rate=%d,channels=%d,depth=16' % (
+ sr, self.channels)
+ self.caps1.set_property('caps', gst.caps_from_string(caps_str))
self.resume_grabbing()
def get_sampling_rate(self):
- """Gets the sampling rate of the capture device"""
- return int(self.caps1.get_property("caps")[0]['rate'])
+ '''Gets the sampling rate of the capture device'''
+ return int(self.caps1.get_property('caps')[0]['rate'])
def set_callable1(self, callable1):
- """Sets the callable to the drawing function for giving the
- data at the end of idle-add"""
+ '''Sets the callable to the drawing function for giving the
+ data at the end of idle-add'''
self.callable1 = callable1
def start_grabbing(self):
- """Called right at the start of the Activity"""
+ '''Called right at the start of the Activity'''
self.start_sound_device()
+ self.set_handoff_signal(True)
def pause_grabbing(self):
- """When Activity goes into background"""
+ '''When Activity goes into background'''
self.save_state()
self.stop_sound_device()
def resume_grabbing(self):
- """When Activity becomes active after going to background"""
+ '''When Activity becomes active after going to background'''
self.start_sound_device()
self.resume_state()
+ self.set_handoff_signal(True)
def stop_grabbing(self):
- """Not used ???"""
+ '''Not used ???'''
self.stop_sound_device()
self.set_handoff_signal(False)
def _find_control(self, prefixes):
- """Try to find a mixer control matching one of the prefixes.
+ '''Try to find a mixer control matching one of the prefixes.
The control with the best match (smallest difference in length
between label and prefix) will be returned. If no match is found,
None is returned.
- """
+ '''
def best_prefix(label, prefixes):
matches =\
[len(label) - len(p) for p in prefixes if label.startswith(p)]
@@ -234,305 +301,329 @@ class AudioGrab:
controls.sort(key=lambda e: e[1])
if controls:
- debug_output('Found control: %s' % \
+ debug_output('Found control: %s' %\
(str(controls[0][0].props.untranslated_label)),
self.parent.running_sugar)
+
+ if self.channels is None:
+ if hasattr(controls[0][0], 'num_channels'):
+ channels = controls[0][0].num_channels
+ if channels > 0:
+ self.channels = channels
+ debug_output('setting channels to %d' % (self.channels),
+ self.parent.running_sugar)
+
return controls[0][0]
return None
def save_state(self):
- """Saves the state of all audio controls"""
+ '''Saves the state of all audio controls'''
self.master = self.get_master()
self.bias = self.get_bias()
- self.dcmode = self.get_dc_mode()
+ self.dc_mode = self.get_dc_mode()
self.capture_gain = self.get_capture_gain()
self.mic_boost = self.get_mic_boost()
def resume_state(self):
- """Put back all audio control settings from the saved state"""
+ '''Put back all audio control settings from the saved state'''
self.set_master(self.master)
self.set_bias(self.bias)
- self.set_dc_mode(self.dcmode)
+ self.set_dc_mode(self.dc_mode)
self.set_capture_gain(self.capture_gain)
self.set_mic_boost(self.mic_boost)
def _get_mute(self, control, name, default):
- """Get mute status of a control"""
+ '''Get mute status of a control'''
if not control:
return default
-
- value = bool(control.flags & gst.interfaces.MIXER_TRACK_MUTE)
- debug_output('Getting %s (%s) mute status: %r' % (name,
- control.props.untranslated_label, value),
- self.parent.running_sugar)
- return value
+ return bool(control.flags & gst.interfaces.MIXER_TRACK_MUTE)
def _set_mute(self, control, name, value):
- """Mute a control"""
+ '''Mute a control'''
if not control:
return
-
self._mixer.set_mute(control, value)
- debug_output('Set mute for %s (%s) to %r' % (name,
- control.props.untranslated_label, value),
- self.parent.running_sugar)
def _get_volume(self, control, name):
- """Get volume of a control and convert to a scale of 0-100"""
+ '''Get volume of a control and convert to a scale of 0-100'''
if not control:
return 100
-
- try: # sometimes get_volume does not return a tuple
- hw_volume = self._mixer.get_volume(control)[0]
- except IndexError:
- return 100
-
+ volume = self._mixer.get_volume(control)
+ if type(volume) == tuple:
+ hw_volume = volume[0]
+ else:
+ hw_volume = volume
min_vol = control.min_volume
max_vol = control.max_volume
- percent = (hw_volume - min_vol)*100//(max_vol - min_vol)
+ if max_vol == min_vol:
+ percent = 100
+ else:
+ percent = (hw_volume - min_vol) * 100 // (max_vol - min_vol)
return percent
def _set_volume(self, control, name, value):
- """Sets the level of a control on a scale of 0-100"""
+ '''Sets the level of a control on a scale of 0-100'''
if not control:
return
-
# convert value to scale of control
min_vol = control.min_volume
max_vol = control.max_volume
if min_vol != max_vol:
- hw_volume = value*(max_vol - min_vol)//100 + min_vol
- self._mixer.set_volume(control, (hw_volume,)*control.num_channels)
+ hw_volume = value * (max_vol - min_vol) // 100 + min_vol
+ self._mixer.set_volume(control,
+ (hw_volume,) * control.num_channels)
def amixer_set(self, control, state):
- """ Direct call to amixer for old systems. """
+ ''' Direct call to amixer for old systems. '''
if state:
- os.system("amixer set '%s' unmute" % (control))
+ (status, output) = commands.getstatusoutput(
+ 'amixer set "%s" unmute' % (control))
+ if status != 0:
+ debug_output('Problem with amixer set "%s" unmute' % (control),
+ self.parent.running_sugar)
else:
- os.system("amixer set '%s' mute" % (control))
+ (status, output) = commands.getstatusoutput(
+ 'amixer set "%s" mute' % (control))
+ if status != 0:
+ debug_output('Problem with amixer set "%s" mute' % (control),
+ self.parent.running_sugar)
def mute_master(self):
- """Mutes the Master Control"""
- if not self._hardwired and self.parent.hw != XO1:
+ '''Mutes the Master Control'''
+ if self._labels_available and self.parent.hw != XO1:
self._set_mute(self._master_control, 'Master', True)
else:
self.amixer_set('Master', False)
def unmute_master(self):
- """Unmutes the Master Control"""
- if not self._hardwired and self.parent.hw != XO1:
+ '''Unmutes the Master Control'''
+ if self._labels_available and self.parent.hw != XO1:
self._set_mute(self._master_control, 'Master', True)
else:
self.amixer_set('Master', True)
def set_master(self, master_val):
- """Sets the Master gain slider settings
+ '''Sets the Master gain slider settings
master_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set"""
- if not self._hardwired:
+ percentage of the slider to be set'''
+ if self._labels_available:
self._set_volume(self._master_control, 'Master', master_val)
else:
- os.system("amixer set Master " + str(master_val) + "%")
+ (status, output) = commands.getstatusoutput(
+ 'amixer set Master %d%s' % (master_val, '%'))
+ if status != 0:
+ debug_output('Problem with amixer set Master',
+ self.parent.running_sugar)
def get_master(self):
- """Gets the Master gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%"""
- if not self._hardwired:
+ '''Gets the MIC gain slider settings. The value returned is an
+ integer between 0-100 and is an indicative of the percentage 0 - 100%'''
+ if self._labels_available:
return self._get_volume(self._master_control, 'master')
else:
- p = str(subprocess.Popen(["amixer", "get", "Master"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Front Left:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "%]")]
- return int(p)
+ (status, output) = commands.getstatusoutput('amixer get Master')
+ if status == 0:
+ output = output[find(output, 'Front Left:'):]
+ output = output[find(output, '[') + 1:]
+ output = output[:find(output, '%]')]
+ return int(output)
+ else:
+ return 100
def set_bias(self, bias_state=False):
- """Enables / disables bias voltage."""
- if not self._hardwired and self.parent.hw != XO1:
+ '''Enables / disables bias voltage.'''
+ if self._labels_available and self.parent.hw != XO1:
if self._mic_bias_control is None:
return
- # if not isinstance(self._mic_bias_control,
- # gst.interfaces.MixerOptions):
- if self._mic_bias_control not in self._mixer.list_tracks():
- return self._set_mute(self._mic_bias_control, 'Mic Bias',
- not bias_state)
-
+ # If there is a flag property, use set_mute
+ if self._mic_bias_control not in self._mixer.list_tracks() or \
+ hasattr(self._mic_bias_control.props, 'flags'):
+ self._set_mute(
+ self._mic_bias_control, 'Mic Bias', not bias_state)
# We assume that values are sorted from lowest (=off) to highest.
- # Since they are mixed strings ("Off", "50%", etc.), we cannot
+ # Since they are mixed strings ('Off', '50%', etc.), we cannot
# easily ensure this by sorting with the default sort order.
- try:
- if bias_state:
- # self._mixer.set_option(self._mic_bias_control, values[-1])
- self._mixer.set_volume(self._mic_bias_control,
- self._mic_bias_control.max_volume)
- else:
- self._mixer.set_volume(self._mic_bias_control,
- self._mic_bias_control.min_volume)
- # self._mixer.set_option(self._mic_bias_control, values[0])
- except TypeError:
- self._set_mute(self._mic_bias_control, 'Mic Bias',
- not bias_state)
- elif self._hardwired:
+ elif bias_state: # Otherwise, set with volume
+ self._mixer.set_volume(self._mic_bias_control,
+ self._mic_bias_control.max_volume)
+ else:
+ self._mixer.set_volume(self._mic_bias_control,
+ self._mic_bias_control.min_volume)
+ elif not self._labels_available:
self.amixer_set('V_REFOUT Enable', bias_state)
else:
self.amixer_set('MIC Bias Enable', bias_state)
def get_bias(self):
- """Check whether bias voltage is enabled."""
- if not self._hardwired:
+ '''Check whether bias voltage is enabled.'''
+ if self._labels_available:
if self._mic_bias_control is None:
return False
- if self._mic_bias_control not in self._mixer.list_tracks():
- return not self._get_mute(self._mic_bias_control, 'Mic Bias',
- False)
- current = self._mixer.get_volume(self._mic_bias_control)
- # same ordering assertion as in set_bias() applies
- # if current == values[0]:
- if current == self._mic_bias_control.min_volume:
+ if self._mic_bias_control not in self._mixer.list_tracks() or \
+ hasattr(self._mic_bias_control.props, 'flags'):
+ return not self._get_mute(
+ self._mic_bias_control, 'Mic Bias', False)
+ value = self._mixer.get_volume(self._mic_bias_control)
+ if value == self._mic_bias_control.min_volume:
return False
return True
else:
- p = str(subprocess.Popen(["amixer", "get", "'V_REFOUT Enable'"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "]")]
- if p == "on":
- return True
- return False
+ (status, output) = commands.getstatusoutput(
+ 'amixer get "V_REFOUT Enable"')
+ if status == 0:
+ output = output[find(output, 'Mono:'):]
+ output = output[find(output, '[') + 1:]
+ output = output[:find(output, ']')]
+ if output == 'on':
+ return True
+ return False
+ else:
+ return False
def set_dc_mode(self, dc_mode=False):
- """Sets the DC Mode Enable control
- pass False to mute and True to unmute"""
- if not self._hardwired and self.parent.hw != XO1:
+ '''Sets the DC Mode Enable control
+ pass False to mute and True to unmute'''
+ if self._labels_available and self.parent.hw != XO1:
if self._dc_control is not None:
self._set_mute(self._dc_control, 'DC mode', not dc_mode)
else:
self.amixer_set('DC Mode Enable', dc_mode)
def get_dc_mode(self):
- """Returns the setting of DC Mode Enable control
- i .e. True: Unmuted and False: Muted"""
- if not self._hardwired:
+ '''Returns the setting of DC Mode Enable control
+ i.e. True: Unmuted and False: Muted'''
+ if self._labels_available:
if self._dc_control is not None:
return not self._get_mute(self._dc_control, 'DC mode', False)
else:
return False
else:
- p = str(subprocess.Popen(["amixer", "get", "'DC Mode Enable'"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "]")]
- if p == "on":
- return True
+ (status, output) = commands.getstatusoutput(
+ 'amixer get "DC Mode Enable"')
+ if status == 0:
+ output = output[find(output, 'Mono:'):]
+ output = output[find(output, '[') + 1:]
+ output = output[:find(output, ']')]
+ if output == 'on':
+ return True
+ return False
else:
return False
def set_mic_boost(self, mic_boost=False):
- """Set Mic Boost.
- True = +20dB, False = 0dB"""
- if not self._hardwired:
+ '''Set Mic Boost.
+ True = +20dB, False = 0dB'''
+ if self._labels_available:
if self._mic_boost_control is None:
return
- if self._mic_boost_control not in self._mixer.list_tracks():
- return self._set_mute(self._mic_boost_control, 'Mic Boost',
- mic_boost)
- value = self._mixer.get_volume(self._mic_boost_control)
- try:
- if mic_boost:
- self._mixer.set_volume(self._mic_boost_control,
- self._mic_boost_control.max_volume)
- else:
- self._mixer.set_volume(self._mic_boost_control,
- self._mic_boost_control.min_volume)
- except TypeError:
- return self._set_mute(self._mic_boost_control, 'Mic Boost',
- not mic_boost)
+ # If there is a flag property, use set_mute
+ if self._mic_boost_control not in self._mixer.list_tracks() or \
+ hasattr(self._mic_boost_control.props, 'flags'):
+ self._set_mute(
+ self._mic_boost_control, 'Mic Boost', not mic_boost)
+ # Otherwise, set volume to max or min value
+ elif mic_boost:
+ self._mixer.set_volume(self._mic_boost_control,
+ self._mic_boost_control.max_volume)
+ else:
+ self._mixer.set_volume(self._mic_boost_control,
+ self._mic_boost_control.min_volume)
else:
self.amixer_set('Mic Boost (+20dB)', mic_boost)
def get_mic_boost(self):
- """Return Mic Boost setting.
- True = +20dB, False = 0dB"""
- if not self._hardwired:
+ '''Return Mic Boost setting.
+ True = +20dB, False = 0dB'''
+ if self._labels_available:
if self._mic_boost_control is None:
return False
- if self._mic_boost_control not in self._mixer.list_tracks():
- return self._get_mute(self._mic_boost_control, 'Mic Boost',
- False)
- current = self._mixer.get_volume(self._mic_boost_control)
- debug_output('current: %s' % (str(current)),
- self.parent.running_sugar)
- if current != self._mic_boost_control.min_volume:
- return True
- return False
+ if self._mic_boost_control not in self._mixer.list_tracks() or \
+ hasattr(self._mic_boost_control.props, 'flags'):
+ return not self._get_mute(
+ self._mic_boost_control, 'Mic Boost', False)
+ else: # Compare to min value
+ value = self._mixer.get_volume(self._mic_boost_control)
+ if value != self._mic_boost_control.min_volume:
+ return True
+ return False
else:
- p = str(subprocess.Popen(["amixer", "get", "'Mic Boost (+20dB)'"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "]")]
- if p == "on":
- return True
+ (status, output) = commands.getstatusoutput(
+ 'amixer get "Mic Boost (+20dB)"')
+ if status == 0:
+ output = output[find(output, 'Mono:'):]
+ output = output[find(output, '[') + 1:]
+ output = output[:find(output, ']')]
+ if output == 'on':
+ return True
+ return False
else:
return False
def set_capture_gain(self, capture_val):
- """Sets the Capture gain slider settings
+ '''Sets the Capture gain slider settings
capture_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set"""
- if not self._hardwired and self.parent.hw != XO1:
+ percentage of the slider to be set'''
+ if self._labels_available and self.parent.hw != XO1:
if self._capture_control is not None:
self._set_volume(self._capture_control, 'Capture', capture_val)
else:
- os.system("amixer set Capture " + str(capture_val) + "%")
+ (status, output) = commands.getstatusoutput(
+ 'amixer set Capture %d%s' % (capture_val, '%'))
+ if status != 0:
+ debug_output('Problem with amixer set Capture',
+ self.parent.running_sugar)
def get_capture_gain(self):
- """Gets the Capture gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%"""
- if not self._hardwired:
+ '''Gets the Capture gain slider settings. The value returned is an
+ integer between 0-100 and is an indicative of the percentage 0 - 100%'''
+ if self._labels_available:
if self._capture_control is not None:
return self._get_volume(self._capture_control, 'Capture')
else:
return 0
else:
- p = str(subprocess.Popen(["amixer", "get", "Capture"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Front Left:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "%]")]
- return int(p)
+ (status, output) = commands.getstatusoutput('amixer get Capture')
+ if status == 0:
+ output = output[find(output, 'Front Left:'):]
+ output = output[find(output, '[') + 1:]
+ output = output[:find(output, '%]')]
+ return int(output)
+ else:
+ debug_output('amixer: Could not get Capture level',
+ self.parent.running_sugar)
+ return 100
def set_mic_gain(self, mic_val):
- """Sets the MIC gain slider settings
+ '''Sets the MIC gain slider settings
mic_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set"""
- if not self._hardwired and self.parent.hw != XO1:
+ percentage of the slider to be set'''
+ if self._labels_available and self.parent.hw != XO1:
self._set_volume(self._mic_gain_control, 'Mic', mic_val)
else:
- os.system("amixer set Mic " + str(mic_val) + "%")
+ (status, output) = commands.getstatusoutput(
+ 'amixer set Mic %d%s' % (mic_val, '%'))
+ if status != 0:
+ debug_output('Problem with amixer set Mic',
+ self.parent.running_sugar)
def get_mic_gain(self):
- """Gets the MIC gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%"""
- if not self._hardwired:
+ '''Gets the MIC gain slider settings. The value returned is an
+ integer between 0-100 and is an indicative of the percentage 0 - 100%'''
+ if self._labels_available:
return self._get_volume(self._mic_gain_control, 'Mic')
else:
- p = str(subprocess.Popen(["amixer", "get", "Mic"],
- stdout=subprocess.PIPE).communicate()[0])
- try:
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "%]")]
- return int(p)
- except:
- return(0)
+ (status, output) = commands.getstatusoutput('amixer get Mic')
+ if status == 0:
+ output = output[find(output, 'Mono:'):]
+ output = output[find(output, '[') + 1:]
+ output = output[:find(output, '%]')]
+ return int(output)
+ else:
+ return 100
def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
- """Set the type of sensor you want to use. Set sensor_type according
+ '''Set the type of sensor you want to use. Set sensor_type according
to the following
SENSOR_AC_NO_BIAS - AC coupling with Bias Off --> Very rarely used.
Use when connecting a dynamic microphone externally
@@ -541,8 +632,8 @@ class AudioGrab:
SENSOR_DC_NO_BIAS - DC coupling with Bias Off --> measuring voltage
output sensor. For example LM35 which gives output proportional
to temperature
- SENSOR_DC_BIAS - DC coupling with Bias On --> measuing resistance.
- """
+ SENSOR_DC_BIAS - DC coupling with Bias On --> measuring resistance.
+ '''
PARAMETERS = {
SENSOR_AC_NO_BIAS: (False, False, 50, True),
SENSOR_AC_BIAS: (False, True, 40, True),
@@ -550,35 +641,27 @@ class AudioGrab:
SENSOR_DC_BIAS: (True, True, 0, False)
}
mode, bias, gain, boost = PARAMETERS[sensor_type]
- debug_output('Set Sensor Type to %s' % (str(sensor_type)),
+ debug_output('Set sensor type to %s' % (str(sensor_type)),
self.parent.running_sugar)
self._set_sensor_type(mode, bias, gain, boost)
def _set_sensor_type(self, mode=None, bias=None, gain=None, boost=None):
- """Helper to modify (some) of the sensor settings."""
- if mode is not None:
+ '''Helper to modify (some) of the sensor settings.'''
+ if mode is not None and mode != self.get_dc_mode():
+ # If we change to/from dc mode, we need to rebuild the pipelines
+ self.stop_grabbing()
+ self._unlink_sink_queues()
self.set_dc_mode(mode)
- if self._dc_control is not None:
- os.system("amixer get '%s'" %\
- (self._dc_control.props.untranslated_label))
+ self.start_grabbing()
if bias is not None:
self.set_bias(bias)
- if self._mic_bias_control is not None:
- os.system("amixer get '%s'" %\
- (self._mic_bias_control.props.untranslated_label))
if gain is not None:
self.set_capture_gain(gain)
- if self._capture_control is not None:
- os.system("amixer get '%s'" %\
- (self._capture_control.props.untranslated_label))
if boost is not None:
self.set_mic_boost(boost)
- if self._mic_boost_control is not None:
- os.system("amixer get '%s'" %\
- (self._mic_boost_control.props.untranslated_label))
def on_activity_quit(self):
- """When Activity quits"""
+ '''When Activity quits'''
self.set_mic_boost(QUIT_MIC_BOOST)
self.set_dc_mode(QUIT_DC_MODE_ENABLE)
self.set_capture_gain(QUIT_CAPTURE_GAIN)
@@ -587,37 +670,46 @@ class AudioGrab:
class AudioGrab_XO1(AudioGrab):
- """ Use default parameters for OLPC XO 1.0 laptop """
+ ''' Use default parameters for OLPC XO 1.0 laptop '''
pass
-
class AudioGrab_XO15(AudioGrab):
- """ Override parameters for OLPC XO 1.5 laptop """
+ ''' Override parameters for OLPC XO 1.5 laptop '''
def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
- """Helper to modify (some) of the sensor settings."""
+ '''Helper to modify (some) of the sensor settings.'''
+ PARAMETERS = {
+ SENSOR_AC_NO_BIAS: (False, False, 80, True),
+ SENSOR_AC_BIAS: (False, True, 80, True),
+ SENSOR_DC_NO_BIAS: (True, False, 80, False),
+ SENSOR_DC_BIAS: (True, True, 90, False)
+ }
+ mode, bias, gain, boost = PARAMETERS[sensor_type]
+ self._set_sensor_type(mode, bias, gain, boost)
+
+
+class AudioGrab_XO175(AudioGrab):
+ ''' Override parameters for OLPC XO 1.75 laptop '''
+ def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
+ '''Helper to modify (some) of the sensor settings.'''
PARAMETERS = {
SENSOR_AC_NO_BIAS: (False, False, 80, True),
SENSOR_AC_BIAS: (False, True, 80, True),
SENSOR_DC_NO_BIAS: (True, False, 80, False),
SENSOR_DC_BIAS: (True, True, 90, False)
}
- debug_output('Set Sensor Type to %s' % (str(sensor_type)),
- self.parent.running_sugar)
mode, bias, gain, boost = PARAMETERS[sensor_type]
self._set_sensor_type(mode, bias, gain, boost)
class AudioGrab_Unknown(AudioGrab):
- """ Override parameters for generic hardware """
+ ''' Override parameters for generic hardware '''
def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
- """Helper to modify (some) of the sensor settings."""
+ '''Helper to modify (some) of the sensor settings.'''
PARAMETERS = {
SENSOR_AC_NO_BIAS: (None, False, 50, True),
SENSOR_AC_BIAS: (None, True, 40, True),
SENSOR_DC_NO_BIAS: (True, False, 80, False),
SENSOR_DC_BIAS: (True, True, 90, False)
}
- debug_output('Set Sensor Type to %s' % (str(sensor_type)),
- self.parent.running_sugar)
mode, bias, gain, boost = PARAMETERS[sensor_type]
self._set_sensor_type(mode, bias, gain, boost)