Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/plugins/audio_sensors/ringbuffer.py
blob: 2afb5c98171f2d62cec56b52f145909e2f699697 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#    Copyright (C) 2009, Benjamin Berg, Sebastian Berg
#    Copyright (C) 2010, Walter Bender
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

import numpy as np


class RingBuffer1d(object):
    """This class implements an array being written in as a ring and that can
    be read from continuously ending with the newest data or starting with the
    oldest. It returns a numpy array copy of the data;
    """

    def __init__(self, length, dtype=None):
        """Initialize the 1 dimensional ring buffer with the given lengths.
        The initial values are all 0s
        """
        self.offset = 0

        self._data = np.zeros(length, dtype=dtype)

        self.stored = 0

    def fill(self, number):
        self._data.fill(number)
        self.offset = 0

    def append(self, data):
        """Append to the ring buffer (and overwrite old data). If len(data)
        is greater then the ring buffers length, the newest data takes
        precedence.
        """
        data = np.asarray(data)

        if len(self._data) == 0:
            return

        if len(data) >= len(self._data):
            self._data[:] = data[-len(self._data):]
            self.offset = 0
            self.stored = len(self._data)

        elif len(self._data) - self.offset >= len(data):
            self._data[self.offset: self.offset + len(data)] = data
            self.offset = self.offset + len(data)
            self.stored += len(data)
        else:
            self._data[self.offset:] = data[:len(self._data) - self.offset]
            self._data[:len(data) - (len(self._data) - self.offset)] = \
                data[-len(data) + (len(self._data) - self.offset):]
            self.offset = len(data) - (len(self._data) - self.offset)
            self.stored += len(data)

        if len(self._data) <= self.stored:
            self.read = self._read

    def read(self, number=None, step=1):
        """Read the ring Buffer. Number can be positive or negative.
        Positive values will give the latest information, negative values will
        give the newest added information from the buffer. (in normal order)

        Before the buffer is filled once: This returns just None
        """
        return np.array([])

    def _read(self, number=None, step=1):
        """Read the ring Buffer. Number can be positive or negative.
        Positive values will give the latest information, negative values will
        give the newest added information from the buffer. (in normal order)
        """
        if number == None:
            number = len(self._data) // step

        number *= step
        assert abs(number) <= len(self._data), \
            'Number to read*step must be smaller then length'

        if number < 0:
            if abs(number) <= self.offset:
                return self._data[self.offset + number:self.offset:step]

            spam = (self.offset - 1) % step

            return np.concatenate(
                (self._data[step - spam - 1 + self.offset + number::step],
                 self._data[spam:self.offset:step]))

        if number - (len(self._data) - self.offset) > 0:
            spam = ((self.offset + number) - self.offset - 1) % step
            return np.concatenate(
                (self._data[self.offset:self.offset + number:step],
                 self._data[spam:number -
                            (len(self._data) - self.offset):step]))

        return self._data[self.offset:self.offset + number:step].copy()