From 554d6fd62c516ba5131e6d5da5c46e5472a15c94 Mon Sep 17 00:00:00 2001 From: Walter Bender Date: Fri, 29 Oct 2010 00:52:50 +0000 Subject: adding sensor blocks --- diff --git a/TurtleArt/audiograb.py b/TurtleArt/audiograb.py new file mode 100644 index 0000000..f2556cd --- /dev/null +++ b/TurtleArt/audiograb.py @@ -0,0 +1,733 @@ +#! /usr/bin/python +# +# Author: Arjun Sarwal arjun@laptop.org +# Copyright (C) 2007, Arjun Sarwal +# Copyright (C) 2009,10 Walter Bender +# Copyright (C) 2009, Benjamin Berg, Sebastian Berg +# Copyright (C) 2009, Sayamindu Dasgupta +# Copyright (C) 2010, Sascha Silbe +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +import pygst +pygst.require("0.10") +import gst +import gst.interfaces +from numpy import fromstring +import os +import subprocess +from string import find +from threading import Timer + +# Initial device settings +RATE = 48000 +MIC_BOOST = True +DC_MODE_ENABLE = False +CAPTURE_GAIN = 50 +BIAS = True + +# Setting on quit +QUIT_MIC_BOOST = False +QUIT_DC_MODE_ENABLE = False +QUIT_CAPTURE_GAIN = 100 +QUIT_BIAS = True + +import logging + +_logger = logging.getLogger('TurtleArt') +_logger.setLevel(logging.DEBUG) +logging.basicConfig() + +from taconstants import SENSOR_AC_NO_BIAS, SENSOR_AC_BIAS, SENSOR_DC_NO_BIAS, \ + SENSOR_DC_BIAS, XO1 + + +class AudioGrab: + """ The interface between measure and the audio device """ + + def __init__(self, callable1, activity): + """ Initialize the class: callable1 is a data buffer; + activity is the parent class""" + + self.callable1 = callable1 + self.activity = activity + self.sensor = None + + self.temp_buffer = [0] + self.picture_buffer = [] # place to hold screen grabs + + self.draw_graph_status = False + self.screenshot = True + + self.rate = 48000 + self.final_count = 0 + self.count_temp = 0 + self.entry_count = 0 + + self.waveform_id = 1 + self.logging_state = False + self.buffer_interval_logging = 0 + + self.counter_buffer = 0 + + self._dc_control = None + self._mic_bias_control = None + self._capture_control = None + self._mic_boost_control = None + self._hardwired = False # Query controls or use hardwired names + + # Set up gst pipeline + self.pipeline = gst.Pipeline("pipeline") + self.alsasrc = gst.element_factory_make("alsasrc", "alsa-source") + self.pipeline.add(self.alsasrc) + self.caps1 = gst.element_factory_make("capsfilter", "caps1") + self.pipeline.add(self.caps1) + caps_str = "audio/x-raw-int,rate=%d,channels=1,depth=16" % (RATE) + self.caps1.set_property("caps", gst.caps_from_string(caps_str)) + self.fakesink = gst.element_factory_make("fakesink", "fsink") + self.pipeline.add(self.fakesink) + self.fakesink.connect("handoff", self.on_buffer) + self.fakesink.set_property("signal-handoffs", True) + gst.element_link_many(self.alsasrc, self.caps1, self.fakesink) + + self.dont_queue_the_buffer = False + + self._mixer = gst.element_factory_make('alsamixer') + rc = self._mixer.set_state(gst.STATE_PAUSED) + assert rc == gst.STATE_CHANGE_SUCCESS + + # Query the available controls + try: # F11+ + _logger.debug('controls: %r', [t.props.untranslated_label \ + for t in self._mixer.list_tracks()]) + self._dc_control = self._find_control(['dc mode']) + self._mic_bias_control = self._find_control(['mic bias', + 'dc input bias', + 'v_refout']) + if self._mic_bias_control is not None: + _logger.debug("Mic Bias is %s" % ( + self._mic_bias_control.props.untranslated_label)) + _logger.debug("Min %s" % (str(self._mic_bias_control.min_volume))) + _logger.debug("Max %s" % (str(self._mic_bias_control.max_volume))) + _logger.debug("Channels %s" % ( + str(self._mic_bias_control.num_channels))) + self._mic_boost_control = self._find_control(['mic boost', + 'mic boost (+20db)', + 'internal mic boost', + 'analog mic boost']) + if self._mic_boost_control is not None: + _logger.debug("Mic Boost is %s" % ( + self._mic_boost_control.props.untranslated_label)) + _logger.debug("Min %s" % (str(self._mic_boost_control.min_volume))) + _logger.debug("Max %s" % (str(self._mic_boost_control.max_volume))) + _logger.debug("Channels %s" % ( + str(self._mic_boost_control.num_channels))) + + self._mic_gain_control = self._find_control(['mic']) + self._capture_control = self._find_control(['capture']) + if self._capture_control is not None: + _logger.debug("Capture is %s" % ( + self._capture_control.props.untranslated_label)) + _logger.debug("Min %s" % (str(self._capture_control.min_volume))) + _logger.debug("Max %s" % (str(self._capture_control.max_volume))) + _logger.debug("Channels %s" % ( + str(self._capture_control.num_channels))) + self._master_control = self._find_control(['master']) + except AttributeError: # F9- (no untranslated_label attribute) + self._hardwired = True + + # Variables for saving and resuming state of sound device + self.master = self.get_master() + self.bias = BIAS + self.dcmode = DC_MODE_ENABLE + self.capture_gain = CAPTURE_GAIN + self.mic_boost = MIC_BOOST + self.mic = self.get_mic_gain() + + # Timer for interval sampling and switch to indicate when to capture + self.capture_timer = None + self.capture_interval_sample = False + + def set_handoff_signal(self, handoff_state): + """Sets whether the handoff signal would generate an interrupt or not""" + self.fakesink.set_property("signal-handoffs", handoff_state) + + def _new_buffer(self, buf): + """ Use a new buffer """ + if not self.dont_queue_the_buffer: + self.temp_buffer = buf + self.callable1(buf) + else: + pass + + def on_buffer(self, element, buffer, pad): + """The function that is called whenever new data is available + This is the signal handler for the handoff signal""" + if buffer is None: + _logger.debug('audiograb buffer is None') + return False + + temp_buffer = fromstring(buffer, 'int16') + if not self.dont_queue_the_buffer: + self._new_buffer(temp_buffer) + return False + + def set_sensor(self, sensor): + """Keep a reference to the sensot toolbar for logging""" + self.sensor = sensor + + def start_sound_device(self): + """Start or Restart grabbing data from the audio capture""" + gst.event_new_flush_start() + self.pipeline.set_state(gst.STATE_PLAYING) + + def stop_sound_device(self): + """Stop grabbing data from capture device""" + gst.event_new_flush_stop() + self.pipeline.set_state(gst.STATE_NULL) + + def set_sampling_rate(self, sr): + """Sets the sampling rate of the capture device + Sampling rate must be given as an integer for example 16000 for + setting 16Khz sampling rate + The sampling rate would be set in the device to the nearest available""" + self.pause_grabbing() + caps_str = "audio/x-raw-int,rate=%d,channels=1,depth=16" % (sr, ) + self.caps1.set_property("caps", gst.caps_from_string(caps_str)) + self.resume_grabbing() + + def get_sampling_rate(self): + """Gets the sampling rate of the capture device""" + return int(self.caps1.get_property("caps")[0]['rate']) + + def set_callable1(self, callable1): + """Sets the callable to the drawing function for giving the + data at the end of idle-add""" + self.callable1 = callable1 + + def start_grabbing(self): + """Called right at the start of the Activity""" + self.start_sound_device() + + def pause_grabbing(self): + """When Activity goes into background""" + self.save_state() + self.stop_sound_device() + + def resume_grabbing(self): + """When Activity becomes active after going to background""" + self.start_sound_device() + self.resume_state() + + def stop_grabbing(self): + """Not used ???""" + self.stop_sound_device() + self.set_handoff_signal(False) + + def _find_control(self, prefixes): + """Try to find a mixer control matching one of the prefixes. + + The control with the best match (smallest difference in length + between label and prefix) will be returned. If no match is found, + None is returned. + """ + def best_prefix(label, prefixes): + matches =\ + [len(label) - len(p) for p in prefixes if label.startswith(p)] + if not matches: + return None + + matches.sort() + return matches[0] + + controls = [] + for track in self._mixer.list_tracks(): + label = track.props.untranslated_label.lower() + diff = best_prefix(label, prefixes) + if diff is not None: + controls.append((track, diff)) + + controls.sort(key=lambda e: e[1]) + if controls: + _logger.debug("found control: %s" %\ + (str(controls[0][0].props.untranslated_label))) + return controls[0][0] + + return None + + def save_state(self): + """Saves the state of all audio controls""" + _logger.debug("====================================") + _logger.debug("Save state") + self.master = self.get_master() + self.bias = self.get_bias() + self.dcmode = self.get_dc_mode() + self.capture_gain = self.get_capture_gain() + self.mic_boost = self.get_mic_boost() + _logger.debug("====================================") + + def resume_state(self): + """Put back all audio control settings from the saved state""" + _logger.debug("====================================") + _logger.debug("Resume state") + self.set_master(self.master) + self.set_bias(self.bias) + self.set_dc_mode(self.dcmode) + self.set_capture_gain(self.capture_gain) + self.set_mic_boost(self.mic_boost) + _logger.debug("====================================") + + """ + self.set_PCM_gain(self.PCM ) + self.set_mic_gain(self.mic) + """ + + def _get_mute(self, control, name, default): + """Get mute status of a control""" + if not control: + _logger.warning('No %s control, returning constant mute status', + name) + return default + + value = bool(control.flags & gst.interfaces.MIXER_TRACK_MUTE) + _logger.debug('Getting %s (%s) mute status: %r', name, + control.props.untranslated_label, value) + return value + + def _set_mute(self, control, name, value): + """Mute a control""" + if not control: + _logger.warning('No %s control, not setting mute', name) + return + + self._mixer.set_mute(control, value) + _logger.debug('Set mute for %s (%s) to %r', name, + control.props.untranslated_label, value) + + def _get_volume(self, control, name): + """Get volume of a control and convert to a scale of 0-100""" + if not control: + _logger.warning('No %s control, returning constant volume', name) + return 100 + + try: # sometimes get_volume does not return a tuple + hw_volume = self._mixer.get_volume(control)[0] + except IndexError: + _logger.warning('_get_volume: %s (%d-%d) %d channels' % ( + control.props.untranslated_label, control.min_volume, + control.max_volume, control.num_channels)) + return 100 + + min_vol = control.min_volume + max_vol = control.max_volume + percent = (hw_volume - min_vol)*100//(max_vol - min_vol) + _logger.debug('Getting %s (%s) volume: %d (%d)', name, + control.props.untranslated_label, percent, hw_volume) + return percent + + def _set_volume(self, control, name, value): + """Sets the level of a control on a scale of 0-100""" + if not control: + _logger.warning('No %s control, not setting volume', name) + return + + # convert value to scale of control + min_vol = control.min_volume + max_vol = control.max_volume + if min_vol != max_vol: + hw_volume = value*(max_vol - min_vol)//100 + min_vol + self._mixer.set_volume(control, (hw_volume,)*control.num_channels) + _logger.debug('Set volume of %s (%s) to %d (%d)', name, + control.props.untranslated_label, value, hw_volume) + else: + _logger.warning('_set_volume: %s (%d-%d) %d channels' % ( + control.props.untranslated_label, control.min_volume, + control.max_volume, control.num_channels)) + + def amixer_set(self, control, state): + """ Direct call to amixer for old systems. """ + if state: + os.system("amixer set '%s' unmute" % (control)) + else: + os.system("amixer set '%s' mute" % (control)) + + def mute_master(self): + """Mutes the Master Control""" + if not self._hardwired and self.activity.hw != XO1: + self._set_mute(self._master_control, 'Master', True) + else: + self.amixer_set('Master', False) + + def unmute_master(self): + """Unmutes the Master Control""" + if not self._hardwired and self.activity.hw != XO1: + self._set_mute(self._master_control, 'Master', True) + else: + self.amixer_set('Master', True) + + def set_master(self, master_val): + """Sets the Master gain slider settings + master_val must be given as an integer between 0 and 100 indicating the + percentage of the slider to be set""" + if not self._hardwired: + self._set_volume(self._master_control, 'Master', master_val) + else: + os.system("amixer set Master " + str(master_val) + "%") + + def get_master(self): + """Gets the Master gain slider settings. The value returned is an + integer between 0-100 and is an indicative of the percentage 0 - 100%""" + if not self._hardwired: + return self._get_volume(self._master_control, 'master') + else: + p = str(subprocess.Popen(["amixer", "get", "Master"], + stdout=subprocess.PIPE).communicate()[0]) + p = p[find(p, "Front Left:"):] + p = p[find(p, "[")+1:] + p = p[:find(p, "%]")] + return int(p) + + def set_bias(self, bias_state=False): + """Enables / disables bias voltage.""" + if not self._hardwired and self.activity.hw != XO1: + if self._mic_bias_control is None: + return + # if not isinstance(self._mic_bias_control, + # gst.interfaces.MixerOptions): + if self._mic_bias_control not in self._mixer.list_tracks(): + _logger.warning("set_bias: not in mixer") + return self._set_mute(self._mic_bias_control, 'Mic Bias', + not bias_state) + + #values = self._mic_bias_control.get_values() + # We assume that values are sorted from lowest (=off) to highest. + # Since they are mixed strings ("Off", "50%", etc.), we cannot + # easily ensure this by sorting with the default sort order. + _logger.debug("set bias max is %s" % (str( + self._mic_bias_control.max_volume))) + try: + if bias_state: + # self._mixer.set_option(self._mic_bias_control, values[-1]) + self._mixer.set_volume(self._mic_bias_control, + self._mic_bias_control.max_volume) + else: + self._mixer.set_volume(self._mic_bias_control, + self._mic_bias_control.min_volume) + # self._mixer.set_option(self._mic_bias_control, values[0]) + except TypeError: + _logger.warning('set_bias: %s (%d-%d) %d channels' % ( + self._mic_bias_control.props.untranslated_label, + self._mic_bias_control.min_volume, + self._mic_bias_control.max_volume, + self._mic_bias_control.num_channels)) + self._set_mute(self._mic_bias_control, 'Mic Bias', + not bias_state) + elif self._hardwired: + self.amixer_set('V_REFOUT Enable', bias_state) + else: + self.amixer_set('MIC Bias Enable', bias_state) + + def get_bias(self): + """Check whether bias voltage is enabled.""" + if not self._hardwired: + if self._mic_bias_control is None: + return False + if self._mic_bias_control not in self._mixer.list_tracks(): + # gst.interfaces.MixerOptions): + _logger.warning("get_bias: not in mixer") + return not self._get_mute(self._mic_bias_control, 'Mic Bias', + False) + #values = self._mic_bias_control.get_option() + #values = self._mic_bias_control.get_values() + _logger.warning('get_bias: %s (%d-%d) %d channels' % ( + self._mic_bias_control.props.untranslated_label, + self._mic_bias_control.min_volume, + self._mic_bias_control.max_volume, + self._mic_bias_control.num_channels)) + current = self._mixer.get_volume(self._mic_bias_control) + # same ordering assertion as in set_bias() applies + # if current == values[0]: + _logger.debug('current: %s' % (str(current))) + if current == self._mic_bias_control.min_volume: + return False + return True + else: + p = str(subprocess.Popen(["amixer", "get", "'V_REFOUT Enable'"], + stdout=subprocess.PIPE).communicate()[0]) + p = p[find(p, "Mono:"):] + p = p[find(p, "[")+1:] + p = p[:find(p, "]")] + if p == "on": + return True + return False + + def set_dc_mode(self, dc_mode=False): + """Sets the DC Mode Enable control + pass False to mute and True to unmute""" + if not self._hardwired and self.activity.hw != XO1: + if self._dc_control is not None: + self._set_mute(self._dc_control, 'DC mode', not dc_mode) + else: + self.amixer_set('DC Mode Enable', dc_mode) + + def get_dc_mode(self): + """Returns the setting of DC Mode Enable control + i .e. True: Unmuted and False: Muted""" + if not self._hardwired: + if self._dc_control is not None: + return not self._get_mute(self._dc_control, 'DC mode', False) + else: + return False + else: + p = str(subprocess.Popen(["amixer", "get", "'DC Mode Enable'"], + stdout=subprocess.PIPE).communicate()[0]) + p = p[find(p, "Mono:"):] + p = p[find(p, "[")+1:] + p = p[:find(p, "]")] + if p == "on": + return True + else: + return False + + def set_mic_boost(self, mic_boost=False): + """Set Mic Boost. + True = +20dB, False = 0dB""" + if not self._hardwired: + if self._mic_boost_control is None: + return + if self._mic_boost_control not in self._mixer.list_tracks(): + # gst.interfaces.MixerOptions): + _logger.warning("set_mic_boost not in mixer %s" %\ + (str(self._mic_boost_control))) + return self._set_mute(self._mic_boost_control, 'Mic Boost', + mic_boost) + #values = self._mic_boost_control.get_values() + value = self._mixer.get_volume(self._mic_boost_control) + """ + if '20dB' not in values or '0dB' not in values: + _logging.error("Mic Boost (%s) is an option list, but doesn't " + "contain 0dB and 20dB settings", + self._mic_boost_control.props.label) + return + """ + try: + if mic_boost: + # self._mixer.set_option(self._mic_boost_control, '20dB') + self._mixer.set_volume(self._mic_boost_control, + self._mic_boost_control.max_volume) + else: + # self._mixer.set_option(self._mic_boost_control, '0dB') + self._mixer.set_volume(self._mic_boost_control, + self._mic_boost_control.min_volume) + except TypeError: + _logger.warning('set_mic_boost: %s (%d-%d) %d channels' % ( + self._mic_boost_control.props.untranslated_label, + self._mic_boost_control.min_volume, + self._mic_boost_control.max_volume, + self._mic_boost_control.num_channels)) + return self._set_mute(self._mic_boost_control, 'Mic Boost', + not mic_boost) + else: + self.amixer_set('Mic Boost (+20dB)', mic_boost) + + def get_mic_boost(self): + """Return Mic Boost setting. + True = +20dB, False = 0dB""" + if not self._hardwired: + if self._mic_boost_control is None: + return False + if self._mic_boost_control not in self._mixer.list_tracks(): + _loggerging.error("get_mic_boost not found in mixer %s" %\ + (str(self._mic_boost_control))) + return self._get_mute(self._mic_boost_control, 'Mic Boost', + False) + #values = self._mic_boost_control.get_values() + # values = self._mixer.get_option(self._mic_boost_control) + """ + if '20dB' not in values or '0dB' not in values: + _logging.error("Mic Boost (%s) is an option list, but doesn't " + "contain 0dB and 20dB settings", + self._mic_boost_control.props.label) + return False + """ + _logger.warning('get_mic_boost: %s (%d-%d) %d channels' % ( + self._mic_boost_control.props.untranslated_label, + self._mic_boost_control.min_volume, + self._mic_boost_control.max_volume, + self._mic_boost_control.num_channels)) + current = self._mixer.get_volume(self._mic_boost_control) + _logger.debug('current: %s' % (str(current))) + # if current == '20dB': + if current != self._mic_boost_control.min_volume: + return True + return False + else: + p = str(subprocess.Popen(["amixer", "get", "'Mic Boost (+20dB)'"], + stdout=subprocess.PIPE).communicate()[0]) + p = p[find(p, "Mono:"):] + p = p[find(p, "[")+1:] + p = p[:find(p, "]")] + if p == "on": + return True + else: + return False + + def set_capture_gain(self, capture_val): + """Sets the Capture gain slider settings + capture_val must be given as an integer between 0 and 100 indicating the + percentage of the slider to be set""" + if not self._hardwired and self.activity.hw != XO1: + if self._capture_control is not None: + self._set_volume(self._capture_control, 'Capture', capture_val) + else: + os.system("amixer set Capture " + str(capture_val) + "%") + + def get_capture_gain(self): + """Gets the Capture gain slider settings. The value returned is an + integer between 0-100 and is an indicative of the percentage 0 - 100%""" + if not self._hardwired: + if self._capture_control is not None: + return self._get_volume(self._capture_control, 'Capture') + else: + return 0 + else: + p = str(subprocess.Popen(["amixer", "get", "Capture"], + stdout=subprocess.PIPE).communicate()[0]) + p = p[find(p, "Front Left:"):] + p = p[find(p, "[")+1:] + p = p[:find(p, "%]")] + return int(p) + + def set_mic_gain(self, mic_val): + """Sets the MIC gain slider settings + mic_val must be given as an integer between 0 and 100 indicating the + percentage of the slider to be set""" + if not self._hardwired and self.activity.hw != XO1: + self._set_volume(self._mic_gain_control, 'Mic', mic_val) + else: + os.system("amixer set Mic " + str(mic_val) + "%") + + def get_mic_gain(self): + """Gets the MIC gain slider settings. The value returned is an + integer between 0-100 and is an indicative of the percentage 0 - 100%""" + if not self._hardwired: + return self._get_volume(self._mic_gain_control, 'Mic') + else: + p = str(subprocess.Popen(["amixer", "get", "Mic"], + stdout=subprocess.PIPE).communicate()[0]) + try: + p = p[find(p, "Mono:"):] + p = p[find(p, "[")+1:] + p = p[:find(p, "%]")] + return int(p) + except: + return(0) + + def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS): + """Set the type of sensor you want to use. Set sensor_type according + to the following + SENSOR_AC_NO_BIAS - AC coupling with Bias Off --> Very rarely used. + Use when connecting a dynamic microphone externally + SENSOR_AC_BIAS - AC coupling with Bias On --> The default settings. + The internal MIC uses these + SENSOR_DC_NO_BIAS - DC coupling with Bias Off --> measuring voltage + output sensor. For example LM35 which gives output proportional + to temperature + SENSOR_DC_BIAS - DC coupling with Bias On --> measuing resistance. + """ + PARAMETERS = { + SENSOR_AC_NO_BIAS: (False, False, 50, True), + SENSOR_AC_BIAS: (False, True, 40, True), + SENSOR_DC_NO_BIAS: (True, False, 0, False), + SENSOR_DC_BIAS: (True, True, 0, False) + } + mode, bias, gain, boost = PARAMETERS[sensor_type] + _logger.debug("====================================") + _logger.debug("Set Sensor Type to %s" % (str(sensor_type))) + self._set_sensor_type(mode, bias, gain, boost) + _logger.debug("====================================") + + def _set_sensor_type(self, mode=None, bias=None, gain=None, boost=None): + """Helper to modify (some) of the sensor settings.""" + if mode is not None: + self.set_dc_mode(mode) + if self._dc_control is not None: + os.system("amixer get '%s'" %\ + (self._dc_control.props.untranslated_label)) + if bias is not None: + self.set_bias(bias) + if self._mic_bias_control is not None: + os.system("amixer get '%s'" %\ + (self._mic_bias_control.props.untranslated_label)) + if gain is not None: + self.set_capture_gain(gain) + if self._capture_control is not None: + os.system("amixer get '%s'" %\ + (self._capture_control.props.untranslated_label)) + if boost is not None: + self.set_mic_boost(boost) + if self._mic_boost_control is not None: + os.system("amixer get '%s'" %\ + (self._mic_boost_control.props.untranslated_label)) + + def on_activity_quit(self): + """When Activity quits""" + _logger.debug("====================================") + _logger.debug("Quitting") + self.set_mic_boost(QUIT_MIC_BOOST) + self.set_dc_mode(QUIT_DC_MODE_ENABLE) + self.set_capture_gain(QUIT_CAPTURE_GAIN) + self.set_bias(QUIT_BIAS) + self.stop_sound_device() + _logger.debug("====================================") + + +class AudioGrab_XO1(AudioGrab): + """ Use default parameters for OLPC XO 1.0 laptop """ + pass + + +class AudioGrab_XO15(AudioGrab): + """ Override parameters for OLPC XO 1.5 laptop """ + def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS): + """Helper to modify (some) of the sensor settings.""" + PARAMETERS = { + SENSOR_AC_NO_BIAS: (False, False, 80, True), + SENSOR_AC_BIAS: (False, True, 80, True), + SENSOR_DC_NO_BIAS: (True, False, 80, False), + SENSOR_DC_BIAS: (True, True, 90, False) + } + _logger.debug("====================================") + _logger.debug("Set Sensor Type to %s" % (str(sensor_type))) + mode, bias, gain, boost = PARAMETERS[sensor_type] + self._set_sensor_type(mode, bias, gain, boost) + _logger.debug("====================================") + + +class AudioGrab_Unknown(AudioGrab): + """ Override parameters for generic hardware """ + def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS): + """Helper to modify (some) of the sensor settings.""" + PARAMETERS = { + SENSOR_AC_NO_BIAS: (None, False, 50, True), + SENSOR_AC_BIAS: (None, True, 40, True), + SENSOR_DC_NO_BIAS: (True, False, 80, False), + SENSOR_DC_BIAS: (True, True, 90, False) + } + _logger.debug("====================================") + _logger.debug("Set Sensor Type to %s" % (str(sensor_type))) + mode, bias, gain, boost = PARAMETERS[sensor_type] + self._set_sensor_type(mode, bias, gain, boost) + _logger.debug("====================================") diff --git a/TurtleArt/ringbuffer.py b/TurtleArt/ringbuffer.py new file mode 100644 index 0000000..d2fb1af --- /dev/null +++ b/TurtleArt/ringbuffer.py @@ -0,0 +1,112 @@ +# Copyright (C) 2009, Benjamin Berg, Sebastian Berg +# Copyright (C) 2010, Walter Bender +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +import numpy as np + +class RingBuffer1d(object): + """This class implements an array being written in as a ring and that can + be read from continuously ending with the newest data or starting with the + oldest. It returns a numpy array copy of the data; + """ + def __init__(self, length, dtype=None): + """Initialize the 1 dimensional ring buffer with the given lengths. + The initial values are all 0s + """ + + self.offset = 0 + + self._data = np.zeros(length, dtype=dtype) + + self.stored = 0 + + + def fill(self, number): + self._data.fill(number) + self.offset = 0 + + + def append(self, data): + """Append to the ring buffer (and overwrite old data). If len(data) + is greater then the ring buffers length, the newest data takes + precedence. + """ + data = np.asarray(data) + + if len(self._data) == 0: + return + + if len(data) >= len(self._data): + self._data[:] = data[-len(self._data):] + self.offset = 0 + self.stored = len(self._data) + + elif len(self._data)-self.offset >= len(data): + self._data[self.offset:self.offset+len(data)] = data + self.offset = self.offset+len(data) + self.stored += len(data) + else: + self._data[self.offset:] = data[:len(self._data)-self.offset] + self._data[:len(data)-(len(self._data)-self.offset)] = \ + data[-len(data)+(len(self._data)-self.offset):] + self.offset = len(data)-(len(self._data)-self.offset) + self.stored += len(data) + + if len(self._data) <= self.stored: + self.read = self._read + + + def read(self, number=None, step=1): + """Read the ring Buffer. Number can be positive or negative. + Positive values will give the latest information, negative values will + give the newest added information from the buffer. (in normal order) + + Before the buffer is filled once: This returns just None + """ + return np.array([]) + + + def _read(self, number=None, step=1): + """Read the ring Buffer. Number can be positive or negative. + Positive values will give the latest information, negative values will + give the newest added information from the buffer. (in normal order) + """ + if number == None: + number = len(self._data)//step + + number *= step + assert abs(number) <= len(self._data), \ + 'Number to read*step must be smaller then length' + + if number < 0: + if abs(number) <= self.offset: + return self._data[self.offset+number:self.offset:step] + + spam = (self.offset-1) % step + + return np.concatenate( + (self._data[step-spam-1+self.offset+number::step], + self._data[spam:self.offset:step])) + + if number-(len(self._data)-self.offset) > 0: + spam = ((self.offset+number) - self.offset-1) % step + return np.concatenate( + (self._data[self.offset:self.offset+number:step], + self._data[spam:number- + (len(self._data)-self.offset):step])) + + return self._data[self.offset:self.offset+number:step].copy() + diff --git a/TurtleArt/taconstants.py b/TurtleArt/taconstants.py index ca5aa59..b23f5d4 100644 --- a/TurtleArt/taconstants.py +++ b/TurtleArt/taconstants.py @@ -113,7 +113,7 @@ TOP_LAYER = 1000 # PALETTE_NAMES = ['turtle', 'pen', 'colors', 'numbers', 'flow', 'blocks', - 'extras', 'portfolio', 'trash'] + 'extras', 'sensor', 'portfolio', 'trash'] PALETTES = [['clean', 'forward', 'back', 'show', 'left', 'right', 'seth', 'setxy2', 'heading', 'xcor', 'ycor', 'setscale', @@ -132,10 +132,11 @@ PALETTES = [['clean', 'forward', 'back', 'show', 'left', 'right', ['hat1', 'stack1', 'hat', 'hat2', 'stack2', 'stack', 'storeinbox1', 'storeinbox2', 'string', 'box1', 'box2', 'box', 'storein', 'start'], - ['kbinput', 'push', 'printheap', 'keyboard', 'pop', 'clearheap', + ['push', 'printheap', 'keyboard', 'pop', 'clearheap', 'myfunc1arg', 'userdefined', 'addturtle', 'comment', 'print', 'cartesian', 'width', 'height', 'polar', 'sandwichtop_no_label', - 'sandwichbottom', 'readpixel', 'see', 'reskin'], + 'sandwichbottom', 'reskin'], + ['kbinput', 'readpixel', 'see', 'volume'], # 'pitch' ['journal', 'audio', 'description', 'hideblocks', 'showblocks', 'fullscreen', 'savepix', 'savesvg', 'picturelist', 'picture1x1a', 'picture1x1', 'picture2x2', 'picture2x1', @@ -149,8 +150,8 @@ PALETTES = [['clean', 'forward', 'back', 'show', 'left', 'right', COLORS = [["#00FF00", "#00A000"], ["#00FFFF", "#00A0A0"], ["#00FFFF", "#00A0A0"], ["#FF00FF", "#A000A0"], ["#FFC000", "#A08000"], ["#FFFF00", "#A0A000"], - ["#FF0000", "#A00000"], ["#0000FF", "#0000A0"], - ["#FFFF00", "#A0A000"]] + ["#FF0000", "#A00000"], ["#FF0000", "#A00000"], + ["#0000FF", "#0000A0"], ["#FFFF00", "#A0A000"]] BOX_COLORS = {'red': ["#FF0000", "#A00000"], 'orange': ["#FFD000", "#AA8000"], @@ -186,6 +187,13 @@ HIT_GREEN = "#00F000" HIDE_WHITE = "#F8F8F8" SHOW_WHITE = "#F0F0F0" DEFAULT_SCALE = 33 +XO1 = 'xo1' +XO15 = 'xo1.5' +UNKNOWN = 'unknown' +SENSOR_AC_NO_BIAS = 'external' +SENSOR_AC_BIAS = 'sound' +SENSOR_DC_NO_BIAS = 'voltage' +SENSOR_DC_BIAS = 'resistance' # # Block-style definitions @@ -395,6 +403,7 @@ BLOCK_NAMES = { 'sandwichtop_no_arm_no_label': [' '], 'scale': [_('scale')], 'see': [_('turtle sees')], + 'sensor': [_('sensors')], 'setcolor': [_('set color')], 'setgray': [_('set gray')], 'seth': [_('set heading')], @@ -859,6 +868,7 @@ HELP_STRINGS = { 'savesvg': _("saves turtle graphics as an SVG file in the Sugar Journal"), 'scale': _("holds current scale value"), 'see': _('returns the color that the turtle "sees"'), + 'sensor': _("Palette of sensor blocks"), 'setcolor': _("sets color of the line drawn by the turtle"), 'setgray': _("sets gray level of the line drawn by the turtle"), 'seth': _("sets the heading of the turtle (0 is towards the top of the screen.)"), diff --git a/TurtleArt/talogo.py b/TurtleArt/talogo.py index 1819520..d127836 100644 --- a/TurtleArt/talogo.py +++ b/TurtleArt/talogo.py @@ -34,7 +34,8 @@ except: pass from taconstants import PALETTES, PALETTE_NAMES, TAB_LAYER, BLACK, WHITE, \ - DEFAULT_SCALE, ICON_SIZE, BLOCK_NAMES, CONSTANTS + DEFAULT_SCALE, ICON_SIZE, BLOCK_NAMES, CONSTANTS, SENSOR_DC_NO_BIAS, \ + SENSOR_DC_BIAS from tagplay import play_audio, play_movie_from_file, stop_media from tajail import myfunc, myfunc_import from tautils import get_pixbuf_from_journal, movie_media_type, convert, \ @@ -343,6 +344,7 @@ class LogoCode: 'pendown': [0, lambda self: self.tw.canvas.setpen(True)], 'pensize': [0, lambda self: self.tw.canvas.pensize], 'penup': [0, lambda self: self.tw.canvas.setpen(False)], + 'pitch': [0, lambda self: self._get_pitch()], 'plus': [2, lambda self, x, y: _plus(x, y)], 'polar': [0, lambda self: self.tw.set_polar(True)], 'pop': [0, lambda self: self._prim_pop()], @@ -355,6 +357,7 @@ class LogoCode: 'readpixel': [0, lambda self: self._read_pixel()], 'red': [0, lambda self: CONSTANTS['red']], 'repeat': [2, self._prim_repeat, True], + 'resistance': [0, lambda self: self._get_resistance()], 'right': [1, lambda self, x: self._prim_right(x)], 'rightx': [0, lambda self: CONSTANTS['rightx']], 'rpos': [0, lambda self: CONSTANTS['rightpos']], @@ -415,9 +418,10 @@ class LogoCode: 'userdefined3': [3, lambda self, x, y, z: self._prim_myblock([x, y, z])], 'video': [1, lambda self, x: self._play_movie(x)], + 'voltage': [0, lambda self: self._get_voltage()], + 'volume': [0, lambda self: self._get_volume()], 'vres': [0, lambda self: CONSTANTS['height']], 'wait': [1, self._prim_wait, True], - # 'while': [2, self._prim_while, True], 'white': [0, lambda self: WHITE], 'write': [2, lambda self, x, y: self._write(self, x, y)], 'xcor': [0, lambda self: self.tw.canvas.xcor / self.tw.coord_scale], @@ -463,6 +467,13 @@ class LogoCode: self.scale = DEFAULT_SCALE + ### sensor stuff + self.max_samples = 115 + self.input_step = 1 + from ringbuffer import RingBuffer1d + self.ringbuffer = RingBuffer1d(self.max_samples, dtype='int16') + self.fftx = [] + def _def_prim(self, name, args, fcn, rprim=False): """ Define the primitives associated with the blocks """ sym = self._intern(name) @@ -486,6 +497,7 @@ class LogoCode: self.tw.saving_svg = False self.find_value_blocks() + self.find_sensor_blocks() if self.trace > 0: self.update_values = True else: @@ -510,7 +522,7 @@ class LogoCode: code = self._blocks_to_code(blk) if run_flag: - print "running code: %s" % (code) + _logger.debug("running code: %s" % (code)) self._setup_cmd(code) if not self.tw.hide: self.tw.display_coordinates() @@ -727,7 +739,7 @@ class LogoCode: self.arglist.append(self.iresult) if self.cfun.rprim: if type(self.cfun.fcn) == self.listtype: - print "evalsym rprim list: ", token + _logger.debug("evalsym rprim list: %s" % (str(token))) self._icall(self._ufuncall, self.cfun.fcn) yield True else: @@ -1054,6 +1066,20 @@ class LogoCode: self.value_blocks[name] = self.tw.block_list.get_similar_blocks( 'block', name) + def find_sensor_blocks(self): + """ Find any audio/data sensor blocks """ + for name in ['volume', 'pitch', 'resistance', 'voltage']: + if len(self.tw.block_list.get_similar_blocks('block', name)): + if name in ['volume', 'pitch']: + self.tw.audiograb.set_sensor_type() + return + elif name == 'resistance': + self.tw.audiograb.set_sensor_type(SENSOR_DC_BIAS) + return + elif name == 'voltage': + self.tw.audiograb.set_sensor_type(SENSOR_DC_NO_BIAS) + return + def update_label_value(self, name, value=None): """ Update the label of value blocks to reflect current value """ if self.tw.hide or not self.tw.interactive_mode or \ @@ -1080,15 +1106,15 @@ class LogoCode: self.update_label_value(name, value) def _prim_right(self, value): - self.tw.canvas.right(value) + self.tw.canvas.right(float(value)) self.update_label_value('heading', self.tw.canvas.heading) def _prim_move(self, cmd, value1, value2=None, pendown=True): if value2 is None: cmd(value1) else: - print cmd, value1, value2, pendown - cmd(value1, value2, pendown=pendown) + # print cmd, value1, value2, pendown + cmd(float(value1), float(value2), pendown=pendown) self.update_label_value('xcor', self.tw.canvas.xcor / self.tw.coord_scale) self.update_label_value('ycor', @@ -1097,7 +1123,7 @@ class LogoCode: self._see() def _prim_arc(self, cmd, value1, value2): - cmd(value1, value2) + cmd(float(value1), float(value2)) self.update_label_value('xcor', self.tw.canvas.xcor / self.tw.coord_scale) self.update_label_value('ycor', @@ -1236,7 +1262,7 @@ class LogoCode: dsobject = datastore.get(audio[6:]) play_audio(self, dsobject.file_path) except: - print "Couldn't open id: " + str(audio[6:]) + _logger.debug("Couldn't open id: %s" % (str(audio[6:]))) else: play_audio(self, audio[6:]) @@ -1310,14 +1336,14 @@ class LogoCode: text = str(dsobject.metadata['description']) dsobject.destroy() except: - print "no description in %s" % (media[6:]) + _logger.debug("no description in %s" % (media[6:])) else: try: f = open(media[6:], 'r') text = f.read() f.close() except: - print "no text in %s?" % (media[6:]) + _logger.debug("no text in %s?" % (media[6:])) if text is not None: self.tw.canvas.draw_text(text, int(x), int(y), self.body_height, int(w)) @@ -1337,6 +1363,39 @@ class LogoCode: self.heap.append(g) self.heap.append(r) + def _get_volume(self): + """ return mic in value """ + buf = self.ringbuffer.read(None, self.input_step) + if len(buf) > 0: + return float(buf[0]) / 164 # scale from -100 to 100 + else: + return 0 + + def _get_pitch(self): + """ return frequence of mic in value """ + # TODO: Calculate FFT + buf = self.ringbuffer.read(None, self.input_step) + if len(buf) > 0: + return float(buf[0]) / 164 # scale from -100 to 100 + else: + return 0 + + def _get_resistance(self): + """ return resistance sensor value """ + buf = self.ringbuffer.read(None, self.input_step) + if len(buf) > 0: + return float(buf[0]) / 164 # scale from -100 to 100 + else: + return 0 + + def _get_voltage(self): + """ return voltage sensor value """ + buf = self.ringbuffer.read(None, self.input_step) + if len(buf) > 0: + return float(buf[0]) / 164 # scale from -100 to 100 + else: + return 0 + # Depreciated block methods def _show_template1x1(self, title, media): diff --git a/TurtleArt/tautils.py b/TurtleArt/tautils.py index 025c4f4..6d530af 100644 --- a/TurtleArt/tautils.py +++ b/TurtleArt/tautils.py @@ -22,6 +22,8 @@ import gtk import pickle import subprocess +import dbus + try: OLD_SUGAR_SYSTEM = False import json @@ -35,9 +37,10 @@ except (ImportError, AttributeError): from simplejson import dump as jdump except: OLD_SUGAR_SYSTEM = True + from taconstants import STRING_OR_NUMBER_ARGS, HIDE_LAYER, CONTENT_ARGS, \ COLLAPSIBLE, BLOCK_LAYER, CONTENT_BLOCKS, HIT_HIDE, \ - HIT_SHOW + HIT_SHOW, XO1, XO15, UNKNOWN from StringIO import StringIO import os.path from gettext import gettext as _ @@ -785,10 +788,27 @@ def find_blk_below(blk, name): return None -def olpc_xo_1(): - """ Is the an OLPC XO-1 or XO-1.5? """ - return os.path.exists('/etc/olpc-release') or \ - os.path.exists('/sys/power/olpc-pm') +def get_hardware(): + """ Determine whether we are using XO 1.0, 1.5, or "unknown" hardware """ + bus = dbus.SystemBus() + + comp_obj = bus.get_object('org.freedesktop.Hal', + '/org/freedesktop/Hal/devices/computer') + dev = dbus.Interface(comp_obj, 'org.freedesktop.Hal.Device') + if dev.PropertyExists('system.hardware.vendor') and \ + dev.PropertyExists('system.hardware.version'): + if dev.GetProperty('system.hardware.vendor') == 'OLPC': + if dev.GetProperty('system.hardware.version') == '1.5': + return XO15 + else: + return XO1 + else: + return UNKNOWN + elif path.exists('/etc/olpc-release') or \ + path.exists('/sys/power/olpc-pm'): + return XO1 + else: + return UNKNOWN def walk_stack(tw, blk): diff --git a/TurtleArt/tawindow.py b/TurtleArt/tawindow.py index b550a50..5b3098c 100644 --- a/TurtleArt/tawindow.py +++ b/TurtleArt/tawindow.py @@ -54,7 +54,7 @@ from taconstants import HORIZONTAL_PALETTE, VERTICAL_PALETTE, BLOCK_SCALE, \ TURTLE_LAYER, EXPANDABLE_BLOCKS, COMPARE_STYLE, \ BOOLEAN_STYLE, EXPANDABLE_ARGS, NUMBER_STYLE, \ NUMBER_STYLE_PORCH, NUMBER_STYLE_BLOCK, \ - NUMBER_STYLE_VAR_ARG, CONSTANTS + NUMBER_STYLE_VAR_ARG, CONSTANTS, XO1, XO15, UNKNOWN from talogo import LogoCode, stop_logo from tacanvas import TurtleGraphics from tablock import Blocks, Block @@ -67,10 +67,13 @@ from tautils import magnitude, get_load_name, get_save_name, data_from_file, \ find_sandwich_bottom, restore_stack, collapse_stack, \ collapsed, collapsible, hide_button_hit, show_button_hit, \ arithmetic_check, xy, find_block_to_run, find_top_block, \ - find_start_stack, find_group, find_blk_below, olpc_xo_1, \ - dock_dx_dy, data_to_string, journal_check, chooser + find_start_stack, find_group, find_blk_below, \ + dock_dx_dy, data_to_string, journal_check, chooser, \ + get_hardware from tasprite_factory import SVG, svg_str_to_pixbuf, svg_from_file from sprites import Sprites, Sprite +from audiograb import AudioGrab_Unknown, AudioGrab_XO1, AudioGrab_XO15 + import logging _logger = logging.getLogger('turtleart-activity') @@ -133,7 +136,10 @@ class TurtleArtWindow(): self.decimal_point = '.' self.orientation = HORIZONTAL_PALETTE - if olpc_xo_1(): + + self.hw = get_hardware() + _logger.debug('running on %s hardware' % (self.hw)) + if self.hw in (XO1, XO15): self.lead = 1.0 self.scale = 0.67 self.color_mode = '565' @@ -228,11 +234,28 @@ class TurtleArtWindow(): self.lc = LogoCode(self) self.saved_pictures = [] + self.block_operation = '' + if self.interactive_mode: self._setup_misc() self._show_toolbar_palette(0, False) - self.block_operation = '' + # setup sound/sensor grab + if self.hw in [XO1, XO15]: + PALETTES[PALETTE_NAMES.index('sensor')].append('resistance') + PALETTES[PALETTE_NAMES.index('sensor')].append('voltage') + if self.hw == XO15: + self.audiograb = AudioGrab_XO15(self.new_buffer, self) + elif self.hw == XO1: + self.audiograb = AudioGrab_XO1(self.new_buffer, self) + else: + self.audiograb = AudioGrab_Unknown(self.new_buffer, self) + self.audio_started = False + + def new_buffer(self, buf): + """ Append a new buffer to the ringbuffer """ + self.lc.ringbuffer.append(buf) + return True def _setup_events(self): """ Register the events we listen to. """ @@ -309,11 +332,26 @@ class TurtleArtWindow(): self.lc.prim_clear() self.display_coordinates() + def _start_audiograb(self): + """ Start grabbing audio if there is an audio block in use """ + if len(self.block_list.get_similar_blocks('block', 'volume')) > 0 or \ + len(self.block_list.get_similar_blocks('block', 'pitch')) > 0 or \ + len(self.block_list.get_similar_blocks('block', 'resistance')) > 0 or \ + len(self.block_list.get_similar_blocks('block', 'voltage')) > 0: + if self.audio_started: + self.audiograb.resume_grabbing() + else: + self.audiograb.start_grabbing() + self.audio_started = True + def run_button(self, time): """ Run turtle! """ if self.running_sugar: self.activity.recenter() + if self.interactive_mode: + self._start_audiograb() + # Look for a 'start' block for blk in self.just_blocks(): if find_start_stack(blk): @@ -333,6 +371,8 @@ class TurtleArtWindow(): def stop_button(self): """ Stop button """ stop_logo(self) + if self.audio_started: + self.audiograb.pause_grabbing() def set_userdefined(self): """ Change icon for user-defined blocks after loading Python code. """ @@ -1471,6 +1511,7 @@ class TurtleArtWindow(): dy = 20 blk.expand_in_y(dy) else: + self._start_audiograb() self._run_stack(blk) return @@ -1533,6 +1574,7 @@ class TurtleArtWindow(): elif blk.name in PYTHON_SKIN and self.myblock is None: self._import_py() else: + self._start_audiograb() self._run_stack(blk) elif blk.name in COLLAPSIBLE: top = find_sandwich_top(blk) @@ -1541,6 +1583,7 @@ class TurtleArtWindow(): elif top is not None: collapse_stack(top) else: + self._start_audiograb() self._run_stack(blk) def _expand_boolean(self, blk, blk2, dy): @@ -1826,6 +1869,8 @@ class TurtleArtWindow(): if keyname == "p": self.hideshow_button() elif keyname == 'q': + if self.audio_started: + self.audiograb.stop_grabbing() exit() elif self.selected_blk is not None: @@ -2363,8 +2408,8 @@ class TurtleArtWindow(): def display_coordinates(self): """ Display the coordinates of the current turtle on the toolbar """ - x = round_int(self.canvas.xcor / self.coord_scale) - y = round_int(self.canvas.ycor / self.coord_scale) + x = round_int(float(self.canvas.xcor) / self.coord_scale) + y = round_int(float(self.canvas.ycor) / self.coord_scale) h = round_int(self.canvas.heading) if self.running_sugar: self.activity.coordinates_label.set_text("%s: %d %s: %d %s: %d" % \ -- cgit v0.9.1