diff --git a/distributed/batched.py b/distributed/batched.py index 4e59a09e80..960f4fa828 100644 --- a/distributed/batched.py +++ b/distributed/batched.py @@ -98,8 +98,8 @@ def _background_send(self): else: self.recent_message_log.append("large-message") self.byte_count += nbytes - except CommClosedError as e: - logger.info("Batched Comm Closed: %s", e) + except CommClosedError: + logger.info("Batched Comm Closed %r", self.comm, exc_info=True) break except Exception: # We cannot safely retry self.comm.write, as we have no idea @@ -133,7 +133,7 @@ def send(self, *msgs): This completes quickly and synchronously """ if self.comm is not None and self.comm.closed(): - raise CommClosedError() + raise CommClosedError(f"Comm {self.comm!r} already closed.") self.message_count += len(msgs) self.buffer.extend(msgs) diff --git a/distributed/comm/core.py b/distributed/comm/core.py index 8c3e25cc15..31223df535 100644 --- a/distributed/comm/core.py +++ b/distributed/comm/core.py @@ -227,7 +227,7 @@ async def on_connection(self, comm: Comm, handshake_overrides=None): except Exception as e: with suppress(Exception): await comm.close() - raise CommClosedError() from e + raise CommClosedError(f"Comm {comm!r} closed.") from e comm.remote_info = handshake comm.remote_info["address"] = comm._peer_addr diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 0e66378862..b4a2d39b2d 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -504,7 +504,7 @@ async def _handle_stream(self, stream, address): try: await self.on_connection(comm) except CommClosedError: - logger.info("Connection closed before handshake completed") + logger.info("Connection from %s closed before handshake completed", address) return await self.comm_handler(comm) diff --git a/distributed/core.py b/distributed/core.py index 47fe40a13a..9c5c49dd3f 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -500,9 +500,9 @@ async def handle_comm(self, comm): result = asyncio.ensure_future(result) self._ongoing_coroutines.add(result) result = await result - except (CommClosedError, CancelledError) as e: + except (CommClosedError, CancelledError): if self.status == Status.running: - logger.info("Lost connection to %r: %s", address, e) + logger.info("Lost connection to %r", address, exc_info=True) break except Exception as e: logger.exception("Exception while handling op %s", op) @@ -791,12 +791,20 @@ async def send_recv_from_rpc(**kwargs): kwargs["serializers"] = self.serializers if self.deserializers is not None and kwargs.get("deserializers") is None: kwargs["deserializers"] = self.deserializers + comm = None try: comm = await self.live_comm() comm.name = "rpc." + key result = await send_recv(comm=comm, op=key, **kwargs) except (RPCClosed, CommClosedError) as e: - raise e.__class__(f"{e}: while trying to call remote method {key!r}") + if comm: + raise type(e)( + f"Exception while trying to call remote method {key!r} before comm was established." + ) from e + else: + raise type(e)( + f"Exception while trying to call remote method {key!r} using comm {comm!r}." + ) from e self.comms[comm] = True # mark as open return result @@ -1020,7 +1028,7 @@ async def connect(self, addr, timeout=None): try: if self.status != Status.running: raise CommClosedError( - f"ConnectionPool not running. Status: {self.status}" + f"ConnectionPool not running. Status: {self.status}" ) fut = asyncio.ensure_future( @@ -1044,7 +1052,7 @@ async def connect(self, addr, timeout=None): except asyncio.CancelledError as exc: self.semaphore.release() raise CommClosedError( - f"ConnectionPool not running. Status: {self.status}" + f"ConnectionPool not running. Status: {self.status}" ) from exc except Exception as exc: self.semaphore.release() diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ec17b96f5f..6a5d704568 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5195,7 +5195,9 @@ def report(self, msg: dict, ts: TaskState = None, client: str = None): # logger.debug("Scheduler sends message to client %s", msg) except CommClosedError: if self.status == Status.running: - logger.critical("Tried writing to closed comm: %s", msg) + logger.critical( + "Closed comm %r while trying to write %s", c, msg, exc_info=True + ) async def add_client(self, comm, client=None, versions=None): """Add client to network @@ -5496,7 +5498,9 @@ def client_send(self, client, msg): c.send(msg) except CommClosedError: if self.status == Status.running: - logger.critical("Tried writing to closed comm: %s", msg) + logger.critical( + "Closed comm %r while trying to write %s", c, msg, exc_info=True + ) def send_all(self, client_msgs: dict, worker_msgs: dict): """Send messages to client and workers""" @@ -5512,7 +5516,12 @@ def send_all(self, client_msgs: dict, worker_msgs: dict): c.send(*msgs) except CommClosedError: if self.status == Status.running: - logger.critical("Tried writing to closed comm: %s", msgs) + logger.critical( + "Closed comm %r while trying to write %s", + c, + msgs, + exc_info=True, + ) for worker, msgs in worker_msgs.items(): try: diff --git a/distributed/stealing.py b/distributed/stealing.py index e3398b4c9a..1c45e1cfb9 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -177,7 +177,7 @@ def move_task_request(self, ts, victim, thief): self.in_flight_occupancy[victim] -= victim_duration self.in_flight_occupancy[thief] += thief_duration except CommClosedError: - logger.info("Worker comm closed while stealing: %s", victim) + logger.info("Worker comm %r closed while stealing: %r", victim, ts) except Exception as e: logger.exception(e) if LOG_PDB: diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 85c4550e46..78e5b7cc5a 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -648,7 +648,7 @@ async def connect_to_server(): close_fut = asyncio.create_task(pool.close()) with pytest.raises( - CommClosedError, match="ConnectionPool not running. Status: Status.closed" + CommClosedError, match="ConnectionPool not running. Status: Status.closed" ): await asyncio.gather(*tasks) diff --git a/distributed/worker.py b/distributed/worker.py index 39e77a8bb9..a64913b51e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1014,7 +1014,7 @@ async def heartbeat(self): self.bandwidth_workers.clear() self.bandwidth_types.clear() except CommClosedError: - logger.warning("Heartbeat to scheduler failed") + logger.warning("Heartbeat to scheduler failed", exc_info=True) if not self.reconnect: await self.close(report=False) except OSError as e: