Skip to content

Commit

Permalink
add coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed Jun 18, 2024
1 parent e9f994b commit 3fc9742
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 10 deletions.
16 changes: 6 additions & 10 deletions mo_threads/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,22 @@ def push(self, value):
"""
SNEAK value TO FRONT OF THE QUEUE
"""
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")

with self.lock:
self._wait_for_queue_space(None)
if not self.closed:
self.queue.appendleft(value)
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")
self.queue.appendleft(value)
return self

def push_all(self, values):
"""
SNEAK values TO FRONT OF THE QUEUE
"""
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")

with self.lock:
self._wait_for_queue_space(None)
if not self.closed:
self.queue.extendleft(values)
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")
self.queue.extendleft(values)
return self

def pop_message(self, till=None):
Expand Down
57 changes: 57 additions & 0 deletions tests/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,60 @@ def drain(please_stop):
self.assertEqual(len(q), 0)
drain_thread.stop().join()

def test_put_to_closed(self):
q = Queue("")
q.close()
with self.assertRaises(Exception):
q.put(1)

def test_add_to_closed(self):
q = Queue("")
q.close()
with self.assertRaises(Exception):
q.add(1)

def test_push_to_closed(self):
q = Queue("")
q.close()
with self.assertRaises(Exception):
q.push(1)

def test_push(self):
q = Queue("")
q.push(1)
self.assertEqual(q.pop(), 1)

def test_push_all(self):
q = Queue("")
q.push_all([1, 2])
self.assertEqual(q.pop(), 2)
self.assertEqual(q.pop(), 1)

def test_push_all_closed(self):
q = Queue("")
q.close()
with self.assertRaises(Exception):
q.push_all([1, 2])

def test_push_to_closed_max(self):
q = Queue("", max=1)
q.add(1)
Thread.run("drain", close_and_drain, q)
with self.assertRaises(Exception):
q.push(1)

def test_push_all_closed_max(self):
q = Queue("", max=1)
q.add(1)
Thread.run("drain", close_and_drain, q)
with self.assertRaises(Exception):
q.push_all([1, 2])


def close_and_drain(q, please_stop):
Till(seconds=0.1).wait()
q.close()
while not please_stop:
result = q.pop()
if result is PLEASE_STOP:
break

0 comments on commit 3fc9742

Please sign in to comment.