Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/TurtleArt
diff options
context:
space:
mode:
authorWalter Bender <walter.bender@gmail.com>2011-02-19 14:47:57 (GMT)
committer Walter Bender <walter.bender@gmail.com>2011-02-19 14:47:57 (GMT)
commitde95d30d41d92690212af05c2fbe15391ce1a05d (patch)
tree34e08d1aa6d014b5cff3929fbe08876c572c7b11 /TurtleArt
parentcfda7fc6d020da59f1a9d135fe6f127ff4135d66 (diff)
moving audio sensor-specific code to its own directrory
Diffstat (limited to 'TurtleArt')
-rw-r--r--TurtleArt/audiograb.py622
-rw-r--r--TurtleArt/ringbuffer.py108
2 files changed, 0 insertions, 730 deletions
diff --git a/TurtleArt/audiograb.py b/TurtleArt/audiograb.py
deleted file mode 100644
index 3ecdc11..0000000
--- a/TurtleArt/audiograb.py
+++ /dev/null
@@ -1,622 +0,0 @@
-#! /usr/bin/python
-#
-# Author: Arjun Sarwal arjun@laptop.org
-# Copyright (C) 2007, Arjun Sarwal
-# Copyright (C) 2009,10 Walter Bender
-# Copyright (C) 2009, Benjamin Berg, Sebastian Berg
-# Copyright (C) 2009, Sayamindu Dasgupta
-# Copyright (C) 2010, Sascha Silbe
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-import pygst
-import gst
-import gst.interfaces
-from numpy import fromstring
-import os
-import subprocess
-from string import find
-from threading import Timer
-
-# Initial device settings
-RATE = 48000
-MIC_BOOST = True
-DC_MODE_ENABLE = False
-CAPTURE_GAIN = 50
-BIAS = True
-
-# Setting on quit
-QUIT_MIC_BOOST = False
-QUIT_DC_MODE_ENABLE = False
-QUIT_CAPTURE_GAIN = 100
-QUIT_BIAS = True
-
-import logging
-
-_logger = logging.getLogger('TurtleArt')
-_logger.setLevel(logging.DEBUG)
-logging.basicConfig()
-
-from taconstants import SENSOR_AC_NO_BIAS, SENSOR_AC_BIAS, SENSOR_DC_NO_BIAS, \
- SENSOR_DC_BIAS, XO1
-
-
-class AudioGrab:
- """ The interface between measure and the audio device """
-
- def __init__(self, callable1, activity):
- """ Initialize the class: callable1 is a data buffer;
- activity is the parent class"""
-
- self.callable1 = callable1
- self.activity = activity
- self.sensor = None
-
- self.temp_buffer = [0]
- self.picture_buffer = [] # place to hold screen grabs
-
- self.draw_graph_status = False
- self.screenshot = True
-
- self.rate = 48000
- self.final_count = 0
- self.count_temp = 0
- self.entry_count = 0
-
- self.counter_buffer = 0
-
- self._dc_control = None
- self._mic_bias_control = None
- self._capture_control = None
- self._mic_boost_control = None
- self._hardwired = False # Query controls or use hardwired names
-
- # Set up gst pipeline
- self.pipeline = gst.Pipeline("pipeline")
- self.alsasrc = gst.element_factory_make("alsasrc", "alsa-source")
- self.pipeline.add(self.alsasrc)
- self.caps1 = gst.element_factory_make("capsfilter", "caps1")
- self.pipeline.add(self.caps1)
- caps_str = "audio/x-raw-int,rate=%d,channels=1,depth=16" % (RATE)
- self.caps1.set_property("caps", gst.caps_from_string(caps_str))
- self.fakesink = gst.element_factory_make("fakesink", "fsink")
- self.pipeline.add(self.fakesink)
- self.fakesink.connect("handoff", self.on_buffer)
- self.fakesink.set_property("signal-handoffs", True)
- gst.element_link_many(self.alsasrc, self.caps1, self.fakesink)
-
- self.dont_queue_the_buffer = False
-
- self._mixer = gst.element_factory_make('alsamixer')
- rc = self._mixer.set_state(gst.STATE_PAUSED)
- assert rc == gst.STATE_CHANGE_SUCCESS
-
- # Query the available controls
- try: # F11+
- _logger.debug('controls: %r', [t.props.untranslated_label \
- for t in self._mixer.list_tracks()])
- self._dc_control = self._find_control(['dc mode'])
- self._mic_bias_control = self._find_control(['mic bias',
- 'dc input bias',
- 'v_refout'])
- self._mic_boost_control = self._find_control(['mic boost',
- 'mic boost (+20db)',
- 'internal mic boost',
- 'analog mic boost'])
- self._mic_gain_control = self._find_control(['mic'])
- self._capture_control = self._find_control(['capture'])
- self._master_control = self._find_control(['master'])
- except AttributeError: # F9- (no untranslated_label attribute)
- self._hardwired = True
-
- # Variables for saving and resuming state of sound device
- self.master = self.get_master()
- self.bias = BIAS
- self.dcmode = DC_MODE_ENABLE
- self.capture_gain = CAPTURE_GAIN
- self.mic_boost = MIC_BOOST
- self.mic = self.get_mic_gain()
-
- # Timer for interval sampling and switch to indicate when to capture
- self.capture_timer = None
- self.capture_interval_sample = False
-
- def set_handoff_signal(self, handoff_state):
- """Sets whether the handoff signal would generate an interrupt or not"""
- self.fakesink.set_property("signal-handoffs", handoff_state)
-
- def _new_buffer(self, buf):
- """ Use a new buffer """
- if not self.dont_queue_the_buffer:
- self.temp_buffer = buf
- self.callable1(buf)
- else:
- pass
-
- def on_buffer(self, element, buffer, pad):
- """The function that is called whenever new data is available
- This is the signal handler for the handoff signal"""
- if buffer is None:
- _logger.debug('audiograb buffer is None')
- return False
-
- temp_buffer = fromstring(buffer, 'int16')
- if not self.dont_queue_the_buffer:
- self._new_buffer(temp_buffer)
- return False
-
- def set_sensor(self, sensor):
- """Keep a reference to the sensot toolbar for logging"""
- self.sensor = sensor
-
- def start_sound_device(self):
- """Start or Restart grabbing data from the audio capture"""
- gst.event_new_flush_start()
- self.pipeline.set_state(gst.STATE_PLAYING)
-
- def stop_sound_device(self):
- """Stop grabbing data from capture device"""
- gst.event_new_flush_stop()
- self.pipeline.set_state(gst.STATE_NULL)
-
- def set_sampling_rate(self, sr):
- """Sets the sampling rate of the capture device
- Sampling rate must be given as an integer for example 16000 for
- setting 16Khz sampling rate
- The sampling rate would be set in the device to the nearest available"""
- self.pause_grabbing()
- caps_str = "audio/x-raw-int,rate=%d,channels=1,depth=16" % (sr, )
- self.caps1.set_property("caps", gst.caps_from_string(caps_str))
- self.resume_grabbing()
-
- def get_sampling_rate(self):
- """Gets the sampling rate of the capture device"""
- return int(self.caps1.get_property("caps")[0]['rate'])
-
- def set_callable1(self, callable1):
- """Sets the callable to the drawing function for giving the
- data at the end of idle-add"""
- self.callable1 = callable1
-
- def start_grabbing(self):
- """Called right at the start of the Activity"""
- self.start_sound_device()
-
- def pause_grabbing(self):
- """When Activity goes into background"""
- self.save_state()
- self.stop_sound_device()
-
- def resume_grabbing(self):
- """When Activity becomes active after going to background"""
- self.start_sound_device()
- self.resume_state()
-
- def stop_grabbing(self):
- """Not used ???"""
- self.stop_sound_device()
- self.set_handoff_signal(False)
-
- def _find_control(self, prefixes):
- """Try to find a mixer control matching one of the prefixes.
-
- The control with the best match (smallest difference in length
- between label and prefix) will be returned. If no match is found,
- None is returned.
- """
- def best_prefix(label, prefixes):
- matches =\
- [len(label) - len(p) for p in prefixes if label.startswith(p)]
- if not matches:
- return None
-
- matches.sort()
- return matches[0]
-
- controls = []
- for track in self._mixer.list_tracks():
- label = track.props.untranslated_label.lower()
- diff = best_prefix(label, prefixes)
- if diff is not None:
- controls.append((track, diff))
-
- controls.sort(key=lambda e: e[1])
- if controls:
- _logger.debug("found control: %s" %\
- (str(controls[0][0].props.untranslated_label)))
- return controls[0][0]
-
- return None
-
- def save_state(self):
- """Saves the state of all audio controls"""
- self.master = self.get_master()
- self.bias = self.get_bias()
- self.dcmode = self.get_dc_mode()
- self.capture_gain = self.get_capture_gain()
- self.mic_boost = self.get_mic_boost()
-
- def resume_state(self):
- """Put back all audio control settings from the saved state"""
- self.set_master(self.master)
- self.set_bias(self.bias)
- self.set_dc_mode(self.dcmode)
- self.set_capture_gain(self.capture_gain)
- self.set_mic_boost(self.mic_boost)
-
- def _get_mute(self, control, name, default):
- """Get mute status of a control"""
- if not control:
- return default
-
- value = bool(control.flags & gst.interfaces.MIXER_TRACK_MUTE)
- _logger.debug('Getting %s (%s) mute status: %r', name,
- control.props.untranslated_label, value)
- return value
-
- def _set_mute(self, control, name, value):
- """Mute a control"""
- if not control:
- return
-
- self._mixer.set_mute(control, value)
- _logger.debug('Set mute for %s (%s) to %r', name,
- control.props.untranslated_label, value)
-
- def _get_volume(self, control, name):
- """Get volume of a control and convert to a scale of 0-100"""
- if not control:
- return 100
-
- try: # sometimes get_volume does not return a tuple
- hw_volume = self._mixer.get_volume(control)[0]
- except IndexError:
- return 100
-
- min_vol = control.min_volume
- max_vol = control.max_volume
- percent = (hw_volume - min_vol)*100//(max_vol - min_vol)
- return percent
-
- def _set_volume(self, control, name, value):
- """Sets the level of a control on a scale of 0-100"""
- if not control:
- return
-
- # convert value to scale of control
- min_vol = control.min_volume
- max_vol = control.max_volume
- if min_vol != max_vol:
- hw_volume = value*(max_vol - min_vol)//100 + min_vol
- self._mixer.set_volume(control, (hw_volume,)*control.num_channels)
-
- def amixer_set(self, control, state):
- """ Direct call to amixer for old systems. """
- if state:
- os.system("amixer set '%s' unmute" % (control))
- else:
- os.system("amixer set '%s' mute" % (control))
-
- def mute_master(self):
- """Mutes the Master Control"""
- if not self._hardwired and self.activity.hw != XO1:
- self._set_mute(self._master_control, 'Master', True)
- else:
- self.amixer_set('Master', False)
-
- def unmute_master(self):
- """Unmutes the Master Control"""
- if not self._hardwired and self.activity.hw != XO1:
- self._set_mute(self._master_control, 'Master', True)
- else:
- self.amixer_set('Master', True)
-
- def set_master(self, master_val):
- """Sets the Master gain slider settings
- master_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set"""
- if not self._hardwired:
- self._set_volume(self._master_control, 'Master', master_val)
- else:
- os.system("amixer set Master " + str(master_val) + "%")
-
- def get_master(self):
- """Gets the Master gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%"""
- if not self._hardwired:
- return self._get_volume(self._master_control, 'master')
- else:
- p = str(subprocess.Popen(["amixer", "get", "Master"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Front Left:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "%]")]
- return int(p)
-
- def set_bias(self, bias_state=False):
- """Enables / disables bias voltage."""
- if not self._hardwired and self.activity.hw != XO1:
- if self._mic_bias_control is None:
- return
- # if not isinstance(self._mic_bias_control,
- # gst.interfaces.MixerOptions):
- if self._mic_bias_control not in self._mixer.list_tracks():
- return self._set_mute(self._mic_bias_control, 'Mic Bias',
- not bias_state)
-
- # We assume that values are sorted from lowest (=off) to highest.
- # Since they are mixed strings ("Off", "50%", etc.), we cannot
- # easily ensure this by sorting with the default sort order.
- try:
- if bias_state:
- # self._mixer.set_option(self._mic_bias_control, values[-1])
- self._mixer.set_volume(self._mic_bias_control,
- self._mic_bias_control.max_volume)
- else:
- self._mixer.set_volume(self._mic_bias_control,
- self._mic_bias_control.min_volume)
- # self._mixer.set_option(self._mic_bias_control, values[0])
- except TypeError:
- self._set_mute(self._mic_bias_control, 'Mic Bias',
- not bias_state)
- elif self._hardwired:
- self.amixer_set('V_REFOUT Enable', bias_state)
- else:
- self.amixer_set('MIC Bias Enable', bias_state)
-
- def get_bias(self):
- """Check whether bias voltage is enabled."""
- if not self._hardwired:
- if self._mic_bias_control is None:
- return False
- if self._mic_bias_control not in self._mixer.list_tracks():
- return not self._get_mute(self._mic_bias_control, 'Mic Bias',
- False)
- current = self._mixer.get_volume(self._mic_bias_control)
- # same ordering assertion as in set_bias() applies
- # if current == values[0]:
- if current == self._mic_bias_control.min_volume:
- return False
- return True
- else:
- p = str(subprocess.Popen(["amixer", "get", "'V_REFOUT Enable'"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "]")]
- if p == "on":
- return True
- return False
-
- def set_dc_mode(self, dc_mode=False):
- """Sets the DC Mode Enable control
- pass False to mute and True to unmute"""
- if not self._hardwired and self.activity.hw != XO1:
- if self._dc_control is not None:
- self._set_mute(self._dc_control, 'DC mode', not dc_mode)
- else:
- self.amixer_set('DC Mode Enable', dc_mode)
-
- def get_dc_mode(self):
- """Returns the setting of DC Mode Enable control
- i .e. True: Unmuted and False: Muted"""
- if not self._hardwired:
- if self._dc_control is not None:
- return not self._get_mute(self._dc_control, 'DC mode', False)
- else:
- return False
- else:
- p = str(subprocess.Popen(["amixer", "get", "'DC Mode Enable'"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "]")]
- if p == "on":
- return True
- else:
- return False
-
- def set_mic_boost(self, mic_boost=False):
- """Set Mic Boost.
- True = +20dB, False = 0dB"""
- if not self._hardwired:
- if self._mic_boost_control is None:
- return
- if self._mic_boost_control not in self._mixer.list_tracks():
- return self._set_mute(self._mic_boost_control, 'Mic Boost',
- mic_boost)
- value = self._mixer.get_volume(self._mic_boost_control)
- try:
- if mic_boost:
- self._mixer.set_volume(self._mic_boost_control,
- self._mic_boost_control.max_volume)
- else:
- self._mixer.set_volume(self._mic_boost_control,
- self._mic_boost_control.min_volume)
- except TypeError:
- return self._set_mute(self._mic_boost_control, 'Mic Boost',
- not mic_boost)
- else:
- self.amixer_set('Mic Boost (+20dB)', mic_boost)
-
- def get_mic_boost(self):
- """Return Mic Boost setting.
- True = +20dB, False = 0dB"""
- if not self._hardwired:
- if self._mic_boost_control is None:
- return False
- if self._mic_boost_control not in self._mixer.list_tracks():
- return self._get_mute(self._mic_boost_control, 'Mic Boost',
- False)
- current = self._mixer.get_volume(self._mic_boost_control)
- _logger.debug('current: %s' % (str(current)))
- if current != self._mic_boost_control.min_volume:
- return True
- return False
- else:
- p = str(subprocess.Popen(["amixer", "get", "'Mic Boost (+20dB)'"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "]")]
- if p == "on":
- return True
- else:
- return False
-
- def set_capture_gain(self, capture_val):
- """Sets the Capture gain slider settings
- capture_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set"""
- if not self._hardwired and self.activity.hw != XO1:
- if self._capture_control is not None:
- self._set_volume(self._capture_control, 'Capture', capture_val)
- else:
- os.system("amixer set Capture " + str(capture_val) + "%")
-
- def get_capture_gain(self):
- """Gets the Capture gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%"""
- if not self._hardwired:
- if self._capture_control is not None:
- return self._get_volume(self._capture_control, 'Capture')
- else:
- return 0
- else:
- p = str(subprocess.Popen(["amixer", "get", "Capture"],
- stdout=subprocess.PIPE).communicate()[0])
- p = p[find(p, "Front Left:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "%]")]
- return int(p)
-
- def set_mic_gain(self, mic_val):
- """Sets the MIC gain slider settings
- mic_val must be given as an integer between 0 and 100 indicating the
- percentage of the slider to be set"""
- if not self._hardwired and self.activity.hw != XO1:
- self._set_volume(self._mic_gain_control, 'Mic', mic_val)
- else:
- os.system("amixer set Mic " + str(mic_val) + "%")
-
- def get_mic_gain(self):
- """Gets the MIC gain slider settings. The value returned is an
- integer between 0-100 and is an indicative of the percentage 0 - 100%"""
- if not self._hardwired:
- return self._get_volume(self._mic_gain_control, 'Mic')
- else:
- p = str(subprocess.Popen(["amixer", "get", "Mic"],
- stdout=subprocess.PIPE).communicate()[0])
- try:
- p = p[find(p, "Mono:"):]
- p = p[find(p, "[")+1:]
- p = p[:find(p, "%]")]
- return int(p)
- except:
- return(0)
-
- def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
- """Set the type of sensor you want to use. Set sensor_type according
- to the following
- SENSOR_AC_NO_BIAS - AC coupling with Bias Off --> Very rarely used.
- Use when connecting a dynamic microphone externally
- SENSOR_AC_BIAS - AC coupling with Bias On --> The default settings.
- The internal MIC uses these
- SENSOR_DC_NO_BIAS - DC coupling with Bias Off --> measuring voltage
- output sensor. For example LM35 which gives output proportional
- to temperature
- SENSOR_DC_BIAS - DC coupling with Bias On --> measuing resistance.
- """
- PARAMETERS = {
- SENSOR_AC_NO_BIAS: (False, False, 50, True),
- SENSOR_AC_BIAS: (False, True, 40, True),
- SENSOR_DC_NO_BIAS: (True, False, 0, False),
- SENSOR_DC_BIAS: (True, True, 0, False)
- }
- mode, bias, gain, boost = PARAMETERS[sensor_type]
- _logger.debug("====================================")
- _logger.debug("Set Sensor Type to %s" % (str(sensor_type)))
- self._set_sensor_type(mode, bias, gain, boost)
- _logger.debug("====================================")
-
- def _set_sensor_type(self, mode=None, bias=None, gain=None, boost=None):
- """Helper to modify (some) of the sensor settings."""
- if mode is not None:
- self.set_dc_mode(mode)
- if self._dc_control is not None:
- os.system("amixer get '%s'" %\
- (self._dc_control.props.untranslated_label))
- if bias is not None:
- self.set_bias(bias)
- if self._mic_bias_control is not None:
- os.system("amixer get '%s'" %\
- (self._mic_bias_control.props.untranslated_label))
- if gain is not None:
- self.set_capture_gain(gain)
- if self._capture_control is not None:
- os.system("amixer get '%s'" %\
- (self._capture_control.props.untranslated_label))
- if boost is not None:
- self.set_mic_boost(boost)
- if self._mic_boost_control is not None:
- os.system("amixer get '%s'" %\
- (self._mic_boost_control.props.untranslated_label))
-
- def on_activity_quit(self):
- """When Activity quits"""
- self.set_mic_boost(QUIT_MIC_BOOST)
- self.set_dc_mode(QUIT_DC_MODE_ENABLE)
- self.set_capture_gain(QUIT_CAPTURE_GAIN)
- self.set_bias(QUIT_BIAS)
- self.stop_sound_device()
-
-
-class AudioGrab_XO1(AudioGrab):
- """ Use default parameters for OLPC XO 1.0 laptop """
- pass
-
-
-class AudioGrab_XO15(AudioGrab):
- """ Override parameters for OLPC XO 1.5 laptop """
- def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
- """Helper to modify (some) of the sensor settings."""
- PARAMETERS = {
- SENSOR_AC_NO_BIAS: (False, False, 80, True),
- SENSOR_AC_BIAS: (False, True, 80, True),
- SENSOR_DC_NO_BIAS: (True, False, 80, False),
- SENSOR_DC_BIAS: (True, True, 90, False)
- }
- _logger.debug("====================================")
- _logger.debug("Set Sensor Type to %s" % (str(sensor_type)))
- mode, bias, gain, boost = PARAMETERS[sensor_type]
- self._set_sensor_type(mode, bias, gain, boost)
- _logger.debug("====================================")
-
-
-class AudioGrab_Unknown(AudioGrab):
- """ Override parameters for generic hardware """
- def set_sensor_type(self, sensor_type=SENSOR_AC_BIAS):
- """Helper to modify (some) of the sensor settings."""
- PARAMETERS = {
- SENSOR_AC_NO_BIAS: (None, False, 50, True),
- SENSOR_AC_BIAS: (None, True, 40, True),
- SENSOR_DC_NO_BIAS: (True, False, 80, False),
- SENSOR_DC_BIAS: (True, True, 90, False)
- }
- _logger.debug("====================================")
- _logger.debug("Set Sensor Type to %s" % (str(sensor_type)))
- mode, bias, gain, boost = PARAMETERS[sensor_type]
- self._set_sensor_type(mode, bias, gain, boost)
- _logger.debug("====================================")
diff --git a/TurtleArt/ringbuffer.py b/TurtleArt/ringbuffer.py
deleted file mode 100644
index 2afb5c9..0000000
--- a/TurtleArt/ringbuffer.py
+++ /dev/null
@@ -1,108 +0,0 @@
-# Copyright (C) 2009, Benjamin Berg, Sebastian Berg
-# Copyright (C) 2010, Walter Bender
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
-import numpy as np
-
-
-class RingBuffer1d(object):
- """This class implements an array being written in as a ring and that can
- be read from continuously ending with the newest data or starting with the
- oldest. It returns a numpy array copy of the data;
- """
-
- def __init__(self, length, dtype=None):
- """Initialize the 1 dimensional ring buffer with the given lengths.
- The initial values are all 0s
- """
- self.offset = 0
-
- self._data = np.zeros(length, dtype=dtype)
-
- self.stored = 0
-
- def fill(self, number):
- self._data.fill(number)
- self.offset = 0
-
- def append(self, data):
- """Append to the ring buffer (and overwrite old data). If len(data)
- is greater then the ring buffers length, the newest data takes
- precedence.
- """
- data = np.asarray(data)
-
- if len(self._data) == 0:
- return
-
- if len(data) >= len(self._data):
- self._data[:] = data[-len(self._data):]
- self.offset = 0
- self.stored = len(self._data)
-
- elif len(self._data) - self.offset >= len(data):
- self._data[self.offset: self.offset + len(data)] = data
- self.offset = self.offset + len(data)
- self.stored += len(data)
- else:
- self._data[self.offset:] = data[:len(self._data) - self.offset]
- self._data[:len(data) - (len(self._data) - self.offset)] = \
- data[-len(data) + (len(self._data) - self.offset):]
- self.offset = len(data) - (len(self._data) - self.offset)
- self.stored += len(data)
-
- if len(self._data) <= self.stored:
- self.read = self._read
-
- def read(self, number=None, step=1):
- """Read the ring Buffer. Number can be positive or negative.
- Positive values will give the latest information, negative values will
- give the newest added information from the buffer. (in normal order)
-
- Before the buffer is filled once: This returns just None
- """
- return np.array([])
-
- def _read(self, number=None, step=1):
- """Read the ring Buffer. Number can be positive or negative.
- Positive values will give the latest information, negative values will
- give the newest added information from the buffer. (in normal order)
- """
- if number == None:
- number = len(self._data) // step
-
- number *= step
- assert abs(number) <= len(self._data), \
- 'Number to read*step must be smaller then length'
-
- if number < 0:
- if abs(number) <= self.offset:
- return self._data[self.offset + number:self.offset:step]
-
- spam = (self.offset - 1) % step
-
- return np.concatenate(
- (self._data[step - spam - 1 + self.offset + number::step],
- self._data[spam:self.offset:step]))
-
- if number - (len(self._data) - self.offset) > 0:
- spam = ((self.offset + number) - self.offset - 1) % step
- return np.concatenate(
- (self._data[self.offset:self.offset + number:step],
- self._data[spam:number -
- (len(self._data) - self.offset):step]))
-
- return self._data[self.offset:self.offset + number:step].copy()