From de95d30d41d92690212af05c2fbe15391ce1a05d Mon Sep 17 00:00:00 2001 From: Walter Bender Date: Sat, 19 Feb 2011 14:47:57 +0000 Subject: moving audio sensor-specific code to its own directrory --- (limited to 'audio/ringbuffer.py') diff --git a/audio/ringbuffer.py b/audio/ringbuffer.py new file mode 100644 index 0000000..2afb5c9 --- /dev/null +++ b/audio/ringbuffer.py @@ -0,0 +1,108 @@ +# Copyright (C) 2009, Benjamin Berg, Sebastian Berg +# Copyright (C) 2010, Walter Bender +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + +import numpy as np + + +class RingBuffer1d(object): + """This class implements an array being written in as a ring and that can + be read from continuously ending with the newest data or starting with the + oldest. It returns a numpy array copy of the data; + """ + + def __init__(self, length, dtype=None): + """Initialize the 1 dimensional ring buffer with the given lengths. + The initial values are all 0s + """ + self.offset = 0 + + self._data = np.zeros(length, dtype=dtype) + + self.stored = 0 + + def fill(self, number): + self._data.fill(number) + self.offset = 0 + + def append(self, data): + """Append to the ring buffer (and overwrite old data). If len(data) + is greater then the ring buffers length, the newest data takes + precedence. + """ + data = np.asarray(data) + + if len(self._data) == 0: + return + + if len(data) >= len(self._data): + self._data[:] = data[-len(self._data):] + self.offset = 0 + self.stored = len(self._data) + + elif len(self._data) - self.offset >= len(data): + self._data[self.offset: self.offset + len(data)] = data + self.offset = self.offset + len(data) + self.stored += len(data) + else: + self._data[self.offset:] = data[:len(self._data) - self.offset] + self._data[:len(data) - (len(self._data) - self.offset)] = \ + data[-len(data) + (len(self._data) - self.offset):] + self.offset = len(data) - (len(self._data) - self.offset) + self.stored += len(data) + + if len(self._data) <= self.stored: + self.read = self._read + + def read(self, number=None, step=1): + """Read the ring Buffer. Number can be positive or negative. + Positive values will give the latest information, negative values will + give the newest added information from the buffer. (in normal order) + + Before the buffer is filled once: This returns just None + """ + return np.array([]) + + def _read(self, number=None, step=1): + """Read the ring Buffer. Number can be positive or negative. + Positive values will give the latest information, negative values will + give the newest added information from the buffer. (in normal order) + """ + if number == None: + number = len(self._data) // step + + number *= step + assert abs(number) <= len(self._data), \ + 'Number to read*step must be smaller then length' + + if number < 0: + if abs(number) <= self.offset: + return self._data[self.offset + number:self.offset:step] + + spam = (self.offset - 1) % step + + return np.concatenate( + (self._data[step - spam - 1 + self.offset + number::step], + self._data[spam:self.offset:step])) + + if number - (len(self._data) - self.offset) > 0: + spam = ((self.offset + number) - self.offset - 1) % step + return np.concatenate( + (self._data[self.offset:self.offset + number:step], + self._data[spam:number - + (len(self._data) - self.offset):step])) + + return self._data[self.offset:self.offset + number:step].copy() -- cgit v0.9.1