Commit
add python Queue methods
klahnakoski committed Jun 18, 2024
1 parent 178af6d commit e9f994b
Showing 7 changed files with 166 additions and 53 deletions.
11 changes: 3 additions & 8 deletions README.md
@@ -10,21 +10,16 @@

The main benefits over Python's threading library are:

1. **Multi-threaded queues do not use serialization** - Serialization is
great in the general case, where you may also be communicating between
processes, but it is a needless overhead for single-process multi-threading.
It is left to the programmer to ensure the messages put on the queue are
not changed, which is not an onerous demand.
2. **Shutdown order is deterministic and explicit** - Python's threading
1. **Shutdown order is deterministic and explicit** - Python's threading
library is missing strict conventions for controlled and orderly shutdown.
Each thread can shut down on its own terms, but is expected to do so promptly.
* All threads are required to accept a `please_stop` signal; are
expected to test it in a timely manner; and expected to exit when signalled.
* All threads have a parent - The parent is responsible for ensuring their children get the `please_stop` signal, and are dead, before stopping themselves. This responsibility is baked into the thread spawning process,
so you need not deal with it unless you want.
3. Uses [**Signals**](#signal-class) to simplify logical
2. Uses [**Signals**](#signal-class) to simplify logical
dependencies among multiple threads, events, and timeouts.
4. **Logging and Profiling is Integrated** - Logging and exception handling
3. **Logging and Profiling is Integrated** - Logging and exception handling
is seamlessly integrated: This means logs are centrally handled, and thread
safe. Parent threads have access to uncaught child thread exceptions, and
the cProfiler properly aggregates results from the multiple threads.
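The shutdown conventions above lend themselves to a short sketch. This example is not part of the commit; it assumes mo-threads is installed, and the `stop()`/`join()` calls follow the library's documented conventions rather than anything in this diff:

```python
from mo_threads import Thread, Till


def worker(please_stop):
    # a well-behaved mo-threads worker: test please_stop in a timely manner, exit when signalled
    while not please_stop:
        # do one small unit of work here, then wait up to a second or until asked to stop
        (please_stop | Till(seconds=1)).wait()


child = Thread.run("example worker", worker)  # the spawning thread becomes the parent
# ... later, the parent stops its child before stopping itself ...
child.stop()   # raises the child's please_stop signal
child.join()   # wait until the child is dead
```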
6 changes: 3 additions & 3 deletions mo_threads/commands.py
@@ -75,8 +75,8 @@ def __init__(
if debug:
name = f"{name} (using {process.name})"
self.name = name
self.stdout = Queue("stdout for " + name, max=max_stdout)
self.stderr = Queue("stderr for " + name, max=max_stdout)
self.stdout = Queue(f"stdout for {name}", max=max_stdout)
self.stderr = Queue(f"stderr for {name}", max=max_stdout)
self.stderr_thread = Thread.run(f"{name} stderr", _stderr_relay, process.stderr, self.stderr).release()
# stdout_thread IS CONSIDERED THE LIFETIME OF THE COMMAND
self.worker_thread = Thread.run(f"{name} worker", self._worker, process.stdout, self.stdout).release()
@@ -211,7 +211,7 @@ def get_or_create_process(self, *, params, bufsize, cwd, debug, env, name, shell

# WAIT FOR START
try:
process.stdin.add("cd " + cmd_escape(cwd))
process.stdin.add(f"cd {cmd_escape(cwd)}")
process.stdin.add(LAST_RETURN_CODE)
start_timeout = Till(seconds=START_TIMEOUT)
while not start_timeout:
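The hunks above show the pattern this module relies on: one named `Queue` per stream, plus a relay thread started with `Thread.run(...).release()` so it is not tied to the spawning thread's lifetime. A small sketch of that pattern in isolation (not from the commit; the `_relay` helper and the queue names are made up for illustration):

```python
from mo_threads import Queue, Thread


def _relay(source, destination, please_stop=None):
    # copy items from one queue to the other until the source dries up or we are asked to stop
    while not please_stop:
        line = source.pop(till=please_stop)
        if line is None:
            break
        destination.add(line)


raw = Queue("stderr for demo", max=1024)
collected = Queue("collected stderr for demo")
# release() detaches the relay from its parent, mirroring the Command class above
Thread.run("demo stderr relay", _relay, raw, collected).release()
```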
10 changes: 5 additions & 5 deletions mo_threads/processes.py
@@ -76,13 +76,13 @@ def __init__(

self.debug = debug or DEBUG
self.name = f"{name} ({self.process_id})"
self.stopped = Signal("stopped signal for " + strings.quote(name))
self.please_stop = Signal("please stop for " + strings.quote(name))
self.stopped = Signal(f"stopped signal for {strings.quote(name)}")
self.please_stop = Signal(f"please stop for {strings.quote(name)}")
self.second_last_stdin = None
self.last_stdin = None
self.stdin = Queue("stdin for process " + strings.quote(name), silent=not self.debug)
self.stdout = Queue("stdout for process " + strings.quote(name), silent=not self.debug)
self.stderr = Queue("stderr for process " + strings.quote(name), silent=not self.debug)
self.stdin = Queue(f"stdin for process {strings.quote(name)}", silent=not self.debug)
self.stdout = Queue(f"stdout for process {strings.quote(name)}", silent=not self.debug)
self.stderr = Queue(f"stderr for process {strings.quote(name)}", silent=not self.debug)
self.timeout = timeout
self.monitor_period = 0.5

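For context, the queues and signals being renamed here are the ones a caller interacts with. A hypothetical usage sketch (not from the commit; the constructor arguments, a name plus a list of command-line parts, and the `join()` call are assumptions, since only the stdin/stdout/stderr queues and the two signals are visible in this hunk):

```python
from mo_threads.processes import Process

proc = Process("lister", ["ls", "-la"])   # hypothetical: constructor signature assumed, not shown in this diff
for line in proc.stdout:   # stdout is a mo-threads Queue, so it can be iterated until it closes
    print(line)
proc.join()                # assumed: wait for the process and its relay threads to finish
```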
2 changes: 1 addition & 1 deletion mo_threads/python.py
@@ -53,7 +53,7 @@ def __init__(self, name, config, parent_thread=None):
if line == '{"out":"ok"}':
break
logger.info("waiting to start python: {line}", line=line)
self.lock = Lock("wait for response from " + name)
self.lock = Lock(f"wait for response from {name}")
self.stop_error = None
self.done = DONE
self.response = None
122 changes: 87 additions & 35 deletions mo_threads/queues.py
@@ -17,7 +17,7 @@
from copy import copy
from datetime import datetime
from time import time

from queue import Empty, Full
from mo_dots import Null, coalesce
from mo_logs import Except, logger

@@ -37,9 +37,6 @@
class Queue(object):
"""
SIMPLE MULTI-THREADED QUEUE
(processes.Queue REQUIRES SERIALIZATION, WHICH
IS DIFFICULT TO USE JUST BETWEEN THREADS)
"""

def __init__(self, name, max=None, silent=False, unique=False, allow_add_after_close=False):
@@ -53,10 +50,8 @@ def __init__(self, name, max=None, silent=False, unique=False, allow_add_after_c
self.silent = silent
self.allow_add_after_close = allow_add_after_close
self.unique = unique
self.closed = Signal(
"queue is closed signal for " + name
) # INDICATE THE PRODUCER IS DONE GENERATING ITEMS TO QUEUE
self.lock = Lock("lock for queue " + name)
self.closed = Signal(f"{name} is closed")
self.lock = Lock(f"lock for queue {name}")
self.queue = deque()

def __iter__(self):
@@ -70,28 +65,26 @@ def __iter__(self):
except Exception as cause:
logger.warning("Tell me about what happened here", cause)

def add(self, value, timeout=None, force=False):
def add(self, value, timeout=None, force=False, till=None):
"""
:param value: ADDED THE THE QUEUE
:param value: ADDED TO THE QUEUE
:param timeout: HOW LONG TO WAIT FOR QUEUE TO NOT BE FULL
:param force: ADD TO QUEUE, EVEN IF FULL (USE ONLY WHEN CONSUMER IS RETURNING WORK TO THE QUEUE)
:param till: A `Signal` WHEN TO GIVE UP WAITING FOR SPACE IN THE QUEUE (INSTEAD OF timeout)
:return: self
"""
till = till or Till(seconds=coalesce(timeout, DEFAULT_WAIT_TIME))
with self.lock:
if value is PLEASE_STOP:
# INSIDE THE lock SO THAT EXITING WILL RELEASE wait()
self.queue.append(value)
self.closed.go()
return

if not force:
self._wait_for_queue_space(timeout=timeout)
self._wait_for_queue_space(till)
if self.closed and not self.allow_add_after_close:
logger.error("Do not add to closed queue")
if self.unique:
if value not in self.queue:
self.queue.append(value)
else:
if not self.unique or value not in self.queue:
self.queue.append(value)
return self
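With this change `add()` accepts either a `timeout` in seconds or an explicit `till` Signal, matching the `till` parameter that `pop()` already takes. A quick sketch of the two styles (not part of the diff; the names and values are arbitrary):

```python
from mo_threads import Queue, Till

q = Queue("demo queue", max=10)

# timeout style: wait up to two seconds for space in the queue
q.add("first", timeout=2)

# till style: share one deadline Signal across several calls
deadline = Till(seconds=2)
q.add("second", till=deadline)
value = q.pop(till=deadline)   # returns the next item, or None if the deadline fires first
```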

@@ -103,7 +96,7 @@ def push(self, value):
logger.error("Do not push to closed queue")

with self.lock:
self._wait_for_queue_space()
self._wait_for_queue_space(None)
if not self.closed:
self.queue.appendleft(value)
return self
@@ -116,7 +109,7 @@ def push_all(self, values):
logger.error("Do not push to closed queue")

with self.lock:
self._wait_for_queue_space()
self._wait_for_queue_space(None)
if not self.closed:
self.queue.extendleft(values)
return self
@@ -135,7 +128,7 @@ def extend(self, values):

with self.lock:
# ONCE THE queue IS BELOW LIMIT, ALLOW ADDING MORE
self._wait_for_queue_space()
self._wait_for_queue_space(None)
if not self.closed:
if self.unique:
for v in values:
@@ -152,7 +145,7 @@ def extend(self, values):
self.queue.append(v)
return self

def _wait_for_queue_space(self, timeout=None):
def _wait_for_queue_space(self, till):
"""
EXPECT THE self.lock TO BE HAD, WAITS FOR self.queue TO HAVE A LITTLE SPACE
@@ -166,17 +159,16 @@ def _wait_for_queue_space(self, till):
(DEBUG and len(self.queue) > 1 * 1000 * 1000) and logger.warning("Queue {name} has over a million items")

start = time()
stop_waiting = Till(till=start + coalesce(timeout, DEFAULT_WAIT_TIME))

while not self.closed and len(self.queue) >= self.max:
if stop_waiting:
if till:
logger.error(THREAD_TIMEOUT, name=self.name)

if self.silent:
self.lock.wait(stop_waiting)
self.lock.wait(till)
else:
self.lock.wait(Till(seconds=wait_time))
if not stop_waiting and len(self.queue) >= self.max:
if not till and len(self.queue) >= self.max:
now = time()
logger.alert(
"Queue with name {name|quote} is full with ({num} items),"
@@ -203,9 +195,6 @@ def pop(self, till=None):
:param till: A `Signal` to stop waiting and return None
:return: A value, or a PLEASE_STOP or None
"""
if till is not None and not isinstance(till, Signal):
logger.error("expecting a signal")

with self.lock:
while True:
if self.queue:
@@ -256,6 +245,67 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

# python queue.Queue
def qsize(self):
with self.lock:
return len(self.queue)

def empty(self):
with self.lock:
return not bool(self.queue)

def full(self):
with self.lock:
return len(self.queue) >= self.max

def put(self, item, block=True, timeout=None):
if block:
try:
self.add(item, timeout=timeout, force=not block)
return
except Exception as cause:
if THREAD_TIMEOUT in cause:
raise Full()
raise
self.put_nowait(item)

def put_nowait(self, item):
self.add(item, force=True)

def get(self, block=True, timeout=None):
if block:
if timeout is None:
return self.pop()
else:
till = Till(seconds=timeout)
value = self.pop(till)
if value is None:
raise Empty()
return value
return self.get_nowait()

def get_nowait(self):
value = self.pop(True)
if value is None:
raise Empty()
return value

def task_done(self):
pass

def join(self, till=None):
"""
WAIT FOR ALL ITEMS TO BE PROCESSED
DIFFERS FROM PYTHON queue.Queue IN THAT IT DOES NOT WAIT FOR task_done()
"""
self.closed.wait(till)
with self.lock:
while not till:
if not self.queue:
break
self.lock.wait(till)
return self
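Together these methods give the class a `queue.Queue`-style surface (`qsize`, `empty`, `full`, `put`/`put_nowait`, `get`/`get_nowait`, `task_done`, `join`), raising the standard `queue.Empty` and `queue.Full` where appropriate. A short sketch of drop-in use, based on the code above but not part of the commit; note that `join()` waits for the queue to be closed and drained rather than for `task_done()` calls, and `put_nowait()` force-adds rather than raising `Full`:

```python
import queue

from mo_threads import Queue

q = Queue("stdlib-style demo", max=2)

q.put("a")                    # blocks (up to a default wait) while the queue is full
q.put_nowait("b")             # force-adds, even when the queue is full

print(q.qsize(), q.empty(), q.full())   # -> 2 False True

try:
    print(q.get(timeout=0.1))  # -> a
    q.get_nowait()             # -> "b"
    q.get(timeout=0.1)         # nothing left, so this raises queue.Empty
except queue.Empty:
    print("queue drained")

q.task_done()                  # accepted for compatibility; a no-op here
```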


class PriorityQueue(Queue):
"""
@@ -289,15 +339,16 @@ def __iter__(self):
if not self.silent:
logger.info("queue iterator is done")

def add(self, value, timeout=None, priority=0):
def add(self, value, timeout=None, priority=0, till=None):
till = till or Till(seconds=coalesce(timeout, DEFAULT_WAIT_TIME))
with self.lock:
if value is PLEASE_STOP:
# INSIDE THE lock SO THAT EXITING WILL RELEASE wait()
self.queue[priority].queue.append(value)
self.closed.go()
return

self.queue[priority]._wait_for_queue_space(timeout=timeout)
self.queue[priority]._wait_for_queue_space(till)
if self.closed and not self.queue[priority].allow_add_after_close:
logger.error("Do not add to closed queue")
else:
@@ -314,9 +365,8 @@ def push(self, value, priority=0):
"""
if self.closed and not self.queue[priority].allow_add_after_close:
logger.error("Do not push to closed queue")

with self.lock:
self.queue[priority]._wait_for_queue_space()
self.queue[priority]._wait_for_queue_space(None)
if not self.closed:
self.queue[priority].queue.appendleft(value)
return self
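For completeness, a hypothetical sketch of the `PriorityQueue` touched here (not from the commit; the constructor arguments and the assumption that lower numbers are served first are guesses, since only `add(value, priority=...)` and `push(value, priority=...)` are visible in these hunks):

```python
from mo_threads.queues import PriorityQueue

tasks = PriorityQueue("prioritized tasks")   # assumption: constructed like Queue, with a name
tasks.add("routine job", priority=1)
tasks.add("urgent job", priority=0)
first = tasks.pop()   # expected to come from the most urgent non-empty level
```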
@@ -521,17 +571,19 @@ def push_to_queue():
def add_child(self, child):
pass

def add(self, value, timeout=None):
def add(self, value, timeout=None, till=None):
till = till or Till(seconds=coalesce(timeout, DEFAULT_WAIT_TIME))
with self.lock:
self._wait_for_queue_space(timeout=timeout)
self._wait_for_queue_space(till)
if not self.closed:
self.queue.append(value)
return self

def extend(self, values):
def extend(self, values, till=None):
till = till or Till(seconds=DEFAULT_WAIT_TIME)
with self.lock:
# ONCE THE queue IS BELOW LIMIT, ALLOW ADDING MORE
self._wait_for_queue_space()
self._wait_for_queue_space(till)
if not self.closed:
self.queue.extend(values)
if not self.silent:
2 changes: 1 addition & 1 deletion tests/test_locks.py
@@ -333,7 +333,7 @@ def _consumer(please_stop):

def _producer(t, please_stop=None):
for i in range(2):
queue.add(str(t) + ":" + str(i))
queue.add(f"{t}:{i}")
Till(seconds=0.01).wait()

consumer = Thread.run("consumer", _consumer)