From 63110b0cb6098472029f16ce66667a96a97034c5 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Thu, 19 Dec 2024 19:39:32 -0400 Subject: [PATCH] Handle case when executor is shutdow Avoid exception: Traceback (most recent call last): File "lib/python3.13/threading.py", line 1041, in _bootstrap_inner self.run() ~~~~~~~~^^ File "cassandra/cluster.py", line 4429, in run future = self._executor.submit(fn, *args, **kwargs) File "lib/python3.13/concurrent/futures/thread.py", line 171, in submit Error: raise RuntimeError('cannot schedule new futures after shutdown') RuntimeError: cannot schedule new futures after shutdown --- cassandra/cluster.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 14c3074e8..51ee35737 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -208,8 +208,9 @@ def new_f(self, *args, **kwargs): try: future = self.executor.submit(f, self, *args, **kwargs) future.add_done_callback(_future_completed) - except Exception: - log.exception("Failed to submit task to executor") + except Exception as e: + if not _is_executor_shutdown_exception(e): + log.exception("Failed to submit task to executor") return new_f @@ -217,6 +218,10 @@ def new_f(self, *args, **kwargs): _clusters_for_shutdown = set() +def _is_executor_shutdown_exception(e: Exception) -> bool: + return 'cannot schedule new futures after shutdown' in str(e) + + def _register_cluster_shutdown(cluster): _clusters_for_shutdown.add(cluster) @@ -3482,7 +3487,12 @@ def encode(val): def submit(self, fn, *args, **kwargs): """ Internal """ if not self.is_shutdown: - return self.cluster.executor.submit(fn, *args, **kwargs) + try: + return self.cluster.executor.submit(fn, *args, **kwargs) + except RuntimeError as e: + if _is_executor_shutdown_exception(e): + return None + raise def get_pool_state(self): return dict((host, pool.get_state()) for host, pool in tuple(self._pools.items())) @@ -3820,7 +3830,11 @@ def _get_and_set_reconnection_handler(self, new_handler): def _submit(self, *args, **kwargs): try: if not self._cluster.is_shutdown: - return self._cluster.executor.submit(*args, **kwargs) + try: + return self._cluster.executor.submit(*args, **kwargs) + except RuntimeError as e: + if _is_executor_shutdown_exception(e): + return None except ReferenceError: pass return None @@ -4426,7 +4440,12 @@ def run(self): self._scheduled_tasks.discard(task) fn, args, kwargs = task kwargs = dict(kwargs) - future = self._executor.submit(fn, *args, **kwargs) + try: + future = self._executor.submit(fn, *args, **kwargs) + except RuntimeError as e: + if _is_executor_shutdown_exception(e): + return + raise future.add_done_callback(self._log_if_failed) else: self._queue.put_nowait((run_at, i, task))