Skip to content

Commit

Permalink
release 6.634.24139
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed May 18, 2024
2 parents 34a7e91 + 2fee088 commit 0fd84a1
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 55 deletions.
3 changes: 2 additions & 1 deletion mo_threads/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from mo_threads.signals import Signal, DONE, NEVER
from mo_threads.threads import (
MainThread,
THREAD_STOP,
PLEASE_STOP,
THREAD_TIMEOUT,
Thread,
stop_main_thread,
Expand All @@ -33,6 +33,7 @@
)
from mo_threads.till import Till

THREAD_STOP = PLEASE_STOP
export("mo_threads.signals", threads)
del threads

Expand Down
16 changes: 8 additions & 8 deletions mo_threads/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from mo_threads.processes import os_path, Process
from mo_threads.queues import Queue
from mo_threads.signals import Signal
from mo_threads.threads import THREAD_STOP, Thread
from mo_threads.threads import PLEASE_STOP, Thread
from mo_threads.till import Till

DEBUG = False
Expand Down Expand Up @@ -117,7 +117,7 @@ def _worker(self, source, destination, please_stop=None):
value = source.pop(till=please_stop)
if value is None:
continue
elif value is THREAD_STOP:
elif value is PLEASE_STOP:
self.debug and logger.info("got thread stop")
return
elif line_count == 0 and "is not recognized as an internal or external command" in value:
Expand All @@ -135,8 +135,8 @@ def _worker(self, source, destination, please_stop=None):
line_count += 1
destination.add(value)
finally:
destination.add(THREAD_STOP)
# self.process.stderr.add(THREAD_STOP)
destination.add(PLEASE_STOP)
# self.process.stderr.add(PLEASE_STOP)
self.stderr_thread.please_stop.go()
self.stderr_thread.join()
self.manager.return_process(self.process)
Expand All @@ -146,15 +146,15 @@ def _worker(self, source, destination, please_stop=None):
def _stderr_relay(source, destination, please_stop=None):
while not please_stop:
value = source.pop(till=please_stop)
if value is THREAD_STOP:
if value is PLEASE_STOP:
break
if value:
destination.add(value)
for value in source.pop_all():
if value and value is not THREAD_STOP:
if value and value is not PLEASE_STOP:
destination.add(value)

destination.add(THREAD_STOP)
destination.add(PLEASE_STOP)


class LifetimeManager:
Expand Down Expand Up @@ -216,7 +216,7 @@ def get_or_create_process(self, *, params, bufsize, cwd, debug, env, name, shell
start_timeout = Till(seconds=START_TIMEOUT)
while not start_timeout:
value = process.stdout.pop(till=start_timeout)
if value == THREAD_STOP:
if value == PLEASE_STOP:
process.kill_once()
process.join()
logger.error("Could not start command, stdout closed early")
Expand Down
6 changes: 3 additions & 3 deletions mo_threads/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from mo_threads.queues import Queue
from mo_threads.signals import Signal
from mo_threads.threads import THREAD_STOP, Thread, EndOfThread, ALL_LOCK, ALL
from mo_threads.threads import PLEASE_STOP, Thread, EndOfThread, ALL_LOCK, ALL
from mo_threads.till import Till

DEBUG = False
Expand Down Expand Up @@ -284,9 +284,9 @@ def _reader(self, name, pipe, receive, status: Status, please_stop):
def _writer(self, pipe, send, please_stop):
while not please_stop:
line = send.pop(till=please_stop)
if line is THREAD_STOP:
if line is PLEASE_STOP:
please_stop.go()
self.debug and logger.info("got THREAD_STOP")
self.debug and logger.info("got PLEASE_STOP")
break
elif line is None:
continue
Expand Down
8 changes: 4 additions & 4 deletions mo_threads/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from mo_dots import to_data, from_data
from mo_future import is_windows
from mo_logs import Except, logger
from mo_threads import Lock, Process, Signal, THREAD_STOP, Thread, DONE, python_worker
from mo_threads import Lock, Process, Signal, PLEASE_STOP, Thread, DONE, python_worker

DEBUG = False

Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(self, name, config, parent_thread=None):
)))
while True:
line = self.process.stdout.pop()
if line == THREAD_STOP:
if line == PLEASE_STOP:
logger.error("problem starting python process: STOP detected on stdout")
if line == '{"out":"ok"}':
break
Expand Down Expand Up @@ -87,7 +87,7 @@ def _watch_stdout(self, please_stop):
while not please_stop:
line = self.process.stdout.pop(till=please_stop)
DEBUG and logger.info("stdout got {line}", line=line)
if line == THREAD_STOP:
if line == PLEASE_STOP:
please_stop.go()
break
elif not line:
Expand Down Expand Up @@ -117,7 +117,7 @@ def _watch_stderr(self, please_stop):
try:
line = self.process.stderr.pop(till=please_stop)
DEBUG and logger.info("stderr got {line}", line=line)
if line is None or line == THREAD_STOP:
if line is None or line == PLEASE_STOP:
please_stop.go()
break
logger.info(
Expand Down
48 changes: 24 additions & 24 deletions mo_threads/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from mo_threads.lock import Lock
from mo_threads.signals import Signal
from mo_threads.threads import THREAD_STOP, THREAD_TIMEOUT, Thread
from mo_threads.threads import PLEASE_STOP, THREAD_TIMEOUT, Thread
from mo_threads.till import Till

DEBUG = False
Expand Down Expand Up @@ -63,7 +63,7 @@ def __iter__(self):
try:
while True:
value = self.pop()
if value is THREAD_STOP:
if value is PLEASE_STOP:
break
if value is not None:
yield value
Expand All @@ -78,7 +78,7 @@ def add(self, value, timeout=None, force=False):
:return: self
"""
with self.lock:
if value is THREAD_STOP:
if value is PLEASE_STOP:
# INSIDE THE lock SO THAT EXITING WILL RELEASE wait()
self.queue.append(value)
self.closed.go()
Expand Down Expand Up @@ -139,14 +139,14 @@ def extend(self, values):
if not self.closed:
if self.unique:
for v in values:
if v is THREAD_STOP:
if v is PLEASE_STOP:
self.closed.go()
continue
if v not in self.queue:
self.queue.append(v)
else:
for v in values:
if v is THREAD_STOP:
if v is PLEASE_STOP:
self.closed.go()
continue
self.queue.append(v)
Expand Down Expand Up @@ -192,16 +192,16 @@ def __len__(self):

def __nonzero__(self):
with self.lock:
return any(r != THREAD_STOP for r in self.queue)
return any(r != PLEASE_STOP for r in self.queue)

def pop(self, till=None):
"""
WAIT FOR NEXT ITEM ON THE QUEUE
RETURN THREAD_STOP IF QUEUE IS CLOSED
RETURN PLEASE_STOP IF QUEUE IS CLOSED
RETURN None IF till IS REACHED AND QUEUE IS STILL EMPTY
:param till: A `Signal` to stop waiting and return None
:return: A value, or a THREAD_STOP or None
:return: A value, or a PLEASE_STOP or None
"""
if till is not None and not isinstance(till, Signal):
logger.error("expecting a signal")
Expand All @@ -217,7 +217,7 @@ def pop(self, till=None):
break
return None
(DEBUG or not self.silent) and logger.info("{name} queue closed", name=self.name, stack_depth=1)
return THREAD_STOP
return PLEASE_STOP

def pop_all(self):
"""
Expand All @@ -235,12 +235,12 @@ def pop_one(self):
"""
with self.lock:
if self.closed:
return THREAD_STOP
return PLEASE_STOP
elif not self.queue:
return None
else:
v = self.queue.popleft()
if v is THREAD_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
if v is PLEASE_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
self.closed.go()
return v

Expand Down Expand Up @@ -279,7 +279,7 @@ def __iter__(self):
try:
while True:
value = self.pop(self.closed)
if value is THREAD_STOP:
if value is PLEASE_STOP:
break
if value is not None:
yield value
Expand All @@ -291,7 +291,7 @@ def __iter__(self):

def add(self, value, timeout=None, priority=0):
with self.lock:
if value is THREAD_STOP:
if value is PLEASE_STOP:
# INSIDE THE lock SO THAT EXITING WILL RELEASE wait()
self.queue[priority].queue.append(value)
self.closed.go()
Expand Down Expand Up @@ -327,7 +327,7 @@ def __len__(self):

def __nonzero__(self):
with self.lock:
return any(any(r != THREAD_STOP for r in q.queue) for q in self.queue)
return any(any(r != PLEASE_STOP for r in q.queue) for q in self.queue)

def highest_entry(self):
for count, q in enumerate(self.queue):
Expand All @@ -338,11 +338,11 @@ def highest_entry(self):
def pop(self, till=None, priority=None):
"""
WAIT FOR NEXT ITEM ON THE QUEUE
RETURN THREAD_STOP IF QUEUE IS CLOSED
RETURN PLEASE_STOP IF QUEUE IS CLOSED
RETURN None IF till IS REACHED AND QUEUE IS STILL EMPTY
:param till: A `Signal` to stop waiting and return None
:return: A value, or a THREAD_STOP or None
:return: A value, or a PLEASE_STOP or None
"""
if till is not None and not isinstance(till, Signal):
logger.error("expecting a signal")
Expand All @@ -361,7 +361,7 @@ def pop(self, till=None, priority=None):
break
return None
(DEBUG or not self.silent) and logger.info(self.name + " queue stopped")
return THREAD_STOP
return PLEASE_STOP

def pop_all(self, priority=None):
"""
Expand Down Expand Up @@ -396,12 +396,12 @@ def pop_one(self, priority=None):
if not priority:
priority = self.highest_entry()
if self.closed:
return [THREAD_STOP]
return [PLEASE_STOP]
elif not self.queue:
return None
else:
v = self.pop(priority=priority)
if v is THREAD_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
if v is PLEASE_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION
self.closed.go()
return v

Expand Down Expand Up @@ -441,7 +441,7 @@ def __init__(
)

def worker_bee(self, batch_size, period, error_target, please_stop):
please_stop.then(lambda: self.add(THREAD_STOP))
please_stop.then(lambda: self.add(PLEASE_STOP))

_buffer = []
_post_push_functions = []
Expand Down Expand Up @@ -470,7 +470,7 @@ def push_to_queue():
item = self.pop(till=next_push)
now = time()

if item is THREAD_STOP:
if item is PLEASE_STOP:
push_to_queue()
please_stop.go()
break
Expand Down Expand Up @@ -516,7 +516,7 @@ def push_to_queue():
if _buffer:
# ONE LAST PUSH, DO NOT HAVE TIME TO DEAL WITH ERRORS
push_to_queue()
self.slow_queue.add(THREAD_STOP)
self.slow_queue.add(PLEASE_STOP)

def add_child(self, child):
pass
Expand All @@ -542,12 +542,12 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.add(THREAD_STOP)
self.add(PLEASE_STOP)
if isinstance(exc_val, BaseException):
self.thread.please_stop.go()
self.thread.join()

def stop(self):
self.add(THREAD_STOP)
self.add(PLEASE_STOP)
self.thread.join()
return self
1 change: 0 additions & 1 deletion mo_threads/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
PARENT_THREAD = "parent_thread" # OPTIONAL PARAMETER TO ASSIGN THREAD TO SOMETHING OTHER THAN CURRENT THREAD
MAX_DATETIME = datetime(2286, 11, 20, 17, 46, 39)
DEFAULT_WAIT_TIME = timedelta(minutes=10)
THREAD_STOP = "stop"
THREAD_TIMEOUT = "Thread {name} timeout"
COVERAGE_COLLECTOR = None # Detect Coverage.py

Expand Down
Loading

0 comments on commit 0fd84a1

Please sign in to comment.