diff --git a/core/src/client/async_client/helpers.rs b/core/src/client/async_client/helpers.rs index ee167bd0fd..0b7cf2c05b 100644 --- a/core/src/client/async_client/helpers.rs +++ b/core/src/client/async_client/helpers.rs @@ -28,6 +28,7 @@ use std::time::Duration; use crate::client::async_client::manager::{RequestManager, RequestStatus}; use crate::client::{RequestMessage, TransportSenderT}; +use crate::error::SubscriptionClosed; use crate::Error; use futures_channel::{mpsc, oneshot}; @@ -84,7 +85,16 @@ pub(crate) fn process_subscription_response( }; match manager.as_subscription_mut(&request_id) { - Some(send_back_sink) => match send_back_sink.try_send(response.params.result) { + Some(send_back_sink) => match send_back_sink.try_send(response.params.result.clone()) { + // The server sent a subscription closed notification, then close down the subscription. + Ok(()) if serde_json::from_value::(response.params.result).is_ok() => { + if manager.remove_subscription(request_id, sub_id.clone()).is_some() { + Ok(()) + } else { + tracing::error!("The server tried to close down an invalid subscription: {:?}", sub_id); + Err(None) + } + } Ok(()) => Ok(()), Err(err) => { tracing::error!("Dropping subscription {:?} error: {:?}", sub_id, err); diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index c59be25a6f..7d8b6e8e90 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -459,6 +459,7 @@ async fn ws_server_cancels_sub_stream_after_err() { let exp = SubscriptionClosed::new(SubscriptionClosedReason::Server(err.to_string())); // The server closed down the subscription with the underlying error from the stream. assert!(matches!(sub.next().await, Some(Err(Error::SubscriptionClosed(close_reason))) if close_reason == exp)); + assert!(sub.next().await.is_none()); } #[tokio::test]