From a2295083d196672e12076f9b90c79ec6be934cdd Mon Sep 17 00:00:00 2001 From: Kyle Lahnakoski Date: Tue, 14 May 2024 22:52:03 -0400 Subject: [PATCH 1/3] add PLEASE_STOP sentinal --- mo_threads/__init__.py | 3 ++- mo_threads/commands.py | 16 +++++++------- mo_threads/processes.py | 6 +++--- mo_threads/python.py | 8 +++---- mo_threads/queues.py | 48 ++++++++++++++++++++--------------------- mo_threads/threads.py | 1 - tests/test_locks.py | 6 +++--- 7 files changed, 44 insertions(+), 44 deletions(-) diff --git a/mo_threads/__init__.py b/mo_threads/__init__.py index 266ae31..47bd45d 100644 --- a/mo_threads/__init__.py +++ b/mo_threads/__init__.py @@ -21,7 +21,7 @@ from mo_threads.signals import Signal, DONE, NEVER from mo_threads.threads import ( MainThread, - THREAD_STOP, + PLEASE_STOP, THREAD_TIMEOUT, Thread, stop_main_thread, @@ -33,6 +33,7 @@ ) from mo_threads.till import Till +THREAD_STOP = PLEASE_STOP export("mo_threads.signals", threads) del threads diff --git a/mo_threads/commands.py b/mo_threads/commands.py index c844610..8dcd19a 100644 --- a/mo_threads/commands.py +++ b/mo_threads/commands.py @@ -21,7 +21,7 @@ from mo_threads.processes import os_path, Process from mo_threads.queues import Queue from mo_threads.signals import Signal -from mo_threads.threads import THREAD_STOP, Thread +from mo_threads.threads import PLEASE_STOP, Thread from mo_threads.till import Till DEBUG = False @@ -117,7 +117,7 @@ def _worker(self, source, destination, please_stop=None): value = source.pop(till=please_stop) if value is None: continue - elif value is THREAD_STOP: + elif value is PLEASE_STOP: self.debug and logger.info("got thread stop") return elif line_count == 0 and "is not recognized as an internal or external command" in value: @@ -135,8 +135,8 @@ def _worker(self, source, destination, please_stop=None): line_count += 1 destination.add(value) finally: - destination.add(THREAD_STOP) - # self.process.stderr.add(THREAD_STOP) + destination.add(PLEASE_STOP) + # self.process.stderr.add(PLEASE_STOP) self.stderr_thread.please_stop.go() self.stderr_thread.join() self.manager.return_process(self.process) @@ -146,15 +146,15 @@ def _worker(self, source, destination, please_stop=None): def _stderr_relay(source, destination, please_stop=None): while not please_stop: value = source.pop(till=please_stop) - if value is THREAD_STOP: + if value is PLEASE_STOP: break if value: destination.add(value) for value in source.pop_all(): - if value and value is not THREAD_STOP: + if value and value is not PLEASE_STOP: destination.add(value) - destination.add(THREAD_STOP) + destination.add(PLEASE_STOP) class LifetimeManager: @@ -216,7 +216,7 @@ def get_or_create_process(self, *, params, bufsize, cwd, debug, env, name, shell start_timeout = Till(seconds=START_TIMEOUT) while not start_timeout: value = process.stdout.pop(till=start_timeout) - if value == THREAD_STOP: + if value == PLEASE_STOP: process.kill_once() process.join() logger.error("Could not start command, stdout closed early") diff --git a/mo_threads/processes.py b/mo_threads/processes.py index 57a4236..22677b5 100644 --- a/mo_threads/processes.py +++ b/mo_threads/processes.py @@ -22,7 +22,7 @@ from mo_threads.queues import Queue from mo_threads.signals import Signal -from mo_threads.threads import THREAD_STOP, Thread, EndOfThread, ALL_LOCK, ALL +from mo_threads.threads import PLEASE_STOP, Thread, EndOfThread, ALL_LOCK, ALL from mo_threads.till import Till DEBUG = False @@ -284,9 +284,9 @@ def _reader(self, name, pipe, receive, status: Status, please_stop): def _writer(self, pipe, send, please_stop): while not please_stop: line = send.pop(till=please_stop) - if line is THREAD_STOP: + if line is PLEASE_STOP: please_stop.go() - self.debug and logger.info("got THREAD_STOP") + self.debug and logger.info("got PLEASE_STOP") break elif line is None: continue diff --git a/mo_threads/python.py b/mo_threads/python.py index 28a00f1..ddb8c5c 100644 --- a/mo_threads/python.py +++ b/mo_threads/python.py @@ -15,7 +15,7 @@ from mo_dots import to_data, from_data from mo_future import is_windows from mo_logs import Except, logger -from mo_threads import Lock, Process, Signal, THREAD_STOP, Thread, DONE, python_worker +from mo_threads import Lock, Process, Signal, PLEASE_STOP, Thread, DONE, python_worker DEBUG = False @@ -48,7 +48,7 @@ def __init__(self, name, config, parent_thread=None): ))) while True: line = self.process.stdout.pop() - if line == THREAD_STOP: + if line == PLEASE_STOP: logger.error("problem starting python process: STOP detected on stdout") if line == '{"out":"ok"}': break @@ -87,7 +87,7 @@ def _watch_stdout(self, please_stop): while not please_stop: line = self.process.stdout.pop(till=please_stop) DEBUG and logger.info("stdout got {line}", line=line) - if line == THREAD_STOP: + if line == PLEASE_STOP: please_stop.go() break elif not line: @@ -117,7 +117,7 @@ def _watch_stderr(self, please_stop): try: line = self.process.stderr.pop(till=please_stop) DEBUG and logger.info("stderr got {line}", line=line) - if line is None or line == THREAD_STOP: + if line is None or line == PLEASE_STOP: please_stop.go() break logger.info( diff --git a/mo_threads/queues.py b/mo_threads/queues.py index 6a1f88a..8b1928f 100644 --- a/mo_threads/queues.py +++ b/mo_threads/queues.py @@ -23,7 +23,7 @@ from mo_threads.lock import Lock from mo_threads.signals import Signal -from mo_threads.threads import THREAD_STOP, THREAD_TIMEOUT, Thread +from mo_threads.threads import PLEASE_STOP, THREAD_TIMEOUT, Thread from mo_threads.till import Till DEBUG = False @@ -63,7 +63,7 @@ def __iter__(self): try: while True: value = self.pop() - if value is THREAD_STOP: + if value is PLEASE_STOP: break if value is not None: yield value @@ -78,7 +78,7 @@ def add(self, value, timeout=None, force=False): :return: self """ with self.lock: - if value is THREAD_STOP: + if value is PLEASE_STOP: # INSIDE THE lock SO THAT EXITING WILL RELEASE wait() self.queue.append(value) self.closed.go() @@ -139,14 +139,14 @@ def extend(self, values): if not self.closed: if self.unique: for v in values: - if v is THREAD_STOP: + if v is PLEASE_STOP: self.closed.go() continue if v not in self.queue: self.queue.append(v) else: for v in values: - if v is THREAD_STOP: + if v is PLEASE_STOP: self.closed.go() continue self.queue.append(v) @@ -192,16 +192,16 @@ def __len__(self): def __nonzero__(self): with self.lock: - return any(r != THREAD_STOP for r in self.queue) + return any(r != PLEASE_STOP for r in self.queue) def pop(self, till=None): """ WAIT FOR NEXT ITEM ON THE QUEUE - RETURN THREAD_STOP IF QUEUE IS CLOSED + RETURN PLEASE_STOP IF QUEUE IS CLOSED RETURN None IF till IS REACHED AND QUEUE IS STILL EMPTY :param till: A `Signal` to stop waiting and return None - :return: A value, or a THREAD_STOP or None + :return: A value, or a PLEASE_STOP or None """ if till is not None and not isinstance(till, Signal): logger.error("expecting a signal") @@ -217,7 +217,7 @@ def pop(self, till=None): break return None (DEBUG or not self.silent) and logger.info("{name} queue closed", name=self.name, stack_depth=1) - return THREAD_STOP + return PLEASE_STOP def pop_all(self): """ @@ -235,12 +235,12 @@ def pop_one(self): """ with self.lock: if self.closed: - return THREAD_STOP + return PLEASE_STOP elif not self.queue: return None else: v = self.queue.popleft() - if v is THREAD_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION + if v is PLEASE_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION self.closed.go() return v @@ -279,7 +279,7 @@ def __iter__(self): try: while True: value = self.pop(self.closed) - if value is THREAD_STOP: + if value is PLEASE_STOP: break if value is not None: yield value @@ -291,7 +291,7 @@ def __iter__(self): def add(self, value, timeout=None, priority=0): with self.lock: - if value is THREAD_STOP: + if value is PLEASE_STOP: # INSIDE THE lock SO THAT EXITING WILL RELEASE wait() self.queue[priority].queue.append(value) self.closed.go() @@ -327,7 +327,7 @@ def __len__(self): def __nonzero__(self): with self.lock: - return any(any(r != THREAD_STOP for r in q.queue) for q in self.queue) + return any(any(r != PLEASE_STOP for r in q.queue) for q in self.queue) def highest_entry(self): for count, q in enumerate(self.queue): @@ -338,11 +338,11 @@ def highest_entry(self): def pop(self, till=None, priority=None): """ WAIT FOR NEXT ITEM ON THE QUEUE - RETURN THREAD_STOP IF QUEUE IS CLOSED + RETURN PLEASE_STOP IF QUEUE IS CLOSED RETURN None IF till IS REACHED AND QUEUE IS STILL EMPTY :param till: A `Signal` to stop waiting and return None - :return: A value, or a THREAD_STOP or None + :return: A value, or a PLEASE_STOP or None """ if till is not None and not isinstance(till, Signal): logger.error("expecting a signal") @@ -361,7 +361,7 @@ def pop(self, till=None, priority=None): break return None (DEBUG or not self.silent) and logger.info(self.name + " queue stopped") - return THREAD_STOP + return PLEASE_STOP def pop_all(self, priority=None): """ @@ -396,12 +396,12 @@ def pop_one(self, priority=None): if not priority: priority = self.highest_entry() if self.closed: - return [THREAD_STOP] + return [PLEASE_STOP] elif not self.queue: return None else: v = self.pop(priority=priority) - if v is THREAD_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION + if v is PLEASE_STOP: # SENDING A STOP INTO THE QUEUE IS ALSO AN OPTION self.closed.go() return v @@ -441,7 +441,7 @@ def __init__( ) def worker_bee(self, batch_size, period, error_target, please_stop): - please_stop.then(lambda: self.add(THREAD_STOP)) + please_stop.then(lambda: self.add(PLEASE_STOP)) _buffer = [] _post_push_functions = [] @@ -470,7 +470,7 @@ def push_to_queue(): item = self.pop(till=next_push) now = time() - if item is THREAD_STOP: + if item is PLEASE_STOP: push_to_queue() please_stop.go() break @@ -516,7 +516,7 @@ def push_to_queue(): if _buffer: # ONE LAST PUSH, DO NOT HAVE TIME TO DEAL WITH ERRORS push_to_queue() - self.slow_queue.add(THREAD_STOP) + self.slow_queue.add(PLEASE_STOP) def add_child(self, child): pass @@ -542,12 +542,12 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.add(THREAD_STOP) + self.add(PLEASE_STOP) if isinstance(exc_val, BaseException): self.thread.please_stop.go() self.thread.join() def stop(self): - self.add(THREAD_STOP) + self.add(PLEASE_STOP) self.thread.join() return self diff --git a/mo_threads/threads.py b/mo_threads/threads.py index b26162b..054fa76 100644 --- a/mo_threads/threads.py +++ b/mo_threads/threads.py @@ -41,7 +41,6 @@ PARENT_THREAD = "parent_thread" # OPTIONAL PARAMETER TO ASSIGN THREAD TO SOMETHING OTHER THAN CURRENT THREAD MAX_DATETIME = datetime(2286, 11, 20, 17, 46, 39) DEFAULT_WAIT_TIME = timedelta(minutes=10) -THREAD_STOP = "stop" THREAD_TIMEOUT = "Thread {name} timeout" COVERAGE_COLLECTOR = None # Detect Coverage.py diff --git a/tests/test_locks.py b/tests/test_locks.py index 20a6825..0c9d2bd 100644 --- a/tests/test_locks.py +++ b/tests/test_locks.py @@ -26,7 +26,7 @@ from mo_times.timer import Timer import mo_threads -from mo_threads import Lock, THREAD_STOP, Signal, Thread, ThreadedQueue, Till +from mo_threads import Lock, PLEASE_STOP, Signal, Thread, ThreadedQueue, Till from mo_threads.busy_lock import BusyLock from mo_threads.signals import OrSignal from mo_threads.threads import ALL, ALL_LOCK, start_main_thread @@ -105,7 +105,7 @@ def _test_queue_speed(self, test=False): def empty(please_stop): while not please_stop: item = slow.pop() - if item is THREAD_STOP: + if item is PLEASE_STOP: break done.go() @@ -116,7 +116,7 @@ def empty(please_stop): with timer: for i in range(SCALE): q.add(i) - q.add(THREAD_STOP) + q.add(PLEASE_STOP) logger.info("Done insert") done.wait() From 388be55984492c2895773a87b712f0bbd06e3e7e Mon Sep 17 00:00:00 2001 From: Kyle Lahnakoski Date: Sat, 18 May 2024 02:44:44 -0400 Subject: [PATCH 2/3] update version number --- packaging/setup.py | 4 ++-- packaging/setuptools.json | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packaging/setup.py b/packaging/setup.py index 6d7f3ea..1a47197 100644 --- a/packaging/setup.py +++ b/packaging/setup.py @@ -8,13 +8,13 @@ description='More Threads! Simpler and faster threading.', extras_require={"tests":["mo-testing>=7.587.24110","jx-python>=4.586.24095","psutil>=5.9.8","objgraph>=3.6.1","mo-files>=6.585.24095","mo-json>=6.584.24095"]}, include_package_data=True, - install_requires=["mo-dots==10.623.24125","mo-future==7.584.24095","mo-logs==8.623.24125","mo-math==7.623.24125","mo-times==5.623.24125"], + install_requires=["mo-dots==10.632.24139","mo-future==7.584.24095","mo-logs==8.632.24139","mo-math==7.632.24139","mo-times==5.632.24139"], license='MPL 2.0', long_description='\n# More Threads!\n\n\n|Branch | Status |\n|------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|\n|master | [![Build Status](https://github.com/klahnakoski/mo-threads/actions/workflows/build.yml/badge.svg?branch=master)](https://github.com/klahnakoski/mo-threads/actions/workflows/build.yml) |\n|dev | [![Build Status](https://app.travis-ci.com/klahnakoski/mo-threads.svg?branch=dev)](https://travis-ci.com/github/klahnakoski/mo-threads) [![Coverage Status](https://coveralls.io/repos/github/klahnakoski/mo-threads/badge.svg?branch=dev)](https://coveralls.io/github/klahnakoski/mo-threads?branch=dev) ← child thread coverage is missing |\n\n## Module `threads`\n\nThe main benefits over Python\'s threading library is:\n\n1. **Multi-threaded queues do not use serialization** - Serialization is \ngreat in the general case, where you may also be communicating between \nprocesses, but it is a needless overhead for single-process multi-threading. \nIt is left to the programmer to ensure the messages put on the queue are \nnot changed, which is not ominous demand.\n2. **Shutdown order is deterministic and explicit** - Python\'s threading \nlibrary is missing strict conventions for controlled and orderly shutdown. \nEach thread can shutdown on its own terms, but is expected to do so expediently.\n * All threads are required to accept a `please_stop` signal; are \n expected to test it in a timely manner; and expected to exit when signalled.\n * All threads have a parent - The parent is responsible for ensuring their children get the `please_stop` signal, and are dead, before stopping themselves. This responsibility is baked into the thread spawning process, \n so you need not deal with it unless you want.\n3. Uses [**Signals**](#signal-class) to simplify logical \ndependencies among multiple threads, events, and timeouts.\n4. **Logging and Profiling is Integrated** - Logging and exception handling \nis seamlessly integrated: This means logs are centrally handled, and thread \nsafe. Parent threads have access to uncaught child thread exceptions, and \nthe cProfiler properly aggregates results from the multiple threads.\n\n\n### What\'s it used for\n\nA good amount of time is spent waiting for underlying C libraries and OS\nservices to respond to network and file access requests. Multiple\nthreads can make your code faster, despite the GIL, when dealing with those\nrequests. For example, by moving logging off the main thread, we can get\nup to 15% increase in overall speed because we no longer have the main thread\nwaiting for disk writes or remote logging posts. Please note, this level of\nspeed improvement can only be realized if there is no serialization happening\nat the multi-threaded queue. \n\n### Do not use Async\n\n[Actors](http://en.wikipedia.org/wiki/Actor_model) are easier to reason about than [async tasks](https://docs.python.org/3/library/asyncio-task.html). Mixing regular methods and co-routines (with their `yield from` pollution) is dangerous because:\n\n1. calling styles between synchronous and asynchronous methods can be easily confused\n2. actors can use blocking methods, async can not\n3. there is no way to manage resource priority with co-routines.\n4. stack traces are lost with co-routines\n5. async scope easily escapes lexical scope, which promotes bugs \n\nPython\'s async efforts are still immature; a re-invention of threading functionality by another name. Expect to experience a decade of problems that are already solved by threading; [here is an example](https://www.python.org/dev/peps/pep-0550/). \n\n## Reading\n\n* Fibers were an async experiment using a stack, as opposed to the state-machine-based async Python uses now. It does not apply to my argument, but is an interesting read: [[Fibers are] not an appropriate solution for writing scalable concurrent software](http://www.open-std.org/JTC1/SC22/WG21/docs/papers/2018/p1364r0.pdf)\n\n\n## Writing threaded functions\n\nAll threaded functions must accept a `please_stop` parameter, which is a `Signal` to indicate the desire to stop. The function should check this signal often, and do it\'s best to promptly return. \n\n#### Simple work loop\n\nFor threaded functions that perform small chunks of work in some loop; the chunks are small enough that they will complete soon enough. Simply check the `please_stop` signal at the start of each loop:\n\n def worker(p1, p2, please_stop):\n while not please_stop:\n do_some_small_chunk_of_work(p1)\n \n#### One-time work\n \nSometimes, threads are launched to perform low priority, one-time work. You may not need to check the `please_stop` signal: \n \n def worker(p1, p2, please_stop):\n do_some_work_and_exit(p1, p2)\n\n#### Passing signals to others\n \nThere are many times a more complex `please_stop` checks are required. For example, we want to wait on an input queue for the next task. Many of the methods in `mo-threads` accept `Signals` instead of timeouts so they return quickly when signalled. You may pass the `please_stop` signal to these methods, or make your own. Be sure to check if the method returned because it is done, or it returned because it was signaled to stop early.\n \n```python \ndef worker(source, please_stop):\n while not please_stop:\n data = source.pop(till=please_stop)\n if please_stop: # did pop() return because of please_stop?\n return\n chunk_of_work(data)\n```\n\n#### Combining signals\n \nWork might be done on some regular interval: The threaded function will sleep for a period and perform some work. In these cases you can combine Signals and `wait()` on either of them:\n\n```python \ndef worker(please_stop):\n while not please_stop:\n next_event = Till(seconds=1000)\n (next_event | please_stop).wait()\n if please_stop: # is wait done because of please_stop?\n return\n chunk_of_work()\n```\n\n## Spawning threads\n\nMost threads will be declared and run in a single line. It is much like Python\'s threading library, except it demands a name for the thread: \n\n thread = Thread.run("name", function, p1, p2, ...)\n \nSometimes you want to separate creation from starting:\n\n thread = Thread("name", function, p1, p2, ...)\n thread.start()\n \n \n### `join()` vs `release()`\n\nOnce a thread is created, a few actions can be performed with the thread object:\n\n* `join()` - Join on `thread` will make the caller thread wait until `thread` has stopped. Then, return the resulting value or to re-raise `thread`\'s exception in the caller.\n\n result = thread.join() # may raise exception\n\n* `release()` - Will ignore any return value, and post any exception to logging. Tracking is still performed; released threads are still properly stopped. You may still `join()` to guarantee the caller will wait for thread completion, but you risk being too late: The thread may have already completed and logged it\'s failure.\n\n thread.release() # release thread resources asap, when done\n \n* `stopped.wait()` - Every thread has a `stopped` Signal, which can be used for coordination by other threads. This allows a thread to wait for another to be complete and then resume. No errors or return values are captured\n\n thread.stopped.wait()\n \n### Registering Threads\n\nThreads created without this module can call your code; You want to ensure these "alien" threads have finished their work, released the locks, and exited your code before stopping. If you register alien threads, then `mo-threads` will ensure the alien work is done for a clean stop. \n\n def my_method():\n with RegisterThread():\n t = Thread.current() # we can now use mo-threads on this thread \n print(t.name) # a name is always given to the alien thread \n\n\n## Synchronization Primitives\n\nThere are three major aspects of a synchronization primitive:\n\n* **Resource** - Monitors and locks can only be owned by one thread at a time\n* **Binary** - The primitive has only two states\n* **Irreversible** - The state of the primitive can only be set, or advanced, never reversed\n\nThe last, *irreversibility* is very useful, but ignored in many threading\nlibraries. The irreversibility allows us to model progression; and\nwe can allow threads to poll for progress, or be notified of progress. \n\nThese three aspects can be combined to give us 8 synchronization primitives:\n\n* `- - -` - Semaphore\n* `- B -` - Event\n* `R - -` - Monitor\n* `R B -` - **[Lock](#lock-class)**\n* `- - I` - Iterator/generator\n* `- B I` - **[Signal](#signal-class)** (or Promise)\n* `R - I` - Private Iterator \n* `R B I` - Private Signal (best implemented as `is_done` Boolean flag)\n\n## `Lock` Class\n\nLocks are identical to [threading monitors](https://en.wikipedia.org/wiki/Monitor_(synchronization)), except for two differences: \n\n1. The `wait()` method will **always acquire the lock before returning**. This is an important feature, it ensures every line inside a `with` block has lock acquisition, and is easier to reason about.\n2. Exiting a lock via `__exit__()` will **always** signal a waiting thread to resume. This ensures no signals are missed, and every thread gets an opportunity to react to possible change.\n3. `Lock` is **not reentrant**! This is a feature to ensure locks are not held for long periods of time.\n\n**Example**\n```python\nlock = Lock()\nwhile not please_stop:\n with lock:\n while not todo:\n lock.wait(seconds=1)\n # DO SOME WORK\n```\nIn this example, we look for stuff `todo`, and if there is none, we wait for a second. During that time others can acquire the `lock` and add `todo` items. Upon releasing the the `lock`, our example code will immediately resume to see what\'s available, waiting again if nothing is found.\n\n\n## `Signal` Class\n\n[The `Signal` class](mo_threads/signals.py) is a binary semaphore that can be signalled only once; subsequent signals have no effect.\n * It can be signalled by any thread; \n * any thread can wait on a `Signal`; and \n * once signalled, all waiting threads are unblocked, including all subsequent waiting threads. \n * A Signal\'s current state can be accessed by any thread without blocking.\n \n`Signal` is used to model thread-safe state advancement. It initializes to `False`, and when signalled (with `go()`) becomes `True`. It can not be reversed. \n\nSignals are like a Promise, but more explicit \n\n| Signal | Promise | Python Event |\n|:------------:|:------------------:|:------------:|\n| s.go() | p.resolve() | e.set() |\n| s.then(f) | p.then(m) | |\n| s.wait() | await p | e.wait() |\n| bool(s) | | e.is_set() |\n| s & t | Promise.all(p, q) | |\n| s | t | Promise.race(p, q) | |\n\n\nHere is simple worker that signals when work is done. It is assumed `do_work` is executed by some other thread.\n\n```python\nclass Worker:\n def __init__(self):\n self.is_done = Signal()\n \n def do_work(self):\n do_some_work()\n self.is_done.go()\n```\n\nYou can attach methods to a `Signal`, which will be run, just once, upon `go()`. If already signalled, then the method is run immediately.\n\n```python\nworker = Worker()\nworker.is_done.then(lambda: print("done"))\n```\n\nYou may also wait on a `Signal`, which will block the current thread until the `Signal` is a go\n\n```python\nworker.is_done.wait()\nprint("worker thread is done")\n```\n\n`Signals` are first class, they can be passed around and combined with other Signals. For example, using the `__or__` operator (`|`): `either = lhs | rhs`; `either` will be triggered when `lhs` or `rhs` is triggered.\n\n```python\ndef worker(please_stop):\n while not please_stop:\n #DO WORK \n\nuser_cancel = Signal("user cancel")\n...\nworker(user_cancel | Till(seconds=360))\n```\n\n`Signal`s can also be combined using logical and (`&`): `both = lhs & rhs`; `both` is triggered only when both `lhs` and `rhs` are triggered:\n\n```python\n(workerA.stopped & workerB.stopped).wait()\nprint("both threads are done")\n```\n\n### Differences from Python\'s `Event`\n\n* `Signal` is not reversable, while `Event` has a `clear()` method\n* `Signal` allows function chaining using the `then` method\n* Complex signals can be composed from simple signals using boolean logic \n\n\n\n## `Till` Class\n\n[The `Till` class](mo-threads/till.py) (short for "until") is a special `Signal` used to represent timeouts. \n\n```python\nTill(seconds=20).wait()\nTill(till=Date("21 Jan 2016").unix).wait()\n```\n\nUse `Till` rather than `sleep()` because you can combine `Till` objects with other `Signals`. \n\n**Beware that all `Till` objects will be triggered before expiry when the main thread is asked to shutdown**\n\n\n## `Command` Class\n\nIf you find process creation is too slow, the `Command` class can be used to recycle existing processes. It has the same interface as `Process`, yet it manages a `bash` (or `cmd.exe`) session for you in the background.\n\n ', long_description_content_type='text/markdown', name='mo-threads', packages=["mo_threads"], url='https://github.com/klahnakoski/mo-threads', - version='6.624.24125', + version='6.634.24139', zip_safe=False ) \ No newline at end of file diff --git a/packaging/setuptools.json b/packaging/setuptools.json index dd226ac..5c2e63d 100644 --- a/packaging/setuptools.json +++ b/packaging/setuptools.json @@ -19,8 +19,8 @@ ]}, "include_package_data": true, "install_requires": [ - "mo-dots==10.623.24125", "mo-future==7.584.24095", "mo-logs==8.623.24125", - "mo-math==7.623.24125", "mo-times==5.623.24125" + "mo-dots==10.632.24139", "mo-future==7.584.24095", "mo-logs==8.632.24139", + "mo-math==7.632.24139", "mo-times==5.632.24139" ], "license": "MPL 2.0", "long_description": { @@ -314,6 +314,6 @@ "name": "mo-threads", "packages": ["mo_threads"], "url": "https://github.com/klahnakoski/mo-threads", - "version": "6.624.24125", + "version": "6.634.24139", "zip_safe": false } \ No newline at end of file From 2fee088a78c72fd8c1bce815bc2b9380a7e46c6c Mon Sep 17 00:00:00 2001 From: Kyle Lahnakoski Date: Sat, 18 May 2024 02:48:02 -0400 Subject: [PATCH 3/3] update lockfile --- tests/requirements.lock | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/requirements.lock b/tests/requirements.lock index 73501e3..084a533 100644 --- a/tests/requirements.lock +++ b/tests/requirements.lock @@ -1,4 +1,4 @@ -# Tests pass with python 3.8 on 2024-05-04 and with these versions +# Tests pass with python 3.8 on 2024-05-18 and with these versions # pip install --no-deps -r tests/requirements.lock certifi==2024.2.2 charset-normalizer==3.3.2 @@ -6,17 +6,17 @@ hjson==3.1.0 idna==3.6 jx-python==4.586.24095 mo-collections==5.584.24095 -mo-dots==10.623.24125 +mo-dots==10.632.24139 mo-files==6.585.24095 mo-future==7.584.24095 mo-imports==7.584.24095 mo-json==6.584.24095 mo-json-config==4.586.24095 -mo-kwargs==7.623.24125 -mo-logs==8.623.24125 -mo-math==7.623.24125 +mo-kwargs==7.632.24139 +mo-logs==8.632.24139 +mo-math==7.632.24139 mo-testing==7.587.24110 -mo-times==5.623.24125 +mo-times==5.632.24139 objgraph==3.6.1 psutil==5.9.8 pytz==2024.1