Skip to content

Commit

Permalink
add coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed Jun 18, 2024
1 parent 3fc9742 commit 5a99248
Showing 1 changed file with 24 additions and 46 deletions.
70 changes: 24 additions & 46 deletions mo_threads/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import types
from collections import deque
from copy import copy
from datetime import datetime
from time import time
from queue import Empty, Full
from time import time

from mo_dots import Null, coalesce
from mo_logs import Except, logger

Expand All @@ -28,11 +28,8 @@

DEBUG = False

# MAX_DATETIME = datetime(2286, 11, 20, 17, 46, 39)
DEFAULT_WAIT_TIME = 10 * 60 # SECONDS

datetime.strptime("2012-01-01", "%Y-%m-%d") # http://bugs.python.org/issue7980


class Queue(object):
"""
Expand Down Expand Up @@ -82,8 +79,6 @@ def add(self, value, timeout=None, force=False, till=None):

if not force:
self._wait_for_queue_space(till)
if self.closed and not self.allow_add_after_close:
logger.error("Do not add to closed queue")
if not self.unique or value not in self.queue:
self.queue.append(value)
return self
Expand All @@ -94,8 +89,6 @@ def push(self, value):
"""
with self.lock:
self._wait_for_queue_space(None)
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")
self.queue.appendleft(value)
return self

Expand All @@ -105,8 +98,6 @@ def push_all(self, values):
"""
with self.lock:
self._wait_for_queue_space(None)
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")
self.queue.extendleft(values)
return self

Expand All @@ -119,26 +110,22 @@ def pop_message(self, till=None):
return Null, self.pop(till=till)

def extend(self, values):
if self.closed and not self.allow_add_after_close:
logger.error("Do not push to closed queue")

with self.lock:
# ONCE THE queue IS BELOW LIMIT, ALLOW ADDING MORE
self._wait_for_queue_space(None)
if not self.closed:
if self.unique:
for v in values:
if v is PLEASE_STOP:
self.closed.go()
continue
if v not in self.queue:
self.queue.append(v)
else:
for v in values:
if v is PLEASE_STOP:
self.closed.go()
continue
if self.unique:
for v in values:
if v is PLEASE_STOP:
self.closed.go()
continue
if v not in self.queue:
self.queue.append(v)
else:
for v in values:
if v is PLEASE_STOP:
self.closed.go()
continue
self.queue.append(v)
return self

def _wait_for_queue_space(self, till):
Expand All @@ -147,11 +134,6 @@ def _wait_for_queue_space(self, till):
:param timeout: IN SECONDS
"""
if len(self.queue) < self.max:
return

wait_time = 5

(DEBUG and len(self.queue) > 1 * 1000 * 1000) and logger.warning("Queue {name} has over a million items")

start = time()
Expand All @@ -163,7 +145,7 @@ def _wait_for_queue_space(self, till):
if self.silent:
self.lock.wait(till)
else:
self.lock.wait(Till(seconds=wait_time))
self.lock.wait(Till(seconds=5))
if not till and len(self.queue) >= self.max:
now = time()
logger.alert(
Expand All @@ -173,6 +155,8 @@ def _wait_for_queue_space(self, till):
num=len(self.queue),
wait_time=now - start,
)
if self.closed and not self.allow_add_after_close:
logger.error("Do not add to closed queue")

def __len__(self):
with self.lock:
Expand Down Expand Up @@ -345,14 +329,11 @@ def add(self, value, timeout=None, priority=0, till=None):
return

self.queue[priority]._wait_for_queue_space(till)
if self.closed and not self.queue[priority].allow_add_after_close:
logger.error("Do not add to closed queue")
else:
if self.unique:
if value not in self.queue[priority].queue:
self.queue[priority].queue.append(value)
else:
if self.unique:
if value not in self.queue[priority].queue:
self.queue[priority].queue.append(value)
else:
self.queue[priority].queue.append(value)
return self

def push(self, value, priority=0):
Expand All @@ -363,8 +344,7 @@ def push(self, value, priority=0):
logger.error("Do not push to closed queue")
with self.lock:
self.queue[priority]._wait_for_queue_space(None)
if not self.closed:
self.queue[priority].queue.appendleft(value)
self.queue[priority].queue.appendleft(value)
return self

def __len__(self):
Expand Down Expand Up @@ -571,17 +551,15 @@ def add(self, value, timeout=None, till=None):
till = till or Till(seconds=coalesce(timeout, DEFAULT_WAIT_TIME))
with self.lock:
self._wait_for_queue_space(till)
if not self.closed:
self.queue.append(value)
self.queue.append(value)
return self

def extend(self, values, till=None):
till = till or Till(seconds=DEFAULT_WAIT_TIME)
with self.lock:
# ONCE THE queue IS BELOW LIMIT, ALLOW ADDING MORE
self._wait_for_queue_space(till)
if not self.closed:
self.queue.extend(values)
self.queue.extend(values)
if not self.silent:
logger.info("{name} has {num} items", name=self.name, num=len(self.queue))
return self
Expand Down

0 comments on commit 5a99248

Please sign in to comment.