Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

fix: fix re-subscription on websocket reconnect #2419

Merged
merged 2 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions ethers-providers/src/rpc/transports/ws/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,25 +351,29 @@ impl RequestManager {
old_backend.shutdown();

tracing::debug!(count = self.subs.count(), "Re-starting active subscriptions");
let req_cnt = self.reqs.len();

// reissue subscriptionps
// reissue subscriptions
for (id, sub) in self.subs.to_reissue() {
self.backend
.dispatcher
.unbounded_send(sub.serialize_raw(*id)?)
.map_err(|_| WsClientError::DeadChannel)?;
let (tx, _rx) = oneshot::channel();
let in_flight = InFlight {
method: "eth_subscribe".to_string(),
params: sub.params.clone(),
channel: tx,
};
// Need an entry in reqs to ensure response with new server sub ID is processed
self.reqs.insert(*id, in_flight);
}

tracing::debug!(count = self.reqs.len(), "Re-issuing pending requests");
// reissue requests. We filter these to prevent in-flight requests for
// subscriptions to be re-issued twice (once in above loop, once in this loop).
for (id, req) in self.reqs.iter().filter(|(id, _)| !self.subs.has(**id)) {
tracing::debug!(count = req_cnt, "Re-issuing pending requests");
// reissue requests, including the re-subscription requests we just added above
for (id, req) in self.reqs.iter() {
self.backend
.dispatcher
.unbounded_send(req.serialize_raw(*id)?)
.map_err(|_| WsClientError::DeadChannel)?;
}
tracing::info!(subs = self.subs.count(), reqs = self.reqs.len(), "Re-connection complete");
tracing::info!(subs = self.subs.count(), reqs = req_cnt, "Re-connection complete");

Ok(())
}
Expand Down
25 changes: 24 additions & 1 deletion ethers-providers/tests/it/ws_errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use ethers_core::types::Filter;
use ethers_core::{types::Filter, utils::Anvil};
use ethers_providers::{Middleware, Provider, StreamExt};
use futures_util::SinkExt;
use std::time::Duration;
Expand Down Expand Up @@ -64,3 +64,26 @@ async fn graceful_disconnect_on_ws_errors() {

assert!(stream.next().await.is_none());
}

#[tokio::test]
async fn resubscribe_on_ws_reconnect() {
let anvil = Anvil::new().block_time(1u64).spawn();
let port = anvil.port();
let provider = Provider::connect_with_reconnects(anvil.ws_endpoint(), 1).await.unwrap();

// Attempt to ensure a different server-side subscription id after reconnect by making
// the subscription we care about be the second one after initial startup, but the first
// (and only) one after reconnection.
let ignored_sub = provider.subscribe_blocks().await.unwrap();
let mut blocks = provider.subscribe_blocks().await.unwrap();
ignored_sub.unsubscribe().await.expect("unsubscribe failed");

blocks.next().await.expect("no block notice before reconnect");

// Kill & restart using the same port so we end up with the same endpoint url:
drop(anvil);
let _anvil = Anvil::new().port(port).block_time(1u64).spawn();

// Wait for the next block on existing subscription. Will fail w/o resubscription:
blocks.next().await.expect("no block notice after reconnect");
}