diff options
author | Walter Bender <walter@sugarlabs.org> | 2013-05-25 22:12:48 (GMT) |
---|---|---|
committer | Walter Bender <walter@sugarlabs.org> | 2013-05-25 22:12:48 (GMT) |
commit | 29851fcbe3b4d392efd5a873f92666da27ec0e99 (patch) | |
tree | 27ce347b8465c907836a116d8752ddb7fab48c69 /plugins/audio_sensors/audiograb.py | |
parent | ed303715f77f79e697e33b3d0df082843ffc1492 (diff) |
patches to create confusion
Diffstat (limited to 'plugins/audio_sensors/audiograb.py')
-rw-r--r-- | plugins/audio_sensors/audiograb.py | 673 |
1 files changed, 0 insertions, 673 deletions
diff --git a/plugins/audio_sensors/audiograb.py b/plugins/audio_sensors/audiograb.py deleted file mode 100644 index 228e4c2..0000000 --- a/plugins/audio_sensors/audiograb.py +++ /dev/null @@ -1,673 +0,0 @@ -#! /usr/bin/python -# -# Author: Arjun Sarwal arjun@laptop.org -# Copyright (C) 2007, Arjun Sarwal -# Copyright (C) 2009-12 Walter Bender -# Copyright (C) 2009, Benjamin Berg, Sebastian Berg -# Copyright (C) 2009, Sayamindu Dasgupta -# Copyright (C) 2010, Sascha Silbe -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# You should have received a copy of the GNU General Public License -# along with this library; if not, write to the Free Software -# Foundation, 51 Franklin Street, Suite 500 Boston, MA 02110-1335 USA - -import pygst -import gst -import gst.interfaces -from numpy import fromstring -import subprocess -import traceback -from string import find -from threading import Timer - -from TurtleArt.taconstants import XO1, XO4 -from TurtleArt.tautils import debug_output - -# Initial device settings -RATE = 48000 -MIC_BOOST = True -DC_MODE_ENABLE = False -CAPTURE_GAIN = 50 -BIAS = True - -# Setting on quit -QUIT_MIC_BOOST = False -QUIT_DC_MODE_ENABLE = False -QUIT_CAPTURE_GAIN = 100 -QUIT_BIAS = True - -# Capture modes -SENSOR_AC_NO_BIAS = 'external' -SENSOR_AC_BIAS = 'sound' -SENSOR_DC_NO_BIAS = 'voltage' -SENSOR_DC_BIAS = 'resistance' - - -class AudioGrab(): - """ The interface between measure and the audio device """ - - def __init__(self, callable1, parent, - mode=None, bias=None, gain=None, boost=None): - """ Initialize the class: callable1 is a data buffer; - parent is the parent class""" - - self.callable1 = callable1 - self.parent = parent - self.sensor = None - - self.temp_buffer = [0] - - self.rate = RATE - # Force XO1 and XO4 to use just 1 channel - if self.parent.hw in [XO1, XO4]: - self.channels = 1 - else: - self.channels = None - - self._dc_control = None - self._mic_bias_control = None - self._capture_control = None - self._mic_boost_control = None - self._labels_available = True # Query controls for device names - - self._query_mixer() - # If Channels was not found in the Capture controller, guess. - if self.channels is None: - debug_output('Guessing there are 2 channels', - self.parent.running_sugar) - self.channels = 2 - - # Set mixer to known state - self.set_dc_mode(DC_MODE_ENABLE) - self.set_bias(BIAS) - self.set_capture_gain(CAPTURE_GAIN) - self.set_mic_boost(MIC_BOOST) - - self.master = self.get_master() - self.dc_mode = self.get_dc_mode() - self.bias = self.get_bias() - self.capture_gain = self.get_capture_gain() - self.mic_boost = self.get_mic_boost() - - # Set mixer to desired state - self._set_sensor_type(mode, bias, gain, boost) - self.dc_mode = self.get_dc_mode() - self.bias = self.get_bias() - self.capture_gain = self.get_capture_gain() - self.mic_boost = self.get_mic_boost() - - # Set up gstreamer pipeline - self._pad_count = 0 - self.pads = [] - self.queue = [] - self.fakesink = [] - self.pipeline = gst.Pipeline('pipeline') - self.alsasrc = gst.element_factory_make('alsasrc', 'alsa-source') - self.pipeline.add(self.alsasrc) - self.caps1 = gst.element_factory_make('capsfilter', 'caps1') - self.pipeline.add(self.caps1) - caps_str = 'audio/x-raw-int,rate=%d,channels=%d,depth=16' % ( - RATE, self.channels) - self.caps1.set_property('caps', gst.caps_from_string(caps_str)) - if self.channels == 1: - self.fakesink.append(gst.element_factory_make('fakesink', 'fsink')) - self.pipeline.add(self.fakesink[0]) - self.fakesink[0].connect('handoff', self.on_buffer, 0) - self.fakesink[0].set_property('signal-handoffs', True) - gst.element_link_many(self.alsasrc, self.caps1, self.fakesink[0]) - else: - if not hasattr(self, 'splitter'): - self.splitter = gst.element_factory_make('deinterleave') - self.pipeline.add(self.splitter) - self.splitter.set_properties('keep-positions=true', 'name=d') - self.splitter.connect('pad-added', self._splitter_pad_added) - gst.element_link_many(self.alsasrc, self.caps1, self.splitter) - for i in range(self.channels): - self.queue.append(gst.element_factory_make('queue')) - self.pipeline.add(self.queue[i]) - self.fakesink.append(gst.element_factory_make('fakesink')) - self.pipeline.add(self.fakesink[i]) - self.fakesink[i].connect('handoff', self.on_buffer, i) - self.fakesink[i].set_property('signal-handoffs', True) - - self.dont_queue_the_buffer = False - - # Timer for interval sampling and switch to indicate when to capture - self.capture_timer = None - self.capture_interval_sample = False - - def _query_mixer(self): - self._mixer = gst.element_factory_make('alsamixer') - rc = self._mixer.set_state(gst.STATE_PAUSED) - assert rc == gst.STATE_CHANGE_SUCCESS - - # Query the available controls - tracks_list = self._mixer.list_tracks() - if hasattr(tracks_list[0].props, 'untranslated_label'): - self._capture_control = self._find_control(['capture', 'axi']) - self._dc_control = self._find_control(['dc mode']) - self._mic_bias_control = self._find_control(['mic bias', - 'dc input bias', - 'v_refout']) - self._mic_boost_control = self._find_control(['mic boost', - 'mic1 boost', - 'mic boost (+20db)', - 'internal mic boost', - 'analog mic boost']) - self._mic_gain_control = self._find_control(['mic']) - self._master_control = self._find_control(['master']) - else: # Use hardwired values - self._labels_available = False - - def _unlink_sink_queues(self): - ''' Build the sink pipelines ''' - - # If there were existing pipelines, unlink them - for i in range(self._pad_count): - try: - self.splitter.unlink(self.queue[i]) - self.queue[i].unlink(self.fakesink[i]) - except: - traceback.print_exc() - - # Build the new pipelines - self._pad_count = 0 - self.pads = [] - - def _splitter_pad_added(self, element, pad): - ''' Seems to be the case that ring is right channel 0, - tip is left channel 1''' - ''' - debug_output('splitter pad %d added' % (self._pad_count), - self.parent.running_sugar) - ''' - self.pads.append(pad) - if self._pad_count < self.channels: - pad.link(self.queue[self._pad_count].get_pad('sink')) - self.queue[self._pad_count].get_pad('src').link( - self.fakesink[self._pad_count].get_pad('sink')) - self._pad_count += 1 - else: - debug_output('ignoring channels > %d' % (self.channels), - self.parent.running_sugar) - - def set_handoff_signal(self, handoff_state): - '''Sets whether the handoff signal would generate an interrupt - or not''' - for i in range(len(self.fakesink)): - self.fakesink[i].set_property('signal-handoffs', handoff_state) - - def _new_buffer(self, buf, channel): - ''' Use a new buffer ''' - if not self.dont_queue_the_buffer: - self.temp_buffer = buf - self.callable1(buf, channel=channel) - else: - pass - - def on_buffer(self, element, data_buffer, pad, channel): - '''The function that is called whenever new data is available - This is the signal handler for the handoff signal''' - temp_buffer = fromstring(data_buffer, 'int16') - if not self.dont_queue_the_buffer: - self._new_buffer(temp_buffer, channel=channel) - return False - - def start_sound_device(self): - '''Start or Restart grabbing data from the audio capture''' - gst.event_new_flush_start() - self.pipeline.set_state(gst.STATE_PLAYING) - - def stop_sound_device(self): - '''Stop grabbing data from capture device''' - gst.event_new_flush_stop() - self.pipeline.set_state(gst.STATE_NULL) - - def sample_now(self): - ''' Log the current sample now. This method is called from the - capture_timer object when the interval expires. ''' - self.capture_interval_sample = True - self.make_timer() - - def set_buffer_interval_logging(self, interval=0): - '''Sets the number of buffers after which a buffer needs to be - emitted''' - self.buffer_interval_logging = interval - - def set_sampling_rate(self, sr): - '''Sets the sampling rate of the capture device - Sampling rate must be given as an integer for example 16000 for - setting 16Khz sampling rate - The sampling rate would be set in the device to the nearest available''' - self.pause_grabbing() - caps_str = 'audio/x-raw-int,rate=%d,channels=%d,depth=16' % ( - sr, self.channels) - self.caps1.set_property('caps', gst.caps_from_string(caps_str)) - self.resume_grabbing() - - def get_sampling_rate(self): - '''Gets the sampling rate of the capture device''' - return int(self.caps1.get_property('caps')[0]['rate']) - - def set_callable1(self, callable1): - '''Sets the callable to the drawing function for giving the - data at the end of idle-add''' - self.callable1 = callable1 - - def start_grabbing(self): - '''Called right at the start of the Activity''' - self.start_sound_device() - self.set_handoff_signal(True) - - def pause_grabbing(self): - '''When Activity goes into background''' - self.save_state() - self.stop_sound_device() - - def resume_grabbing(self): - '''When Activity becomes active after going to background''' - self.start_sound_device() - self.resume_state() - self.set_handoff_signal(True) - - def stop_grabbing(self): - '''Not used ???''' - self.stop_sound_device() - self.set_handoff_signal(False) - - def _find_control(self, prefixes): - '''Try to find a mixer control matching one of the prefixes. - - The control with the best match (smallest difference in length - between label and prefix) will be returned. If no match is found, - None is returned. - ''' - def best_prefix(label, prefixes): - matches =\ - [len(label) - len(p) for p in prefixes if label.startswith(p)] - if not matches: - return None - - matches.sort() - return matches[0] - - controls = [] - for track in self._mixer.list_tracks(): - label = track.props.untranslated_label.lower() - diff = best_prefix(label, prefixes) - if diff is not None: - controls.append((track, diff)) - - controls.sort(key=lambda e: e[1]) - if controls: - ''' - debug_output('Found control: %s' %\ - (str(controls[0][0].props.untranslated_label)), - self.parent.running_sugar) - ''' - if self.channels is None: - if hasattr(controls[0][0], 'num_channels'): - channels = controls[0][0].num_channels - if channels > 0: - self.channels = channels - ''' - debug_output('setting channels to %d' % (self.channels), - self.parent.running_sugar) - ''' - - return controls[0][0] - - return None - - def save_state(self): - '''Saves the state of all audio controls''' - self.master = self.get_master() - self.bias = self.get_bias() - self.dc_mode = self.get_dc_mode() - self.capture_gain = self.get_capture_gain() - self.mic_boost = self.get_mic_boost() - - def resume_state(self): - '''Put back all audio control settings from the saved state''' - self.set_master(self.master) - self.set_bias(self.bias) - self.set_dc_mode(self.dc_mode) - self.set_capture_gain(self.capture_gain) - self.set_mic_boost(self.mic_boost) - - def _get_mute(self, control, name, default): - '''Get mute status of a control''' - if not control: - return default - return bool(control.flags & gst.interfaces.MIXER_TRACK_MUTE) - - def _set_mute(self, control, name, value): - '''Mute a control''' - if not control: - return - self._mixer.set_mute(control, value) - - def _get_volume(self, control, name): - '''Get volume of a control and convert to a scale of 0-100''' - if not control: - return 100 - volume = self._mixer.get_volume(control) - if type(volume) == tuple: - hw_volume = volume[0] - else: - hw_volume = volume - min_vol = control.min_volume - max_vol = control.max_volume - if max_vol == min_vol: - percent = 100 - else: - percent = (hw_volume - min_vol) * 100 // (max_vol - min_vol) - return percent - - def _set_volume(self, control, name, value): - '''Sets the level of a control on a scale of 0-100''' - if not control: - return - # convert value to scale of control - min_vol = control.min_volume - max_vol = control.max_volume - if min_vol != max_vol: - hw_volume = value * (max_vol - min_vol) // 100 + min_vol - self._mixer.set_volume(control, - (hw_volume,) * control.num_channels) - - def amixer_set(self, control, state): - ''' Direct call to amixer for old systems. ''' - if state: - output = check_output( - ['amixer', 'set', "%s" % (control), 'unmute'], - 'Problem with amixer set "%s" unmute' % (control), - self.parent.running_sugar) - else: - output = check_output( - ['amixer', 'set', "%s" % (control), 'mute'], - 'Problem with amixer set "%s" mute' % (control), - self.parent.running_sugar) - - def mute_master(self): - '''Mutes the Master Control''' - if self._labels_available and self.parent.hw != XO1: - self._set_mute(self._master_control, 'Master', True) - else: - self.amixer_set('Master', False) - - def unmute_master(self): - '''Unmutes the Master Control''' - if self._labels_available and self.parent.hw != XO1: - self._set_mute(self._master_control, 'Master', True) - else: - self.amixer_set('Master', True) - - def set_master(self, master_val): - '''Sets the Master gain slider settings - master_val must be given as an integer between 0 and 100 indicating the - percentage of the slider to be set''' - if self._labels_available: - self._set_volume(self._master_control, 'Master', master_val) - else: - output = check_output( - ['amixer', 'set', 'Master', "%d%s" % (master_val, '%')], - 'Problem with amixer set Master', - self.parent.running_sugar) - - def get_master(self): - '''Gets the MIC gain slider settings. The value returned is an - integer between 0-100 and is an indicative of the percentage 0 - 100%''' - if self._labels_available: - return self._get_volume(self._master_control, 'master') - else: - output = check_output(['amixer', 'get', 'Master'], - 'amixer: Could not get Master volume', - self.parent.running_sugar) - if output is None: - return 100 - else: - output = output[find(output, 'Front Left:'):] - output = output[find(output, '[') + 1:] - output = output[:find(output, '%]')] - return int(output) - - def set_bias(self, bias_state=False): - '''Enables / disables bias voltage.''' - if self._labels_available and self.parent.hw != XO1: - if self._mic_bias_control is None: - return - # If there is a flag property, use set_mute - if self._mic_bias_control not in self._mixer.list_tracks() or \ - hasattr(self._mic_bias_control.props, 'flags'): - self._set_mute( - self._mic_bias_control, 'Mic Bias', not bias_state) - # We assume that values are sorted from lowest (=off) to highest. - # Since they are mixed strings ('Off', '50%', etc.), we cannot - # easily ensure this by sorting with the default sort order. - elif bias_state: # Otherwise, set with volume - self._mixer.set_volume(self._mic_bias_control, - self._mic_bias_control.max_volume) - else: - self._mixer.set_volume(self._mic_bias_control, - self._mic_bias_control.min_volume) - elif not self._labels_available: - self.amixer_set('V_REFOUT Enable', bias_state) - else: - self.amixer_set('MIC Bias Enable', bias_state) - - def get_bias(self): - '''Check whether bias voltage is enabled.''' - if self._labels_available: - if self._mic_bias_control is None: - return False - if self._mic_bias_control not in self._mixer.list_tracks() or \ - hasattr(self._mic_bias_control.props, 'flags'): - return not self._get_mute( - self._mic_bias_control, 'Mic Bias', False) - value = self._mixer.get_volume(self._mic_bias_control) - if value == self._mic_bias_control.min_volume: - return False - return True - else: - output = check_output(['amixer', 'get', "V_REFOUT Enable"], - 'amixer: Could not get mic bias voltage', - self.parent.running_sugar) - if output is None: - return False - else: - output = output[find(output, 'Mono:'):] - output = output[find(output, '[') + 1:] - output = output[:find(output, ']')] - if output == 'on': - return True - return False - - def set_dc_mode(self, dc_mode=False): - '''Sets the DC Mode Enable control - pass False to mute and True to unmute''' - if self._labels_available and self.parent.hw != XO1: - if self._dc_control is not None: - self._set_mute(self._dc_control, 'DC mode', not dc_mode) - else: - self.amixer_set('DC Mode Enable', dc_mode) - - def get_dc_mode(self): - '''Returns the setting of DC Mode Enable control - i.e. True: Unmuted and False: Muted''' - if self._labels_available and self.parent.hw != XO1: - if self._dc_control is not None: - return not self._get_mute(self._dc_control, 'DC mode', False) - else: - return False - else: - output = check_output(['amixer', 'get', "DC Mode Enable"], - 'amixer: Could not get DC Mode', - self.parent.running_sugar) - if output is None: - return False - else: - output = output[find(output, 'Mono:'):] - output = output[find(output, '[') + 1:] - output = output[:find(output, ']')] - if output == 'on': - return True - return False - - def set_mic_boost(self, mic_boost=False): - '''Set Mic Boost. - True = +20dB, False = 0dB''' - if self._labels_available: - if self._mic_boost_control is None: - return - # If there is a volume, use set volume - if hasattr(self._mic_boost_control, 'min_volume'): - if mic_boost: - self._set_volume(self._mic_boost_control, 'boost', 100) - else: - self._set_volume(self._mic_boost_control, 'boost', 0) - # Else if there is a flag property, use set_mute - elif self._mic_boost_control not in self._mixer.list_tracks() or \ - hasattr(self._mic_boost_control.props, 'flags'): - self._set_mute( - self._mic_boost_control, 'Mic Boost', not mic_boost) - else: - self.amixer_set('Mic Boost (+20dB)', mic_boost) - - def get_mic_boost(self): - '''Return Mic Boost setting. - True = +20dB, False = 0dB''' - if self._labels_available: - if self._mic_boost_control is None: - return False - if self._mic_boost_control not in self._mixer.list_tracks() or \ - hasattr(self._mic_boost_control.props, 'flags'): - return not self._get_mute( - self._mic_boost_control, 'Mic Boost', False) - else: # Compare to min value - value = self._mixer.get_volume(self._mic_boost_control) - if value != self._mic_boost_control.min_volume: - return True - return False - else: - output = check_output(['amixer', 'get', "Mic Boost (+20dB)"], - 'amixer: Could not get mic boost', - self.parent.running_sugar) - if output is None: - return False - else: - output = output[find(output, 'Mono:'):] - output = output[find(output, '[') + 1:] - output = output[:find(output, ']')] - if output == 'on': - return True - return False - - def set_capture_gain(self, capture_val): - '''Sets the Capture gain slider settings - capture_val must be given as an integer between 0 and 100 indicating the - percentage of the slider to be set''' - if self._labels_available and self.parent.hw != XO1: - if self._capture_control is not None: - self._set_volume(self._capture_control, 'Capture', capture_val) - else: - output = check_output( - ['amixer', 'set', 'Capture', "%d%s" % (capture_val, '%')], - 'Problem with amixer set Capture', - self.parent.running_sugar) - - def get_capture_gain(self): - '''Gets the Capture gain slider settings. The value returned is an - integer between 0-100 and is an indicative of the percentage 0 - 100%''' - if self._labels_available: - if self._capture_control is not None: - return self._get_volume(self._capture_control, 'Capture') - else: - return 0 - else: - output = check_output(['amixer', 'get', 'Capture'], - 'amixer: Could not get Capture level', - self.parent.running_sugar) - if output is None: - return 100 - else: - output = output[find(output, 'Front Left:'):] - output = output[find(output, '[') + 1:] - output = output[:find(output, '%]')] - return int(output) - - def set_mic_gain(self, mic_val): - '''Sets the MIC gain slider settings - mic_val must be given as an integer between 0 and 100 indicating the - percentage of the slider to be set''' - if self._labels_available and self.parent.hw != XO1: - self._set_volume(self._mic_gain_control, 'Mic', mic_val) - else: - output = check_output( - ['amixer', 'set', 'Mic', "%d%s" % (mic_val, '%')], - 'Problem with amixer set Mic', - self.parent.running_sugar) - - def get_mic_gain(self): - '''Gets the MIC gain slider settings. The value returned is an - integer between 0-100 and is an indicative of the percentage 0 - 100%''' - if self._labels_available and self.parent.hw != XO1: - return self._get_volume(self._mic_gain_control, 'Mic') - else: - output = check_output(['amixer', 'get', 'Mic'], - 'amixer: Could not get mic gain level', - self.parent.running_sugar) - if output is None: - return 100 - else: - output = output[find(output, 'Mono:'):] - output = output[find(output, '[') + 1:] - output = output[:find(output, '%]')] - return int(output) - - def _set_sensor_type(self, mode=None, bias=None, gain=None, boost=None): - '''Helper to modify (some) of the sensor settings.''' - if mode is not None: - self.set_dc_mode(mode) - if bias is not None: - self.set_bias(bias) - if gain is not None: - self.set_capture_gain(gain) - if boost is not None: - self.set_mic_boost(boost) - self.save_state() - - def on_activity_quit(self): - '''When Activity quits''' - self.set_mic_boost(QUIT_MIC_BOOST) - self.set_dc_mode(QUIT_DC_MODE_ENABLE) - self.set_capture_gain(QUIT_CAPTURE_GAIN) - self.set_bias(QUIT_BIAS) - self.stop_sound_device() - - -def check_output(command, warning, running_sugar=True): - ''' Workaround for old systems without subprocess.check_output''' - if hasattr(subprocess, 'check_output'): - try: - output = subprocess.check_output(command) - except subprocess.CalledProcessError: - debug_output(warning, running_sugar) - return None - else: - import commands - - cmd = '' - for c in command: - cmd += c - cmd += ' ' - (status, output) = commands.getstatusoutput(cmd) - if status != 0: - debug_output(warning, running_sugar) - return None - return output |