Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/plugins/audio_sensors
diff options
context:
space:
mode:
authorWalter Bender <walter@sugarlabs.org>2013-05-25 22:12:48 (GMT)
committer Walter Bender <walter@sugarlabs.org>2013-05-25 22:12:48 (GMT)
commit29851fcbe3b4d392efd5a873f92666da27ec0e99 (patch)
tree27ce347b8465c907836a116d8752ddb7fab48c69 /plugins/audio_sensors
parented303715f77f79e697e33b3d0df082843ffc1492 (diff)
patches to create confusion
Diffstat (limited to 'plugins/audio_sensors')
-rw-r--r--plugins/audio_sensors/audio_sensors.py468
-rw-r--r--plugins/audio_sensors/audiograb.py673
-rw-r--r--plugins/audio_sensors/icons/sensoroff.svg79
-rw-r--r--plugins/audio_sensors/icons/sensoron.svg63
-rw-r--r--plugins/audio_sensors/ringbuffer.py108
5 files changed, 0 insertions, 1391 deletions
diff --git a/plugins/audio_sensors/audio_sensors.py b/plugins/audio_sensors/audio_sensors.py
deleted file mode 100644
index 8d45395..0000000
--- a/plugins/audio_sensors/audio_sensors.py
+++ /dev/null
@@ -1,468 +0,0 @@
-#!/usr/bin/env python
-#Copyright (c) 2011, 2012 Walter Bender
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-
-from gettext import gettext as _
-
-try:
- from numpy import append
- from numpy.fft import rfft
- PITCH_AVAILABLE = True
-except:
- PITCH_AVAILABLE = False
-
-from plugins.plugin import Plugin
-
-from plugins.audio_sensors.audiograb import (AudioGrab,
- SENSOR_DC_NO_BIAS, SENSOR_DC_BIAS, SENSOR_AC_BIAS)
-
-from plugins.audio_sensors.ringbuffer import RingBuffer1d
-
-from TurtleArt.tapalette import make_palette
-from TurtleArt.taconstants import XO1, XO15, XO175, XO30, XO4
-from TurtleArt.talogo import primitive_dictionary
-from TurtleArt.tautils import debug_output
-
-import logging
-_logger = logging.getLogger('turtleart-activity audio sensors plugin')
-
-
-def _avg(array, abs_value=False):
- ''' Calc. the average value of an array '''
- if len(array) == 0:
- return 0
- array_sum = 0
- if abs_value:
- for a in array:
- array_sum += abs(a)
- else:
- for a in array:
- array_sum += a
- return float(array_sum) / len(array)
-
-
-class Audio_sensors(Plugin):
-
- def __init__(self, parent):
- self._parent = parent
- self._status = True # TODO: test for audio device
- # These flags are referenced by audiograb
- self.hw = self._parent.hw
- self.running_sugar = self._parent.running_sugar
-
- def setup(self):
- ''' set up audio-sensor-specific blocks '''
- self.max_samples = 1500
- self.input_step = 1
-
- self.ringbuffer = []
-
- palette = make_palette('sensor',
- colors=["#FF6060", "#A06060"],
- help_string=_('Palette of sensor blocks'),
- position=6)
-
- primitive_dictionary['sound'] = self.prim_sound
- primitive_dictionary['volume'] = self.prim_volume
- if self._status:
- palette.add_block('sound',
- style='box-style',
- label=_('sound'),
- help_string=_('raw microphone input signal'),
- value_block=True,
- prim_name='sound')
-
- palette.add_block('volume',
- style='box-style',
- label=_('loudness'),
- help_string=_('microphone input volume'),
- value_block=True,
- prim_name='volume')
- else:
- palette.add_block('sound',
- hidden=True,
- style='box-style',
- label=_('sound'),
- help_string=_('raw microphone input signal'),
- value_block=True,
- prim_name='sound')
- palette.add_block('volume',
- hidden=True,
- style='box-style',
- label=_('loudness'),
- help_string=_('microphone input volume'),
- value_block=True,
- prim_name='volume')
-
- self._parent.lc.def_prim(
- 'sound', 0, lambda self: primitive_dictionary['sound'](0))
- self._parent.lc.def_prim(
- 'volume', 0, lambda self: primitive_dictionary['volume'](0))
-
- primitive_dictionary['pitch'] = self.prim_pitch
- if PITCH_AVAILABLE and self._status:
- palette.add_block('pitch',
- style='box-style',
- label=_('pitch'),
- help_string=_('microphone input pitch'),
- value_block=True,
- prim_name='pitch')
- else:
- palette.add_block('pitch',
- hidden=True,
- style='box-style',
- label=_('pitch'),
- help_string=_('microphone input pitch'),
- value_block=True,
- prim_name='pitch')
- self._parent.lc.def_prim('pitch', 0,
- lambda self: primitive_dictionary['pitch'](0))
-
- primitive_dictionary['resistance'] = self.prim_resistance
- primitive_dictionary['voltage'] = self.prim_voltage
- if self.hw in [XO1, XO15, XO175, XO4, XO30] and self._status:
- if self.hw == XO1:
- self.voltage_gain = 0.000022
- self.voltage_bias = 1.14
- elif self.hw == XO15:
- self.voltage_gain = -0.00015
- self.voltage_bias = 1.70
- elif self.hw in [XO175, XO4]: # recalibrate in light of #3675?
- self.voltage_gain = 0.000071
- self.voltage_bias = 0.55
- else: # XO 3.0
- self.voltage_gain = 0.000077
- self.voltage_bias = 0.72
- palette.add_block('resistance',
- style='box-style',
- label=_('resistance'),
- help_string=_('microphone input resistance'),
- value_block=True,
- prim_name='resistance')
- palette.add_block('voltage',
- style='box-style',
- label=_('voltage'),
- help_string=_('microphone input voltage'),
- value_block=True,
- prim_name='voltage')
- else:
- palette.add_block('resistance',
- hidden=True,
- style='box-style',
- label=_('resistance'),
- help_string=_('microphone input resistance'),
- prim_name='resistance')
- palette.add_block('voltage',
- hidden=True,
- style='box-style',
- label=_('voltage'),
- help_string=_('microphone input voltage'),
- prim_name='voltage')
-
- # FIXME: Only add stereo capture for XO15 (broken on ARM #3675)
- if self.hw in [XO15] and self._status:
- palette.add_block('resistance2',
- style='box-style',
- label=_('resistance') + '2',
- help_string=_('microphone input resistance'),
- value_block=True,
- prim_name='resistance2')
- palette.add_block('voltage2',
- style='box-style',
- label=_('voltage') + '2',
- help_string=_('microphone input voltage'),
- value_block=True,
- prim_name='voltage2')
- else:
- palette.add_block('resistance2',
- hidden=True,
- style='box-style',
- label=_('resistance') + '2',
- help_string=_('microphone input resistance'),
- prim_name='resistance2')
- palette.add_block('voltage2',
- hidden=True,
- style='box-style',
- label=_('voltage') + '2',
- help_string=_('microphone input voltage'),
- prim_name='voltage2')
- self._parent.lc.def_prim(
- 'resistance', 0,
- lambda self: primitive_dictionary['resistance'](0))
- self._parent.lc.def_prim(
- 'voltage', 0, lambda self: primitive_dictionary['voltage'](0))
- self._parent.lc.def_prim(
- 'resistance2', 0,
- lambda self: primitive_dictionary['resistance'](1))
- self._parent.lc.def_prim(
- 'voltage2', 0, lambda self: primitive_dictionary['voltage'](1))
-
- self.audio_started = False
- if self.hw in [XO175, XO30, XO4]:
- self.PARAMETERS = {
- SENSOR_AC_BIAS: (False, True, 80, True),
- SENSOR_DC_NO_BIAS: (True, False, 80, False),
- SENSOR_DC_BIAS: (True, True, 90, False)
- }
- elif self.hw == XO15:
- self.PARAMETERS = {
- SENSOR_AC_BIAS: (False, True, 80, True),
- SENSOR_DC_NO_BIAS: (True, False, 80, False),
- SENSOR_DC_BIAS: (True, True, 90, False)
- }
- elif self.hw == XO1:
- self.PARAMETERS = {
- SENSOR_AC_BIAS: (False, True, 40, True),
- SENSOR_DC_NO_BIAS: (True, False, 0, False),
- SENSOR_DC_BIAS: (True, True, 0, False)
- }
- else:
- self.PARAMETERS = {
- SENSOR_AC_BIAS: (None, True, 40, True),
- SENSOR_DC_NO_BIAS: (True, False, 80, False),
- SENSOR_DC_BIAS: (True, True, 90, False)
- }
-
- def start(self):
- ''' Start grabbing audio if there is an audio block in use '''
- if not self._status:
- return
- if self.audio_started:
- self.audiograb.stop_grabbing()
- if len(self._parent.block_list.get_similar_blocks(
- 'block', ['volume', 'sound', 'pitch'])) > 0:
- mode, bias, gain, boost = self.PARAMETERS[SENSOR_AC_BIAS]
- elif len(self._parent.block_list.get_similar_blocks(
- 'block', ['resistance', 'resistance2'])) > 0:
- mode, bias, gain, boost = self.PARAMETERS[SENSOR_DC_BIAS]
- elif len(self._parent.block_list.get_similar_blocks(
- 'block', ['voltage', 'voltage2'])) > 0:
- mode, bias, gain, boost = self.PARAMETERS[SENSOR_DC_NO_BIAS]
- else:
- return # No audio blocks in use.
- self.audiograb = AudioGrab(self.new_buffer, self,
- mode, bias, gain, boost)
- self._channels = self.audiograb.channels
- for i in range(self._channels):
- self.ringbuffer.append(RingBuffer1d(self.max_samples,
- dtype='int16'))
- self.audiograb.start_grabbing()
- self.audio_started = True
-
- def new_buffer(self, buf, channel=0):
- ''' Append a new buffer to the ringbuffer '''
- self.ringbuffer[channel].append(buf)
- return True
-
- def stop(self):
- ''' This gets called by the stop button '''
- if self._status and self.audio_started:
- self.audiograb.on_activity_quit() # reset all setting
- self.audio_started = False
-
- def goto_background(self):
- ''' This gets called when your process is sent to the background '''
- pass
-
- def return_to_foreground(self):
- ''' This gets called when your process returns from the background '''
- pass
-
- def quit(self):
- ''' This gets called by the quit button '''
- if self._status and self.audio_started:
- self.audiograb.on_activity_quit()
-
- def _status_report(self):
- debug_output(
- 'Reporting audio sensor status: %s' % (str(self._status)),
- self._parent.running_sugar)
- return self._status
-
- # Block primitives used in talogo
-
- def prim_volume(self, channel):
- if not self._status:
- return 0
- # Return average of both channels if sampling in stereo
- if self._channels == 2:
- chan0 = self._prim_volume(0)
- chan1 = self._prim_volume(1)
- return (chan0 + chan1) / 2
- else:
- return self._prim_volume(0)
-
- def _prim_volume(self, channel):
- ''' return mic in value '''
- buf = self.ringbuffer[channel].read(None, self.input_step)
- if len(buf) > 0:
- volume = float(_avg(buf, abs_value=True))
- self._parent.lc.update_label_value('volume', volume)
- return volume
- else:
- return 0
-
- def prim_sound(self, channel):
- if not self._status:
- return 0
- # Return average of both channels if sampling in stereo
- if self._channels == 2:
- chan0 = self._prim_sound(0)
- chan1 = self._prim_sound(1)
- return (chan0 + chan1) / 2
- else:
- return self._prim_sound(0)
-
- def _prim_sound(self, channel):
- ''' return raw mic in value '''
- buf = self.ringbuffer[channel].read(None, self.input_step)
- if len(buf) > 0:
- sound = float(buf[0])
- if self._parent.lc.update_values:
- self._parent.lc.update_label_value('sound', sound)
- return sound
- else:
- return 0
-
- def prim_pitch(self, channel):
- if not PITCH_AVAILABLE or not self._status:
- return 0
- # Return average of both channels if sampling in stereo
- if self._channels == 2:
- chan0 = self._prim_pitch(0)
- chan1 = self._prim_pitch(1)
- return (chan0 + chan1) / 2
- else:
- return self._prim_pitch(0)
-
- def _prim_pitch(self, channel):
- ''' return index of max value in fft of mic in values '''
- buf = self.ringbuffer[channel].read(None, self.input_step)
- if len(buf) > 0:
- buf = rfft(buf)
- buf = abs(buf)
- maxi = buf.argmax()
- if maxi == 0:
- pitch = 0
- else: # Simple interpolation
- a, b, c = buf[maxi - 1], buf[maxi], buf[maxi + 1]
- maxi -= a / float(a + b + c)
- maxi += c / float(a + b + c)
- pitch = maxi * 48000 / (len(buf) * 2)
-
- if self._parent.lc.update_values:
- self._parent.lc.update_label_value('pitch', pitch)
- return pitch
- else:
- return 0
-
- def prim_resistance(self, channel):
- if not self.hw in [XO1, XO15, XO175, XO30, XO4] or not self._status:
- return 0
- if self.hw in [XO1, XO4]:
- resistance = self._prim_resistance(0)
- if self._parent.lc.update_values:
- self._update_resistance_labels(0, resistance)
- return resistance
- elif self.hw == XO15:
- resistance = self._prim_resistance(channel)
- if self._parent.lc.update_values:
- self._update_resistance_labels(channel, resistance)
- return resistance
- # FIXME: For XO175: channel assignment is seemingly random
- # (#3675), so sum both channels (one of them will be 0)
- else:
- chan0 = self._prim_resistance(0)
- chan1 = self._prim_resistance(1)
- resistance = chan0 + chan1
- if self._parent.lc.update_values:
- self._update_resistance_labels(0, resistance)
- return resistance
-
- def _prim_resistance(self, channel):
- ''' return resistance sensor value '''
- buf = self.ringbuffer[channel].read(None, self.input_step)
- if len(buf) > 0:
- # See <http://bugs.sugarlabs.org/ticket/552#comment:7>
- # TODO: test this calibration on XO 1.5, XO 1.75
- avg_buf = float(_avg(buf))
- if self.hw == XO1:
- resistance = 2.718 ** ((avg_buf * 0.000045788) + 8.0531)
- elif self.hw == XO15:
- if avg_buf > 0:
- resistance = (420000000 / avg_buf) - 13500
- else:
- resistance = 420000000
- elif self.hw in [XO175, XO4]:
- if avg_buf < 30700:
- resistance = .12 * ((180000000 / (30700 - avg_buf)) - 3150)
- else:
- resistance = 999999999
- else: # XO 3.0
- if avg_buf < 30514:
- resistance = (46000000 / (30514 - avg_buf)) - 1150
- else:
- resistance = 999999999
- if resistance < 0:
- resistance = 0
- return resistance
- else:
- return 0
-
- def _update_resistance_labels(self, channel, resistance):
- if channel == 0:
- self._parent.lc.update_label_value('resistance', resistance)
- else:
- self._parent.lc.update_label_value('resistance2', resistance)
-
- def prim_voltage(self, channel):
- if not self.hw in [XO1, XO15, XO175, XO30, XO4] or not self._status:
- return 0
- if self.hw in [XO1, XO4]:
- voltage = self._prim_voltage(0)
- if self._parent.lc.update_values:
- self._update_voltage_labels(0, voltage)
- return voltage
- elif self.hw == XO15:
- voltage = self._prim_voltage(channel)
- if self._parent.lc.update_values:
- self._update_voltage_labels(channel, voltage)
- return voltage
- # FIXME: For XO175: channel assignment is seemingly random
- # (#3675), so sum both channels (one of them will be 0)
- else:
- chan0 = self._prim_voltage(0)
- chan1 = self._prim_voltage(1)
- voltage = chan0 + chan1
- if self._parent.lc.update_values:
- self._update_voltage_labels(0, voltage)
- return voltage
-
- def _prim_voltage(self, channel):
- ''' return voltage sensor value '''
- buf = self.ringbuffer[channel].read(None, self.input_step)
- if len(buf) > 0:
- # See <http://bugs.sugarlabs.org/ticket/552#comment:7>
- voltage = float(_avg(buf)) * self.voltage_gain + self.voltage_bias
- return voltage
- else:
- return 0
-
- def _update_voltage_labels(self, channel, voltage):
- if channel == 0:
- self._parent.lc.update_label_value('voltage', voltage)
- else:
- self._parent.lc.update_label_value('voltage2', voltage)
diff --git a/plugins/audio_sensors/audiograb.py b/plugins/audio_sensors/audiograb.py
deleted file mode 100644
index 228e4c2..0000000
--- a/plugins/audio_sensors/audiograb.py
+++ /dev/null
@@ -1,673 +0,0 @@
-#! /usr/bin/python
-#
-# Author: Arjun Sarwal arjun@laptop.org
-# Copyright (C) 2007, Arjun Sarwal
-# Copyright (C) 2009-12 Walter Bender
-# Copyright (C) 2009, Benjamin Berg, Sebastian Berg
-# Copyright (C) 2009, Sayamindu Dasgupta
-# Copyright (C) 2010, Sascha Silbe
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# You should have received a copy of the GNU General Public License
-# along with this library; if not, write to the Free Software
-# Foundation, 51 Franklin Street, Suite 500 Boston, MA 02110-1335 USA
-
-import pygst
-import gst
-import gst.interfaces
-from numpy import fromstring
-import subprocess
-import traceback
-from string import find
-from threading import Timer
-
-from TurtleArt.taconstants import XO1, XO4
-from TurtleArt.tautils import debug_output
-
-# Initial device settings
-RATE = 48000
-MIC_BOOST = True
-DC_MODE_ENABLE = False
-CAPTURE_GAIN = 50
-BIAS = True
-
-# Setting on quit
-QUIT_MIC_BOOST = False
-QUIT_DC_MODE_ENABLE = False
-QUIT_CAPTURE_GAIN = 100
-QUIT_BIAS = True
-
-# Capture modes
-SENSOR_AC_NO_BIAS = 'external'
-SENSOR_AC_BIAS = 'sound'
-SENSOR_DC_NO_BIAS = 'voltage'
-SENSOR_DC_BIAS = 'resistance'
-
-
-class AudioGrab():
- """ The interface between measure and the audio device """
-
- def __init__(self, callable1, parent,
- mode=None, bias=None, gain=None, boost=None):
- """ Initialize the class: callable1 is a data buffer;
- parent is the parent class"""
-
- self.callable1 = callable1
- self.parent = parent
- self.sensor = None
-
- self.temp_buffer = [0]
-
- self.rate = RATE
- # Force XO1 and XO4 to use just 1 channel
- if self.parent.hw in [XO1, XO4]:
- self.channels = 1
- else:
- self.channels = None
-
- self._dc_control = None
- self._mic_bias_control = None
- self._capture_control = None
- self._mic_boost_control = None
- self._labels_available = True # Query controls for device names
-
- self._query_mixer()
- # If Channels was not found in the Capture controller, guess.
- if self.channels is None:
- debug_output('Guessing there are 2 channels',
- self.parent.running_sugar)
- self.channels = 2
-
- # Set mixer to known state
- self.set_dc_mode(DC_MODE_ENABLE)
- self.set_bias(BIAS)
- self.set_capture_gain(CAPTURE_GAIN)
- self.set_mic_boost(MIC_BOOST)
-
- self.master = self.get_master()
- self.dc_mode = self.get_dc_mode()
- self.bias = self.get_bias()
- self.capture_gain = self.get_capture_gain()
- self.mic_boost = self.get_mic_boost()
-
- # Set mixer to desired state
- self._set_sensor_type(mode, bias, gain, boost)
- self.dc_mode = self.get_dc_mode()
- self.bias = self.get_bias()
- self.capture_gain = self.get_capture_gain()
- self.mic_boost = self.get_mic_boost()
-
- # Set up gstreamer pipeline
- self._pad_count = 0
- self.pads = []
- self.queue = []
- self.fakesink = []
- self.pipeline = gst.Pipeline('pipeline')
- self.alsasrc = gst.element_factory_make('alsasrc', 'alsa-source')
- self.pipeline.add(self.alsasrc)
- self.caps1 = gst.element_factory_make('capsfilter', 'caps1')
- self.pipeline.add(self.caps1)
- caps_str = 'audio/x-raw-int,rate=%d,channels=%d,depth=16' % (
- RATE, self.channels)
- self.caps1.set_property('caps', gst.caps_from_string(caps_str))
- if self.channels == 1:
- self.fakesink.append(gst.element_factory_make('fakesink', 'fsink'))
- self.pipeline.add(self.fakesink[0])
- self.fakesink[0].connect('handoff', self.on_buffer, 0)
- self.fakesink[0].set_property('signal-handoffs', True)
- gst.element_link_many(self.alsasrc, self.caps1, self.fakesink[0])
- else:
- if not hasattr(self, 'splitter'):
- self.splitter = gst.element_factory_make('deinterleave')
- self.pipeline.add(self.splitter)
- self.splitter.set_properties('keep-positions=true', 'name=d')
- self.splitter.connect('pad-added', self._splitter_pad_added)
- gst.element_link_many(self.alsasrc, self.caps1, self.splitter)
- for i in range(self.channels):
- self.queue.append(gst.element_factory_make('queue'))
- self.pipeline.add(self.queue[i])
- self.fakesink.append(gst.element_factory_make('fakesink'))
- self.pipeline.add(self.fakesink[i])
- self.fakesink[i].connect('handoff', self.on_buffer, i)
- self.fakesink[i].set_property('signal-handoffs', True)
-
- self.dont_queue_the_buffer = False
-
- # Timer for interval sampling and switch to indicate when to capture
- self.capture_timer = None
- self.capture_interval_sample = False
-
- def _query_mixer(self):
- self._mixer = gst.element_factory_make('alsamixer')
- rc = self._mixer.set_state(gst.STATE_PAUSED)
- assert rc == gst.STATE_CHANGE_SUCCESS
-
- # Query the available controls
- tracks_list = self._mixer.list_tracks()
- if hasattr(tracks_list[0].props, 'untranslated_label'):
- self._capture_control = self._find_control(['capture', 'axi'])
- self._dc_control = self._find_control(['dc mode'])
- self._mic_bias_control = self._find_control(['mic bias',
- 'dc input bias',
- 'v_refout'])
- self._mic_boost_control = self._find_control(['mic boost',
- 'mic1 boost',
- 'mic boost (+20db)',
- 'internal mic boost',
- 'analog mic boost'])
- self._mic_gain_control = self._find_control(['mic'])
- self._master_control = self._find_control(['master'])
- else: # Use hardwired values
- self._labels_available = False
-
- def _unlink_sink_queues(self):
- ''' Build the sink pipelines '''
-
- # If there were existing pipelines, unlink them
- for i in range(self._pad_count):
- try:
- self.splitter.unlink(self.queue[i])
- self.queue[i].unlink(self.fakesink[i])
- except:
- traceback.print_exc()
-
- # Build the new pipelines
- self._pad_count = 0
- self.pads = []
-
- def _splitter_pad_added(self, element, pad):
- ''' Seems to be the case that ring is right channel 0,
- tip is left channel 1'''
- '''
- debug_output('splitter pad %d added' % (self._pad_count),
- self.parent.running_sugar)
- '''
- self.pads.append(pad)
- if self._pad_count < self.channels:
- pad.link(self.queue[self._pad_count].get_pad('sink'))
- self.queue[self._pad_count].get_pad('src').link(
- self.fakesink[self._pad_count].get_pad('sink'))
- self._pad_count += 1
- else:
- debug_output('ignoring channels > %d' % (self.channels),
- self.parent.running_sugar)
-
- def set_handoff_signal(self, handoff_state):
- '''Sets whether the handoff signal would generate an interrupt
- or not'''
- for i in range(len(self.fakesink)):
- self.fakesink[i].set_property('signal-handoffs', handoff_state)
-
- def _new_buffer(self, buf, channel):
- ''' Use a new buffer '''
- if not self.dont_queue_the_buffer:
- self.temp_buffer = buf
- self.callable1(buf, channel=channel)
- else:
- pass
-
- def on_buffer(self, element, data_buffer, pad, channel):
- '''The function that is called whenever new data is available
- This is the signal handler for the handoff signal'''
- temp_buffer = fromstring(data_buffer, 'int16')
- if not self.dont_queue_the_buffer:
- self._new_buffer(temp_buffer, channel=channel)
- return False
-
- def start_sound_device(self):
- '''Start or Restart grabbing data from the audio capture'''
- gst.event_new_flush_start()
- self.pipeline.set_state(gst.STATE_PLAYING)
-
- def stop_sound_device(self):
- '''Stop grabbing data from capture device'''
- gst.event_new_flush_stop()
- self.pipeline.set_state(gst.STATE_NULL)
-
- def sample_now(self):
- ''' Log the current sample now. This method is called from the
- capture_timer object when the interval expires. '''
- self.capture_interval_sample = True
- self.make_timer()
-
- def set_buffer_interval_logging(self, interval=0):
- '''Sets the number of buffers after which a buffer needs to be
- emitted'''
- self.buffer_interval_logging = interval
-
- def set_sampling_rate(self, sr):
- '''Sets the sampling rate of the capture device
- Sampling rate must be given as an integer for example 16000 for
- setting 16Khz sampling rate
- The sampling rate would be set in the device to the nearest available'''
- self.pause_grabbing()
- caps_str = 'audio/x-raw-int,rate=%d,channels=%d,depth=16' % (
- sr, self.channels)
- self.caps1.set_property('caps', gst.caps_from_string(caps_str))
- self.resume_grabbing()
-
- def get_sampling_rate(self):
- '''Gets the sampling rate of the capture device'''
- return int(self.caps1.get_property('caps')[0]['rate'])
-
- def set_callable1(self, callable1):
- '''Sets the callable to the drawing function for giving the
- data at the end of idle-add'''
- self.callable1 = callable1
-
- def start_grabbing(self):
- '''Called right at the start of the Activity'''
- self.start_sound_device()
- self.set_handoff_signal(True)
-
- def pause_grabbing(self):
- '''When Activity goes into background'''
- self.save_state()
- self.stop_sound_device()
-
- def resume_grabbing(self):
- '''When Activity becomes active after going to background'''
- self.start_sound_device()
- self.resume_state()
- self.set_handoff_signal(True)
-
- def stop_grabbing(self):
- '''Not used ???'''
- self.stop_sound_device()
- self.set_handoff_signal(False)
-
- def _find_control(self, prefixes):
- '''Try to find a mixer control matching one of the prefixes.
-
- The control with the best match (smallest difference in length
- between label and prefix) will be returned. If no match is found,
- None is returned.
- '''
- def best_prefix(label, prefixes):
- matches =\
- [len(label) - len(p) for p in prefixes if label.startswith(p)]
- if not matches:
- return None
-
- matches.sort()
- return matches[0]
-
- controls = []
- for track in self._mixer.list_tracks():
- label = track.props.untranslated_label.lower()
- diff = best_prefix(label, prefixes)
- if diff is not None:
- controls.append((track, diff))
-
- controls.sort(key=lambda e: e[1])
- if controls:
- '''
- debug_output('Found control: %s' %\
- (str(controls[0][0].props.untranslated_label)),
- self.parent.running_sugar)
- '''
- if self.channels is None:
- if hasattr(controls[0][0], 'num_channels'):
- channels = controls[0][0].num_channels
- if channels > 0:
- self.channels = channels
- '''
- debug_output('setting channels to %d' % (self.channels),
- self.parent.running_sugar)
- '''
-
- return controls[0][0]
-
- return None
-
- def save_state(self):
- '''Saves the state of all audio controls'''
- self.master = self.get_master()
- self.bias = self.get_bias()
- self.dc_mode = self.get_dc_mode()
- self.capture_gain = self.get_capture_gain()
- self.mic_boost = self.get_mic_boost()
-
- def resume_state(self):
- '''Put back all audio control settings from the saved state'''
- self.set_master(self.master)
- self.set_bias(self.bias)
- self.set_dc_mode(self.dc_mode)
- self.set_capture_gain(self.capture_gain)
- self.set_mic_boost(self.mic_boost)
-
- def _get_mute(self, control, name, default):
- '''Get mute status of a control'''
- if not control:
- return default
- return bool(control.flags & gst.interfaces.MIXER_TRACK_MUTE)
-
- def _set_mute(self, control, name, value):
- '''Mute a control'''
- if not control:
- return
- self._mixer.set_mute(control, value)
-
- def _get_volume(self, control, name):
- '''Get volume of a control and convert to a scale of 0-100'''
- if not control:
- return 100
- volume = self._mixer.get_volume(control)
- if type(volume) == tuple:
- hw_volume = volume[0]
- else:
- hw_volume = volume
- min_vol = control.min_volume
- max_vol = control.max_volume
- if max_vol == min_vol:
- percent = 100
- else:
- percent = (hw_volume - min_vol) * 100 // (max_vol - min_vol)
- return percent
-
- def _set_volume(self, control, name, value):
- '''Sets the level of a control on a scale of 0-100'''
- if not control:
- return
- # convert value to scale of control
- min_vol = control.min_volume
- max_vol = control.max_volume
- if min_vol != max_vol:
- hw_volume = value * (max_vol - min_vol) // 100 + min_vol
- self._mixer.set_volume(control,
- (hw_volume,) * control.num_channels)
-
- def amixer_set(self, control, state):
- ''' Direct call to amixer for old systems. '''
- if state:
- output = check_output(
- ['amixer', 'set', "%s" % (control), 'unmute'],
- 'Problem with amixer set "%s" unmute' % (control),
- self.parent.running_sugar)
- else:
- output = check_output(
- ['amixer', 'set', "%s" % (control), 'mute'],
- 'Problem with amixer set "%s" mute' % (control),
- self.parent.running_sugar)
-
- def mute_master(self):
- '''Mutes the Master Control'''
- if self._labels_available and self.parent.hw != XO1:
- self._set_mute(self._master_control, 'Master', True)
- else:
- self.amixer_set('Master', False)
-
- def unmute_master(self):
- '''Unmutes the Master Control'''
- if self._labels_available and self.parent.hw != XO1:
- self._set_mute(self._master_control, 'Master', True)
- else:
- self.amixer_set('Master', True)
-
- def set_master(self, master_val):
- '''Sets the Master gain slider settings
- master_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set'''
- if self._labels_available:
- self._set_volume(self._master_control, 'Master', master_val)
- else:
- output = check_output(
- ['amixer', 'set', 'Master', "%d%s" % (master_val, '%')],
- 'Problem with amixer set Master',
- self.parent.running_sugar)
-
- def get_master(self):
- '''Gets the MIC gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%'''
- if self._labels_available:
- return self._get_volume(self._master_control, 'master')
- else:
- output = check_output(['amixer', 'get', 'Master'],
- 'amixer: Could not get Master volume',
- self.parent.running_sugar)
- if output is None:
- return 100
- else:
- output = output[find(output, 'Front Left:'):]
- output = output[find(output, '[') + 1:]
- output = output[:find(output, '%]')]
- return int(output)
-
- def set_bias(self, bias_state=False):
- '''Enables / disables bias voltage.'''
- if self._labels_available and self.parent.hw != XO1:
- if self._mic_bias_control is None:
- return
- # If there is a flag property, use set_mute
- if self._mic_bias_control not in self._mixer.list_tracks() or \
- hasattr(self._mic_bias_control.props, 'flags'):
- self._set_mute(
- self._mic_bias_control, 'Mic Bias', not bias_state)
- # We assume that values are sorted from lowest (=off) to highest.
- # Since they are mixed strings ('Off', '50%', etc.), we cannot
- # easily ensure this by sorting with the default sort order.
- elif bias_state: # Otherwise, set with volume
- self._mixer.set_volume(self._mic_bias_control,
- self._mic_bias_control.max_volume)
- else:
- self._mixer.set_volume(self._mic_bias_control,
- self._mic_bias_control.min_volume)
- elif not self._labels_available:
- self.amixer_set('V_REFOUT Enable', bias_state)
- else:
- self.amixer_set('MIC Bias Enable', bias_state)
-
- def get_bias(self):
- '''Check whether bias voltage is enabled.'''
- if self._labels_available:
- if self._mic_bias_control is None:
- return False
- if self._mic_bias_control not in self._mixer.list_tracks() or \
- hasattr(self._mic_bias_control.props, 'flags'):
- return not self._get_mute(
- self._mic_bias_control, 'Mic Bias', False)
- value = self._mixer.get_volume(self._mic_bias_control)
- if value == self._mic_bias_control.min_volume:
- return False
- return True
- else:
- output = check_output(['amixer', 'get', "V_REFOUT Enable"],
- 'amixer: Could not get mic bias voltage',
- self.parent.running_sugar)
- if output is None:
- return False
- else:
- output = output[find(output, 'Mono:'):]
- output = output[find(output, '[') + 1:]
- output = output[:find(output, ']')]
- if output == 'on':
- return True
- return False
-
- def set_dc_mode(self, dc_mode=False):
- '''Sets the DC Mode Enable control
- pass False to mute and True to unmute'''
- if self._labels_available and self.parent.hw != XO1:
- if self._dc_control is not None:
- self._set_mute(self._dc_control, 'DC mode', not dc_mode)
- else:
- self.amixer_set('DC Mode Enable', dc_mode)
-
- def get_dc_mode(self):
- '''Returns the setting of DC Mode Enable control
- i.e. True: Unmuted and False: Muted'''
- if self._labels_available and self.parent.hw != XO1:
- if self._dc_control is not None:
- return not self._get_mute(self._dc_control, 'DC mode', False)
- else:
- return False
- else:
- output = check_output(['amixer', 'get', "DC Mode Enable"],
- 'amixer: Could not get DC Mode',
- self.parent.running_sugar)
- if output is None:
- return False
- else:
- output = output[find(output, 'Mono:'):]
- output = output[find(output, '[') + 1:]
- output = output[:find(output, ']')]
- if output == 'on':
- return True
- return False
-
- def set_mic_boost(self, mic_boost=False):
- '''Set Mic Boost.
- True = +20dB, False = 0dB'''
- if self._labels_available:
- if self._mic_boost_control is None:
- return
- # If there is a volume, use set volume
- if hasattr(self._mic_boost_control, 'min_volume'):
- if mic_boost:
- self._set_volume(self._mic_boost_control, 'boost', 100)
- else:
- self._set_volume(self._mic_boost_control, 'boost', 0)
- # Else if there is a flag property, use set_mute
- elif self._mic_boost_control not in self._mixer.list_tracks() or \
- hasattr(self._mic_boost_control.props, 'flags'):
- self._set_mute(
- self._mic_boost_control, 'Mic Boost', not mic_boost)
- else:
- self.amixer_set('Mic Boost (+20dB)', mic_boost)
-
- def get_mic_boost(self):
- '''Return Mic Boost setting.
- True = +20dB, False = 0dB'''
- if self._labels_available:
- if self._mic_boost_control is None:
- return False
- if self._mic_boost_control not in self._mixer.list_tracks() or \
- hasattr(self._mic_boost_control.props, 'flags'):
- return not self._get_mute(
- self._mic_boost_control, 'Mic Boost', False)
- else: # Compare to min value
- value = self._mixer.get_volume(self._mic_boost_control)
- if value != self._mic_boost_control.min_volume:
- return True
- return False
- else:
- output = check_output(['amixer', 'get', "Mic Boost (+20dB)"],
- 'amixer: Could not get mic boost',
- self.parent.running_sugar)
- if output is None:
- return False
- else:
- output = output[find(output, 'Mono:'):]
- output = output[find(output, '[') + 1:]
- output = output[:find(output, ']')]
- if output == 'on':
- return True
- return False
-
- def set_capture_gain(self, capture_val):
- '''Sets the Capture gain slider settings
- capture_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set'''
- if self._labels_available and self.parent.hw != XO1:
- if self._capture_control is not None:
- self._set_volume(self._capture_control, 'Capture', capture_val)
- else:
- output = check_output(
- ['amixer', 'set', 'Capture', "%d%s" % (capture_val, '%')],
- 'Problem with amixer set Capture',
- self.parent.running_sugar)
-
- def get_capture_gain(self):
- '''Gets the Capture gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%'''
- if self._labels_available:
- if self._capture_control is not None:
- return self._get_volume(self._capture_control, 'Capture')
- else:
- return 0
- else:
- output = check_output(['amixer', 'get', 'Capture'],
- 'amixer: Could not get Capture level',
- self.parent.running_sugar)
- if output is None:
- return 100
- else:
- output = output[find(output, 'Front Left:'):]
- output = output[find(output, '[') + 1:]
- output = output[:find(output, '%]')]
- return int(output)
-
- def set_mic_gain(self, mic_val):
- '''Sets the MIC gain slider settings
- mic_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set'''
- if self._labels_available and self.parent.hw != XO1:
- self._set_volume(self._mic_gain_control, 'Mic', mic_val)
- else:
- output = check_output(
- ['amixer', 'set', 'Mic', "%d%s" % (mic_val, '%')],
- 'Problem with amixer set Mic',
- self.parent.running_sugar)
-
- def get_mic_gain(self):
- '''Gets the MIC gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%'''
- if self._labels_available and self.parent.hw != XO1:
- return self._get_volume(self._mic_gain_control, 'Mic')
- else:
- output = check_output(['amixer', 'get', 'Mic'],
- 'amixer: Could not get mic gain level',
- self.parent.running_sugar)
- if output is None:
- return 100
- else:
- output = output[find(output, 'Mono:'):]
- output = output[find(output, '[') + 1:]
- output = output[:find(output, '%]')]
- return int(output)
-
- def _set_sensor_type(self, mode=None, bias=None, gain=None, boost=None):
- '''Helper to modify (some) of the sensor settings.'''
- if mode is not None:
- self.set_dc_mode(mode)
- if bias is not None:
- self.set_bias(bias)
- if gain is not None:
- self.set_capture_gain(gain)
- if boost is not None:
- self.set_mic_boost(boost)
- self.save_state()
-
- def on_activity_quit(self):
- '''When Activity quits'''
- self.set_mic_boost(QUIT_MIC_BOOST)
- self.set_dc_mode(QUIT_DC_MODE_ENABLE)
- self.set_capture_gain(QUIT_CAPTURE_GAIN)
- self.set_bias(QUIT_BIAS)
- self.stop_sound_device()
-
-
-def check_output(command, warning, running_sugar=True):
- ''' Workaround for old systems without subprocess.check_output'''
- if hasattr(subprocess, 'check_output'):
- try:
- output = subprocess.check_output(command)
- except subprocess.CalledProcessError:
- debug_output(warning, running_sugar)
- return None
- else:
- import commands
-
- cmd = ''
- for c in command:
- cmd += c
- cmd += ' '
- (status, output) = commands.getstatusoutput(cmd)
- if status != 0:
- debug_output(warning, running_sugar)
- return None
- return output
diff --git a/plugins/audio_sensors/icons/sensoroff.svg b/plugins/audio_sensors/icons/sensoroff.svg
deleted file mode 100644
index 0a16670..0000000
--- a/plugins/audio_sensors/icons/sensoroff.svg
+++ /dev/null
@@ -1,79 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!-- Created with Inkscape (http://www.inkscape.org/) -->
-
-<svg
- xmlns:dc="http://purl.org/dc/elements/1.1/"
- xmlns:cc="http://creativecommons.org/ns#"
- xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
- xmlns:svg="http://www.w3.org/2000/svg"
- xmlns="http://www.w3.org/2000/svg"
- version="1.1"
- width="55"
- height="55"
- viewBox="0 0 55 55"
- id="svg2"
- xml:space="preserve"><metadata
- id="metadata15"><rdf:RDF><cc:Work
- rdf:about=""><dc:format>image/svg+xml</dc:format><dc:type
- rdf:resource="http://purl.org/dc/dcmitype/StillImage" /><dc:title></dc:title></cc:Work></rdf:RDF></metadata><defs
- id="defs13" />
-<rect
- width="42.763924"
- height="42.763924"
- x="6.1180382"
- y="6.1180382"
- id="rect2986"
- style="fill:#282828;fill-opacity:1;stroke:#282828;stroke-width:2.23607516;stroke-linecap:round;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none" /><g
- transform="matrix(0.87078705,0,0,0.87078705,3.2821055,2.9298726)"
- id="network-wired_1_"
- style="display:block">
- <g
- id="network-wired">
- <line
- id="line3076"
- y2="23.993"
- y1="32.438999"
- x2="16.966999"
- x1="16.966999"
- style="fill:none;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round" />
- <line
- id="line3078"
- y2="28.215"
- y1="28.215"
- x2="34.938"
- x1="29.636999"
- style="fill:none;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round" />
-
- <rect
- width="9.7939997"
- height="7.599"
- x="42.157001"
- y="24.312"
- id="rect3080"
- style="fill:#ffffff;stroke:#ffffff;stroke-width:2.25;stroke-linecap:round" />
- <path
- d="m 16.967,23.993 c 0,-2.334 -1.892,-4.224 -4.224,-4.224 -2.332,0 -4.223,1.889 -4.223,4.224"
- id="path3082"
- style="fill:none;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round;stroke-linejoin:round" />
- <path
- d="m 25.413,32.439 c 0,2.334 -1.891,4.224 -4.224,4.224 -2.332,0 -4.223,-1.89 -4.223,-4.224"
- id="path3084"
- style="fill:none;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round;stroke-linejoin:round" />
- <path
- d="m 25.413,32.439 c 0,-2.332 1.893,-4.226 4.224,-4.226"
- id="path3086"
- style="fill:none;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round;stroke-linejoin:round" />
- <path
- d="m 8.52,23.993 c 0,2.332 -1.892,4.222 -4.223,4.222"
- id="path3088"
- style="fill:none;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round;stroke-linejoin:round" />
-
- <rect
- width="14.477"
- height="11.35"
- x="31.945"
- y="22.438"
- id="rect3090"
- style="fill:#ffffff;stroke:#ffffff;stroke-width:3.5;stroke-linecap:round;stroke-linejoin:round" />
- </g>
-</g></svg> \ No newline at end of file
diff --git a/plugins/audio_sensors/icons/sensoron.svg b/plugins/audio_sensors/icons/sensoron.svg
deleted file mode 100644
index d756860..0000000
--- a/plugins/audio_sensors/icons/sensoron.svg
+++ /dev/null
@@ -1,63 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<!-- Created with Inkscape (http://www.inkscape.org/) -->
-
-<svg
- xmlns:dc="http://purl.org/dc/elements/1.1/"
- xmlns:cc="http://creativecommons.org/ns#"
- xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
- xmlns:svg="http://www.w3.org/2000/svg"
- xmlns="http://www.w3.org/2000/svg"
- version="1.1"
- width="55"
- height="55"
- viewBox="0 0 55 55"
- id="svg2"
- xml:space="preserve"><metadata
- id="metadata15"><rdf:RDF><cc:Work
- rdf:about=""><dc:format>image/svg+xml</dc:format><dc:type
- rdf:resource="http://purl.org/dc/dcmitype/StillImage" /><dc:title></dc:title></cc:Work></rdf:RDF></metadata><defs
- id="defs13" /><rect
- width="55"
- height="55"
- x="0"
- y="0"
- id="rect3269"
- style="fill:#ffd200;fill-opacity:1;fill-rule:nonzero;stroke:none" /><g
- transform="translate(0.27777716,18.796296)"
- id="g4054"><line
- style="fill:#000000;fill-opacity:1;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-opacity:1"
- x1="17.778971"
- x2="17.778971"
- y1="12.381037"
- y2="5.0263696"
- id="line3076" /><line
- style="fill:#ff0000;fill-opacity:1;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-opacity:1"
- x1="28.811842"
- x2="33.427887"
- y1="8.7028332"
- y2="8.7028332"
- id="line3078" /><rect
- width="8.5284882"
- height="6.6171107"
- x="39.7141"
- y="5.3041511"
- id="rect3080"
- style="fill:#ff0000;fill-opacity:1;stroke:#ff0000;stroke-width:1.95927083;stroke-linecap:round;stroke-opacity:1" /><path
- d="m 17.778971,5.0263697 c 0,-2.032417 -1.647529,-3.6782045 -3.678204,-3.6782045 -2.030675,0 -3.677334,1.6449167 -3.677334,3.6782045"
- id="path3082"
- style="fill:none;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-linejoin:round;stroke-opacity:1" /><path
- d="m 25.133639,12.381037 c 0,2.032417 -1.646658,3.678205 -3.678205,3.678205 -2.030675,0 -3.677333,-1.645788 -3.677333,-3.678205"
- id="path3084"
- style="fill:none;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-linejoin:round;stroke-opacity:1" /><path
- d="m 25.133639,12.381037 c 0,-2.030675 1.6484,-3.679946 3.678204,-3.679946"
- id="path3086"
- style="fill:#000000;fill-opacity:1;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-linejoin:round;stroke-opacity:1" /><path
- d="m 10.423433,5.0263697 c 0,2.0306754 -1.6475288,3.6764629 -3.6773334,3.6764629"
- id="path3088"
- style="fill:#000000;fill-opacity:1;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-linejoin:round;stroke-opacity:1" /><rect
- width="12.606384"
- height="9.8834333"
- x="30.821619"
- y="3.6722956"
- id="rect3090"
- style="fill:#ff0000;fill-opacity:1;stroke:#ff0000;stroke-width:3.04775476;stroke-linecap:round;stroke-linejoin:round;stroke-opacity:1" /></g></svg> \ No newline at end of file
diff --git a/plugins/audio_sensors/ringbuffer.py b/plugins/audio_sensors/ringbuffer.py
deleted file mode 100644
index 2afb5c9..0000000
--- a/plugins/audio_sensors/ringbuffer.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Copyright (C) 2009, Benjamin Berg, Sebastian Berg
-# Copyright (C) 2010, Walter Bender
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-import numpy as np
-
-
-class RingBuffer1d(object):
- """This class implements an array being written in as a ring and that can
- be read from continuously ending with the newest data or starting with the
- oldest. It returns a numpy array copy of the data;
- """
-
- def __init__(self, length, dtype=None):
- """Initialize the 1 dimensional ring buffer with the given lengths.
- The initial values are all 0s
- """
- self.offset = 0
-
- self._data = np.zeros(length, dtype=dtype)
-
- self.stored = 0
-
- def fill(self, number):
- self._data.fill(number)
- self.offset = 0
-
- def append(self, data):
- """Append to the ring buffer (and overwrite old data). If len(data)
- is greater then the ring buffers length, the newest data takes
- precedence.
- """
- data = np.asarray(data)
-
- if len(self._data) == 0:
- return
-
- if len(data) >= len(self._data):
- self._data[:] = data[-len(self._data):]
- self.offset = 0
- self.stored = len(self._data)
-
- elif len(self._data) - self.offset >= len(data):
- self._data[self.offset: self.offset + len(data)] = data
- self.offset = self.offset + len(data)
- self.stored += len(data)
- else:
- self._data[self.offset:] = data[:len(self._data) - self.offset]
- self._data[:len(data) - (len(self._data) - self.offset)] = \
- data[-len(data) + (len(self._data) - self.offset):]
- self.offset = len(data) - (len(self._data) - self.offset)
- self.stored += len(data)
-
- if len(self._data) <= self.stored:
- self.read = self._read
-
- def read(self, number=None, step=1):
- """Read the ring Buffer. Number can be positive or negative.
- Positive values will give the latest information, negative values will
- give the newest added information from the buffer. (in normal order)
-
- Before the buffer is filled once: This returns just None
- """
- return np.array([])
-
- def _read(self, number=None, step=1):
- """Read the ring Buffer. Number can be positive or negative.
- Positive values will give the latest information, negative values will
- give the newest added information from the buffer. (in normal order)
- """
- if number == None:
- number = len(self._data) // step
-
- number *= step
- assert abs(number) <= len(self._data), \
- 'Number to read*step must be smaller then length'
-
- if number < 0:
- if abs(number) <= self.offset:
- return self._data[self.offset + number:self.offset:step]
-
- spam = (self.offset - 1) % step
-
- return np.concatenate(
- (self._data[step - spam - 1 + self.offset + number::step],
- self._data[spam:self.offset:step]))
-
- if number - (len(self._data) - self.offset) > 0:
- spam = ((self.offset + number) - self.offset - 1) % step
- return np.concatenate(
- (self._data[self.offset:self.offset + number:step],
- self._data[spam:number -
- (len(self._data) - self.offset):step]))
-
- return self._data[self.offset:self.offset + number:step].copy()