Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for legacy Backend impl #1751

Merged
merged 11 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ wasm-bindgen-futures = { workspace = true, optional = true }
bitvec = { workspace = true }
codec = { workspace = true, features = ["derive", "bit-vec"] }
scale-info = { workspace = true, features = ["bit-vec"] }
tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread", "sync"] }
sp-core = { workspace = true }
sp-keyring = { workspace = true }
sp-runtime = { workspace = true }
Expand Down
40 changes: 24 additions & 16 deletions subxt/src/backend/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,36 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
keys: Vec<Vec<u8>>,
at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
retry(|| async {
let keys = keys.clone();
let methods = self.methods.clone();

// For each key, return it + a future to get the result.
let iter = keys.into_iter().map(move |key| {
fn get_entry<T: Config>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow what you fixed with this but I think the previous code would try to fetch all storage keys in one retry block i.e, all storage key calls needs to be successful or retry it.

Looks more efficient to try each storage entry individually as you do now but maybe some other issue with it that I don't follow?

Copy link
Contributor Author

@pkhry pkhry Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's say we tried to fetch 2 entries:

  • "A" and "B".
  • "A" returns an OK result
  • "B" returns an error
    The function would return Ok(streamOf(OK,Err)

So it just would not retry before this change

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup ok, so previously we wrapped code in retry, but that code didn't actually do any async stuff and just returned a StreamOf. Now, we are retrying each individual async request to fetch a storage value. Good catch!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, you could also create a separate PR for this if you want get it in ASAP :)

key: Vec<u8>,
at: T::Hash,
methods: LegacyRpcMethods<T>,
) -> impl Future<Output = Result<Option<StorageResponse>, Error>> {
retry(move || {
let methods = methods.clone();
let key = key.clone();
async move {
let res = methods.state_get_storage(&key, Some(at)).await?;
Ok(res.map(|value| StorageResponse { key, value }))
Ok(res.map(move |value| StorageResponse { key, value }))
}
});
})
}

let s = stream::iter(iter)
// Resolve the future
.then(|fut| fut)
// Filter any Options out (ie if we didn't find a value at some key we return nothing for it).
.filter_map(|r| future::ready(r.transpose()));
let keys = keys.clone();
let methods = self.methods.clone();

Ok(StreamOf(Box::pin(s)))
})
.await
// For each key, return it + a future to get the result.
let iter = keys
.into_iter()
.map(move |key| get_entry(key, at, methods.clone()));

let s = stream::iter(iter)
// Resolve the future
.then(|fut| fut)
// Filter any Options out (ie if we didn't find a value at some key we return nothing for it).
.filter_map(|r| future::ready(r.transpose()));

Ok(StreamOf(Box::pin(s)))
}

async fn storage_fetch_descendant_keys(
Expand Down
1 change: 1 addition & 0 deletions subxt/src/backend/legacy/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ pub type EncodedJustification = Vec<u8>;
/// the RPC call `state_getRuntimeVersion`,
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct RuntimeVersion {
/// Version of the runtime specification. A full-node will not attempt to use its native
/// runtime in substitute for the on-chain Wasm runtime unless all of `spec_name`,
Expand Down
Loading
Loading