-
Notifications
You must be signed in to change notification settings - Fork 267
/
ring_buffer.py
133 lines (106 loc) · 4.12 KB
/
ring_buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from typing import Generic, TypeVar
T = TypeVar("T")


class RingBuffer(Generic[T]):
    """Fixed-capacity FIFO queue stored in a preallocated circular list.

    Items are written at ``_tail`` and read from ``_head``; both indices wrap
    around modulo the capacity. ``_size`` tracks the number of stored items,
    which is what distinguishes a full buffer from an empty one when the two
    indices coincide.
    """

    def __init__(self, capacity: int) -> None:
        """Create an empty buffer able to hold ``capacity`` items.

        Args:
            capacity: Maximum number of items held at once. Zero is allowed
                and yields a buffer that is simultaneously empty and full.

        Raises:
            ValueError: If ``capacity`` is negative.
        """
        # A negative capacity would build an empty backing list whose size
        # can never equal the capacity, so push() would fail with an
        # unrelated list-index error. Reject it up front instead.
        if capacity < 0:
            raise ValueError("Ring buffer capacity must be non-negative")
        self._capacity = capacity
        self._init_buffer()

    def __str__(self) -> str:
        """Return the stored items, oldest first, formatted like a list."""
        if self.is_empty():
            return "[]"
        if self._head < self._tail:
            # Live region is contiguous.
            items = self._buffer[self._head : self._tail]
        else:
            # Live region wraps past the end of the backing list; this also
            # covers the full case where head == tail.
            items = self._buffer[self._head :] + self._buffer[: self._tail]
        return str(items)

    def __repr__(self) -> str:
        """Return the same text as ``str(self)``."""
        return self.__str__()

    def __len__(self) -> int:
        """Return the number of items currently stored."""
        return self._size

    def _init_buffer(self) -> None:
        """Allocate a fresh backing list and reset indices and size."""
        self._buffer = [None] * self._capacity
        self._head = 0
        self._tail = 0
        self._size = 0

    def clear(self) -> None:
        """Discard every stored item and return to the empty state."""
        self._init_buffer()

    def push(self, item: T) -> None:
        """Append ``item`` at the tail of the buffer.

        Raises:
            IndexError: If the buffer is already full.
        """
        if self.is_full():
            raise IndexError("Ring buffer is full")
        self._buffer[self._tail] = item
        self._tail = (self._tail + 1) % self._capacity
        # The full-check above guarantees _size < _capacity here, so no
        # clamping is needed.
        self._size += 1

    def get(self) -> T:
        """Remove and return the oldest item.

        Raises:
            IndexError: If the buffer is empty.
        """
        if self.is_empty():
            raise IndexError("Ring buffer is empty")
        item = self._buffer[self._head]
        # Drop the buffer's reference so the dequeued object is not kept
        # alive by a slot that is no longer part of the live region.
        self._buffer[self._head] = None
        self._head = (self._head + 1) % self._capacity
        self._size -= 1
        return item

    def is_full(self) -> bool:
        """Return True when the number of stored items equals the capacity."""
        return self._size == self._capacity

    def is_empty(self) -> bool:
        """Return True when no items are stored."""
        return self._size == 0
if __name__ == "__main__":
    import unittest

    class TestRingBuffer(unittest.TestCase):
        # Self-checks for RingBuffer, run only when the module is executed
        # as a script. Test methods use comments rather than docstrings
        # because the verbose runner below prints a test's docstring in its
        # report.

        def test_push(self):
            # Items appear in insertion order as the buffer fills up.
            buf = RingBuffer[int](3)
            for value in (7, 3):
                buf.push(value)
            self.assertEqual("[7, 3]", str(buf))

            buf.push(10)
            self.assertEqual("[7, 3, 10]", str(buf))

        def test_get(self):
            # Items come back out oldest first.
            buf = RingBuffer[int](3)
            buf.push(17)
            buf.push(35)

            first = buf.get()
            self.assertEqual(17, first)
            second = buf.get()
            self.assertEqual(35, second)

        def test_clear(self):
            # clear() empties both the rendered contents and the length.
            buf = RingBuffer[int](3)
            buf.push(6)
            buf.push(11)
            self.assertEqual("[6, 11]", str(buf))

            buf.clear()
            self.assertEqual("[]", str(buf))
            self.assertEqual(0, len(buf))

        def test_push_with_full_error(self):
            # Pushing beyond capacity raises IndexError.
            single = RingBuffer[int](1)
            single.push(7)
            with self.assertRaises(IndexError):
                single.push(9)

            pair = RingBuffer[int](2)
            pair.push(23)
            pair.push(14)
            with self.assertRaises(IndexError):
                pair.push(11)

        def test_get_with_empty_error(self):
            # Reading from an empty buffer raises IndexError, both when it
            # was never filled and after it has been drained.
            buf = RingBuffer[int](1)
            with self.assertRaises(IndexError):
                buf.get()

            buf.push(15)
            buf.get()
            with self.assertRaises(IndexError):
                buf.get()

        def test_size(self):
            # len() follows pushes and gets.
            buf = RingBuffer[int](3)
            buf.push(5)
            self.assertEqual(1, len(buf))

            buf.push(3)
            self.assertEqual(2, len(buf))

            buf.get()
            self.assertEqual(1, len(buf))

        def test_str(self):
            # str() lists the items oldest first, including once the write
            # position has wrapped around to the start.
            buf = RingBuffer[int](3)
            self.assertEqual("[]", str(buf))

            buf.push(10)
            self.assertEqual("[10]", str(buf))

            buf.get()
            buf.push(11)
            buf.push(9)
            self.assertEqual("[11, 9]", str(buf))

            buf.push(7)
            self.assertEqual("[11, 9, 7]", str(buf))

        def test_with_str_type(self):
            # The buffer also works when parameterised with str.
            buf = RingBuffer[str](3)
            buf.push("mug")
            buf.push("cup")
            self.assertEqual("['mug', 'cup']", str(buf))

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(TestRingBuffer)
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)