Skip to content

Commit

Permalink
Improve logging messages for CommClosedErrors
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 13, 2021
1 parent 2298861 commit 9b51554
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 16 deletions.
6 changes: 3 additions & 3 deletions distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def _background_send(self):
else:
self.recent_message_log.append("large-message")
self.byte_count += nbytes
except CommClosedError as e:
logger.info("Batched Comm Closed: %s", e)
except CommClosedError:
logger.info("Batched Comm Closed %r", self.comm, exc_info=True)
break
except Exception:
# We cannot safely retry self.comm.write, as we have no idea
Expand Down Expand Up @@ -133,7 +133,7 @@ def send(self, *msgs):
This completes quickly and synchronously
"""
if self.comm is not None and self.comm.closed():
raise CommClosedError()
raise CommClosedError(f"Comm {repr(self.comm)} already closed.")

self.message_count += len(msgs)
self.buffer.extend(msgs)
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def on_connection(self, comm: Comm, handshake_overrides=None):
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
raise CommClosedError(f"Comm {comm!r} closed.") from e

comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async def _handle_stream(self, stream, address):
try:
await self.on_connection(comm)
except CommClosedError:
logger.info("Connection closed before handshake completed")
logger.info("Connection from %s closed before handshake completed", address)
return

await self.comm_handler(comm)
Expand Down
18 changes: 13 additions & 5 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ async def handle_comm(self, comm):
result = asyncio.ensure_future(result)
self._ongoing_coroutines.add(result)
result = await result
except (CommClosedError, CancelledError) as e:
except (CommClosedError, CancelledError):
if self.status == Status.running:
logger.info("Lost connection to %r: %s", address, e)
logger.info("Lost connection %r", address, exc_info=True)
break
except Exception as e:
logger.exception("Exception while handling op %s", op)
Expand Down Expand Up @@ -791,12 +791,20 @@ async def send_recv_from_rpc(**kwargs):
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = None
try:
comm = await self.live_comm()
comm.name = "rpc." + key
result = await send_recv(comm=comm, op=key, **kwargs)
except (RPCClosed, CommClosedError) as e:
raise e.__class__(f"{e}: while trying to call remote method {key!r}")
if comm:
raise type(e)(
f"Exception while trying to call remote method {key!r} before comm was established."
) from e
else:
raise type(e)(
f"Exception while trying to call remote method {key!r} using comm {comm!r}."
) from e

self.comms[comm] = True # mark as open
return result
Expand Down Expand Up @@ -1020,7 +1028,7 @@ async def connect(self, addr, timeout=None):
try:
if self.status != Status.running:
raise CommClosedError(
f"ConnectionPool not running. Status: {self.status}"
f"ConnectionPool not running. Status: {self.status}"
)

fut = asyncio.ensure_future(
Expand All @@ -1044,7 +1052,7 @@ async def connect(self, addr, timeout=None):
except asyncio.CancelledError as exc:
self.semaphore.release()
raise CommClosedError(
f"ConnectionPool not running. Status: {self.status}"
f"ConnectionPool not running. Status: {self.status}"
) from exc
except Exception as exc:
self.semaphore.release()
Expand Down
15 changes: 12 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5195,7 +5195,9 @@ def report(self, msg: dict, ts: TaskState = None, client: str = None):
# logger.debug("Scheduler sends message to client %s", msg)
except CommClosedError:
if self.status == Status.running:
logger.critical("Tried writing to closed comm: %s", msg)
logger.critical(
"Closed comm %r while trying to write %s", c, msg, exc_info=True
)

async def add_client(self, comm, client=None, versions=None):
"""Add client to network
Expand Down Expand Up @@ -5496,7 +5498,9 @@ def client_send(self, client, msg):
c.send(msg)
except CommClosedError:
if self.status == Status.running:
logger.critical("Tried writing to closed comm: %s", msg)
logger.critical(
"Closed comm %r while trying to write %s", c, msg, exc_info=True
)

def send_all(self, client_msgs: dict, worker_msgs: dict):
"""Send messages to client and workers"""
Expand All @@ -5512,7 +5516,12 @@ def send_all(self, client_msgs: dict, worker_msgs: dict):
c.send(*msgs)
except CommClosedError:
if self.status == Status.running:
logger.critical("Tried writing to closed comm: %s", msgs)
logger.critical(
"Closed comm %r while trying to write %s",
c,
msgs,
exc_info=True,
)

for worker, msgs in worker_msgs.items():
try:
Expand Down
2 changes: 1 addition & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def move_task_request(self, ts, victim, thief):
self.in_flight_occupancy[victim] -= victim_duration
self.in_flight_occupancy[thief] += thief_duration
except CommClosedError:
logger.info("Worker comm closed while stealing: %s", victim)
logger.info("Worker comm %r closed while stealing: %r", victim, ts)
except Exception as e:
logger.exception(e)
if LOG_PDB:
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ async def connect_to_server():
close_fut = asyncio.create_task(pool.close())

with pytest.raises(
CommClosedError, match="ConnectionPool not running. Status: Status.closed"
CommClosedError, match="ConnectionPool not running. Status: Status.closed"
):
await asyncio.gather(*tasks)

Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ async def heartbeat(self):
self.bandwidth_workers.clear()
self.bandwidth_types.clear()
except CommClosedError:
logger.warning("Heartbeat to scheduler failed")
logger.warning("Heartbeat to scheduler failed", exc_info=True)
if not self.reconnect:
await self.close(report=False)
except OSError as e:
Expand Down

0 comments on commit 9b51554

Please sign in to comment.