From dedf571f6c52acc92e7d06d92b9db7b399753f8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Tue, 4 Jun 2024 18:23:16 +0200 Subject: [PATCH] AsyncioConnection: fix initialize_reactor when called in event loop Previously, if executed within existing asyncio loop, driver would take the loop, assume it's not used and start it in a separate thread. Additionally, if executed outside of loop, driver would create a new one and make it default for calling thread. Those behaviors are wrong so they are changed. Now driver creates its own loop and executes it in a thread. Code that handled pid changes, which can happen when class is transferred using e.g. multiprocessing, is fixed too - previously it didn't create new thread after such transition. --- cassandra/io/asyncioreactor.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/cassandra/io/asyncioreactor.py b/cassandra/io/asyncioreactor.py index 4cf3f16d40..41b744602d 100644 --- a/cassandra/io/asyncioreactor.py +++ b/cassandra/io/asyncioreactor.py @@ -113,15 +113,17 @@ def __init__(self, *args, **kwargs): def initialize_reactor(cls): with cls._lock: if cls._pid != os.getpid(): + # This means that class was passed to another process, + # e.g. using multiprocessing. + # In such case the class instance will be different and passing + # tasks to loop thread won't work. + # To fix we need to re-initialize the class cls._loop = None + cls._loop_thread = None + cls._pid = os.getpid() if cls._loop is None: - try: - cls._loop = asyncio.get_running_loop() - except RuntimeError: - cls._loop = asyncio.new_event_loop() - asyncio.set_event_loop(cls._loop) - - if not cls._loop_thread: + assert cls._loop_thread is None + cls._loop = asyncio.new_event_loop() # daemonize so the loop will be shut down on interpreter # shutdown cls._loop_thread = Thread(target=cls._loop.run_forever,