Skip to content

Commit

Permalink
Add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
codingl2k1 committed Nov 27, 2024
1 parent 82e16fc commit 401f8ae
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 7 deletions.
7 changes: 6 additions & 1 deletion python/xoscar/backends/communication/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,12 @@ async def connect(
client = DummyClient(local_address, dest_address, client_channel)
client._task = task
server._tasks.add(task)
task.add_done_callback(server._tasks.discard)

def _discard(t):
server._tasks.discard(t)
logger.info("Channel exit: %s", server_channel.info)

task.add_done_callback(_discard)
return client

@implements(Client.close)
Expand Down
1 change: 1 addition & 0 deletions python/xoscar/backends/communication/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ async def on_connected(self, *args, **kwargs):
await channel.close()
# Remove channel if channel exit
self._channels.discard(channel)
logger.debug("Channel exit: %s", channel.info)

@implements(Server.stop)
async def stop(self):
Expand Down
1 change: 1 addition & 0 deletions python/xoscar/backends/communication/ucx.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ async def on_connected(self, *args, **kwargs):
await channel.close()
# Remove channel if channel exit
self._channels.discard(channel)
logger.debug("Channel exit: %s", channel.info)

@implements(Server.stop)
async def stop(self):
Expand Down
17 changes: 11 additions & 6 deletions python/xoscar/backends/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ async def call(
return await self.call_with_client(client, message, wait)

async def stop(self):
logger.debug("Actor caller stop.")
try:
await asyncio.gather(*[client.close() for client in self._clients])
except (ConnectionError, ServerClosed):
Expand Down Expand Up @@ -223,14 +224,18 @@ def __getattr__(self, item):
try:
actor_caller = self._thread_local.actor_caller
except AttributeError:
thread_info = str(threading.current_thread())
logger.debug("Creating a new actor caller for thread: %s", thread_info)
actor_caller = self._thread_local.actor_caller = ActorCallerThreadLocal()
ref = self._thread_local.ref = ActorCaller._RefHolder()
# If the thread exit, we clean the related actor callers and channels.
weakref.finalize(
ref,
asyncio.run_coroutine_threadsafe,
actor_caller.stop(),
self._close_loop,
)

def _cleanup():
asyncio.run_coroutine_threadsafe(actor_caller.stop(), self._close_loop)
logger.debug(
"Clean up the actor caller due to thread exit: %s", thread_info
)

weakref.finalize(ref, _cleanup)

return getattr(actor_caller, item)

0 comments on commit 401f8ae

Please sign in to comment.