Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensures poll order wrt subscription ID's #1620

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 38 additions & 13 deletions subxt/src/backend/unstable/follow_stream_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ pub struct FollowStreamDriverHandle<Hash: BlockHash> {
impl<Hash: BlockHash> FollowStreamDriverHandle<Hash> {
/// Subscribe to follow events.
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
self.shared.subscribe()
self.shared.subscribe(true)
}

/// Returns if Followstream has reconnected
pub fn reconnected(&self) -> impl Stream<Item = bool> {
self.shared.subscribe(false).reconnected()
}
}

Expand Down Expand Up @@ -137,6 +142,21 @@ impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {
}
}

/// Returns if the backend has reconnected/is reconnecting
pub fn reconnected(self) -> impl Stream<Item = bool> {
self.filter_map(|ev| {
let result = match ev {
FollowStreamMsg::Ready(_) => None,
FollowStreamMsg::Event(ev) => match ev {
FollowEvent::Initialized(_) => Some(true),
FollowEvent::Stop => Some(false),
_ => None,
},
};
std::future::ready(result)
})
}

/// Subscribe to the follow events, ignoring any other messages.
pub fn events(self) -> impl Stream<Item = FollowEvent<BlockRef<Hash>>> + Send + Sync {
self.filter_map(|ev| std::future::ready(ev.into_event()))
Expand All @@ -145,7 +165,7 @@ impl<Hash: BlockHash> FollowStreamDriverSubscription<Hash> {

impl<Hash: BlockHash> Clone for FollowStreamDriverSubscription<Hash> {
fn clone(&self) -> Self {
self.shared.subscribe()
self.shared.subscribe(true)
}
}

Expand Down Expand Up @@ -330,7 +350,10 @@ impl<Hash: BlockHash> Shared<Hash> {
}

/// Create a new subscription.
pub fn subscribe(&self) -> FollowStreamDriverSubscription<Hash> {
pub fn subscribe(
&self,
insert_subscription_data: bool,
) -> FollowStreamDriverSubscription<Hash> {
let mut shared = self.0.lock().unwrap();

let id = shared.next_id;
Expand All @@ -349,16 +372,18 @@ impl<Hash: BlockHash> Shared<Hash> {
// it means the subscription is currently stopped, and we should expect new Ready/Init
// messages anyway once it restarts.
let mut local_items = VecDeque::new();
if let Some(sub_id) = &shared.current_subscription_id {
local_items.push_back(FollowStreamMsg::Ready(sub_id.clone()));
}
if let Some(init_msg) = &shared.current_init_message {
local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized(
init_msg.clone(),
)));
}
for ev in &shared.block_events_for_new_subscriptions {
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
if insert_subscription_data {
if let Some(sub_id) = &shared.current_subscription_id {
local_items.push_back(FollowStreamMsg::Ready(sub_id.clone()));
}
if let Some(init_msg) = &shared.current_init_message {
local_items.push_back(FollowStreamMsg::Event(FollowEvent::Initialized(
init_msg.clone(),
)));
}
for ev in &shared.block_events_for_new_subscriptions {
local_items.push_back(FollowStreamMsg::Event(ev.clone()));
}
}

drop(shared);
Expand Down
76 changes: 71 additions & 5 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ use crate::Config;
use async_trait::async_trait;
use follow_stream_driver::{FollowStreamDriver, FollowStreamDriverHandle};
use futures::future::Either;
use futures::{Stream, StreamExt};
use futures::{pin_mut, Future, FutureExt, Stream, StreamExt};
use std::collections::HashMap;
use std::pin::Pin;
use std::task::Poll;
use storage_items::StorageItems;

Expand Down Expand Up @@ -302,15 +303,15 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}

async fn block_header(&self, at: T::Hash) -> Result<Option<T::Header>, Error> {
retry(|| async {
retry_with_reset_on_reconnect(&self.follow_handle, || async {
let sub_id = get_subscription_id(&self.follow_handle).await?;
self.methods.chainhead_v1_header(&sub_id, at).await
})
.await
}

async fn block_body(&self, at: T::Hash) -> Result<Option<Vec<Vec<u8>>>, Error> {
retry(|| async {
retry_with_reset_on_reconnect(&self.follow_handle, || async {
let sub_id = get_subscription_id(&self.follow_handle).await?;

// Subscribe to the body response and get our operationId back.
Expand Down Expand Up @@ -669,9 +670,8 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
call_parameters: Option<&[u8]>,
at: T::Hash,
) -> Result<Vec<u8>, Error> {
retry(|| async {
retry_with_reset_on_reconnect(&self.follow_handle, || async {
let sub_id = get_subscription_id(&self.follow_handle).await?;

// Subscribe to the body response and get our operationId back.
let follow_events = self.follow_handle.subscribe().events();
let call_parameters = call_parameters.unwrap_or(&[]);
Expand Down Expand Up @@ -716,3 +716,69 @@ async fn get_subscription_id<Hash: BlockHash>(

Ok(sub_id)
}

/// A helper to restart calls on subscription reconnect.
async fn retry_with_reset_on_reconnect<Hash, T, F, R>(
follow_handle: &FollowStreamDriverHandle<Hash>,
mut fun: F,
) -> Result<R, Error>
where
Hash: BlockHash,
F: FnMut() -> T,
T: Future<Output = Result<R, Error>>,
{
let reconnected = follow_handle.reconnected().fuse();
pin_mut!(reconnected);

async fn check_for_reconnect(
mut reconnected: Pin<&mut impl Stream<Item = bool>>,
) -> Result<(), Error> {
loop {
match reconnected.next().await {
Some(true) => {
break;
}
Some(false) => (),
None => {
return Err(RpcError::SubscriptionDropped.into());
}
}
}
Ok(())
}

loop {
let action = retry(&mut fun).fuse();

pin_mut!(action);

let result = futures::future::select(reconnected.next(), action).await;
match result {
// We reconnected and received FollowEvent::Initialized()
Either::Left((Some(has_reconnected), _)) if has_reconnected => {}
Either::Left((Some(_), _)) => {
// Wait until we see Initialized Event
check_for_reconnect(reconnected.as_mut()).await?
}
Either::Right((result, reset)) => {
let is_reconnected = reset.now_or_never();
if is_reconnected.is_none() {
return result;
}
let is_reconnected = is_reconnected.flatten();
if let Some(has_reconnected) = is_reconnected {
// Wait until we see Initialized Event
if !has_reconnected {
check_for_reconnect(reconnected.as_mut()).await?
}
} else {
break;
}
}
Either::Left((None, _)) => {
break;
}
}
}
Err(RpcError::SubscriptionDropped.into())
}
16 changes: 0 additions & 16 deletions subxt/src/backend/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ where
F: FnMut() -> T,
T: Future<Output = Result<R, Error>>,
{
const REJECTED_MAX_RETRIES: usize = 10;
let mut rejected_retries = 0;

loop {
match retry_future().await {
Ok(v) => return Ok(v),
Expand All @@ -117,19 +114,6 @@ where
continue;
}

// TODO: https://github.com/paritytech/subxt/issues/1567
// This is a hack because if a reconnection occurs
// the order of pending calls is not guaranteed.
//
// Such that it's possible the a pending future completes
// before `chainHead_follow` is established with fresh
// subscription id.
//
if e.is_rejected() && rejected_retries < REJECTED_MAX_RETRIES {
rejected_retries += 1;
continue;
}

return Err(e);
}
}
Expand Down
Loading