Skip to content

Commit

Permalink
Handle case when executor is shutdow
Browse files Browse the repository at this point in the history
Avoid exception:
Traceback (most recent call last):
  File "lib/python3.13/threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "cassandra/cluster.py", line 4429, in run
    future = self._executor.submit(fn, *args, **kwargs)
  File "lib/python3.13/concurrent/futures/thread.py", line 171, in submit
Error:   raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
  • Loading branch information
dkropachev committed Dec 20, 2024
1 parent 347f332 commit 63110b0
Showing 1 changed file with 24 additions and 5 deletions.
29 changes: 24 additions & 5 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,20 @@ def new_f(self, *args, **kwargs):
try:
future = self.executor.submit(f, self, *args, **kwargs)
future.add_done_callback(_future_completed)
except Exception:
log.exception("Failed to submit task to executor")
except Exception as e:
if not _is_executor_shutdown_exception(e):
log.exception("Failed to submit task to executor")

return new_f


_clusters_for_shutdown = set()


def _is_executor_shutdown_exception(e: Exception) -> bool:
return 'cannot schedule new futures after shutdown' in str(e)


def _register_cluster_shutdown(cluster):
_clusters_for_shutdown.add(cluster)

Expand Down Expand Up @@ -3482,7 +3487,12 @@ def encode(val):
def submit(self, fn, *args, **kwargs):
""" Internal """
if not self.is_shutdown:
return self.cluster.executor.submit(fn, *args, **kwargs)
try:
return self.cluster.executor.submit(fn, *args, **kwargs)
except RuntimeError as e:
if _is_executor_shutdown_exception(e):
return None
raise

def get_pool_state(self):
return dict((host, pool.get_state()) for host, pool in tuple(self._pools.items()))
Expand Down Expand Up @@ -3820,7 +3830,11 @@ def _get_and_set_reconnection_handler(self, new_handler):
def _submit(self, *args, **kwargs):
try:
if not self._cluster.is_shutdown:
return self._cluster.executor.submit(*args, **kwargs)
try:
return self._cluster.executor.submit(*args, **kwargs)
except RuntimeError as e:
if _is_executor_shutdown_exception(e):
return None
except ReferenceError:
pass
return None
Expand Down Expand Up @@ -4426,7 +4440,12 @@ def run(self):
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)
try:
future = self._executor.submit(fn, *args, **kwargs)
except RuntimeError as e:
if _is_executor_shutdown_exception(e):
return
raise
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down

0 comments on commit 63110b0

Please sign in to comment.