Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log messages for CommClosedError now include information about remote address #5209

Merged
merged 2 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions distributed/batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def _background_send(self):
else:
self.recent_message_log.append("large-message")
self.byte_count += nbytes
except CommClosedError as e:
logger.info("Batched Comm Closed: %s", e)
except CommClosedError:
logger.info("Batched Comm Closed %r", self.comm, exc_info=True)
break
except Exception:
# We cannot safely retry self.comm.write, as we have no idea
Expand Down Expand Up @@ -133,7 +133,7 @@ def send(self, *msgs):
This completes quickly and synchronously
"""
if self.comm is not None and self.comm.closed():
raise CommClosedError()
raise CommClosedError(f"Comm {self.comm!r} already closed.")

self.message_count += len(msgs)
self.buffer.extend(msgs)
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def on_connection(self, comm: Comm, handshake_overrides=None):
except Exception as e:
with suppress(Exception):
await comm.close()
raise CommClosedError() from e
raise CommClosedError(f"Comm {comm!r} closed.") from e

comm.remote_info = handshake
comm.remote_info["address"] = comm._peer_addr
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async def _handle_stream(self, stream, address):
try:
await self.on_connection(comm)
except CommClosedError:
logger.info("Connection closed before handshake completed")
logger.info("Connection from %s closed before handshake completed", address)
return

await self.comm_handler(comm)
Expand Down
18 changes: 13 additions & 5 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,9 +500,9 @@ async def handle_comm(self, comm):
result = asyncio.ensure_future(result)
self._ongoing_coroutines.add(result)
result = await result
except (CommClosedError, CancelledError) as e:
except (CommClosedError, CancelledError):
if self.status == Status.running:
logger.info("Lost connection to %r: %s", address, e)
logger.info("Lost connection to %r", address, exc_info=True)
break
except Exception as e:
logger.exception("Exception while handling op %s", op)
Expand Down Expand Up @@ -791,12 +791,20 @@ async def send_recv_from_rpc(**kwargs):
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = None
try:
comm = await self.live_comm()
comm.name = "rpc." + key
result = await send_recv(comm=comm, op=key, **kwargs)
except (RPCClosed, CommClosedError) as e:
raise e.__class__(f"{e}: while trying to call remote method {key!r}")
if comm:
raise type(e)(
f"Exception while trying to call remote method {key!r} before comm was established."
) from e
else:
raise type(e)(
f"Exception while trying to call remote method {key!r} using comm {comm!r}."
) from e

self.comms[comm] = True # mark as open
return result
Expand Down Expand Up @@ -1020,7 +1028,7 @@ async def connect(self, addr, timeout=None):
try:
if self.status != Status.running:
raise CommClosedError(
f"ConnectionPool not running. Status: {self.status}"
f"ConnectionPool not running. Status: {self.status}"
)

fut = asyncio.ensure_future(
Expand All @@ -1044,7 +1052,7 @@ async def connect(self, addr, timeout=None):
except asyncio.CancelledError as exc:
self.semaphore.release()
raise CommClosedError(
f"ConnectionPool not running. Status: {self.status}"
f"ConnectionPool not running. Status: {self.status}"
) from exc
except Exception as exc:
self.semaphore.release()
Expand Down
15 changes: 12 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5195,7 +5195,9 @@ def report(self, msg: dict, ts: TaskState = None, client: str = None):
# logger.debug("Scheduler sends message to client %s", msg)
except CommClosedError:
if self.status == Status.running:
logger.critical("Tried writing to closed comm: %s", msg)
logger.critical(
"Closed comm %r while trying to write %s", c, msg, exc_info=True
)

async def add_client(self, comm, client=None, versions=None):
"""Add client to network
Expand Down Expand Up @@ -5496,7 +5498,9 @@ def client_send(self, client, msg):
c.send(msg)
except CommClosedError:
if self.status == Status.running:
logger.critical("Tried writing to closed comm: %s", msg)
logger.critical(
"Closed comm %r while trying to write %s", c, msg, exc_info=True
)

def send_all(self, client_msgs: dict, worker_msgs: dict):
"""Send messages to client and workers"""
Expand All @@ -5512,7 +5516,12 @@ def send_all(self, client_msgs: dict, worker_msgs: dict):
c.send(*msgs)
except CommClosedError:
if self.status == Status.running:
logger.critical("Tried writing to closed comm: %s", msgs)
logger.critical(
"Closed comm %r while trying to write %s",
c,
msgs,
exc_info=True,
)

for worker, msgs in worker_msgs.items():
try:
Expand Down
2 changes: 1 addition & 1 deletion distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def move_task_request(self, ts, victim, thief):
self.in_flight_occupancy[victim] -= victim_duration
self.in_flight_occupancy[thief] += thief_duration
except CommClosedError:
logger.info("Worker comm closed while stealing: %s", victim)
logger.info("Worker comm %r closed while stealing: %r", victim, ts)
except Exception as e:
logger.exception(e)
if LOG_PDB:
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ async def connect_to_server():
close_fut = asyncio.create_task(pool.close())

with pytest.raises(
CommClosedError, match="ConnectionPool not running. Status: Status.closed"
CommClosedError, match="ConnectionPool not running. Status: Status.closed"
):
await asyncio.gather(*tasks)

Expand Down
2 changes: 1 addition & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ async def heartbeat(self):
self.bandwidth_workers.clear()
self.bandwidth_types.clear()
except CommClosedError:
logger.warning("Heartbeat to scheduler failed")
logger.warning("Heartbeat to scheduler failed", exc_info=True)
if not self.reconnect:
await self.close(report=False)
except OSError as e:
Expand Down