Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2.0: Remove rpc- and pubsub-client version querying (backport of #2045) #2052

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 9 additions & 57 deletions pubsub-client/src/nonblocking/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,17 @@ use {
RpcTransactionLogsFilter,
},
error_object::RpcErrorObject,
filter::maybe_map_filters,
response::{
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
RpcSignatureResult, RpcVersionInfo, RpcVote, SlotInfo, SlotUpdate,
RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
},
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
std::collections::BTreeMap,
thiserror::Error,
tokio::{
net::TcpStream,
sync::{mpsc, oneshot, RwLock},
sync::{mpsc, oneshot},
task::JoinHandle,
time::{sleep, Duration},
},
Expand Down Expand Up @@ -265,9 +264,8 @@ type RequestMsg = (
#[derive(Debug)]
pub struct PubsubClient {
subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
request_sender: mpsc::UnboundedSender<RequestMsg>,
_request_sender: mpsc::UnboundedSender<RequestMsg>,
shutdown_sender: oneshot::Sender<()>,
node_version: RwLock<Option<semver::Version>>,
ws: JoinHandle<PubsubClientResult>,
}

Expand All @@ -279,14 +277,14 @@ impl PubsubClient {
.map_err(PubsubClientError::ConnectionError)?;

let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
let (request_sender, request_receiver) = mpsc::unbounded_channel();
let (_request_sender, request_receiver) = mpsc::unbounded_channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();

#[allow(clippy::used_underscore_binding)]
Ok(Self {
subscribe_sender,
request_sender,
_request_sender,
shutdown_sender,
node_version: RwLock::new(None),
ws: tokio::spawn(PubsubClient::run_ws(
ws,
subscribe_receiver,
Expand All @@ -301,43 +299,11 @@ impl PubsubClient {
self.ws.await.unwrap() // WS future should not be cancelled or panicked
}

pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> {
let mut w_node_version = self.node_version.write().await;
*w_node_version = Some(version);
#[deprecated(since = "2.0.2", note = "PubsubClient::node_version is no longer used")]
pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
Ok(())
}

async fn get_node_version(&self) -> PubsubClientResult<semver::Version> {
let r_node_version = self.node_version.read().await;
if let Some(version) = &*r_node_version {
Ok(version.clone())
} else {
drop(r_node_version);
let mut w_node_version = self.node_version.write().await;
let node_version = self.get_version().await?;
*w_node_version = Some(node_version.clone());
Ok(node_version)
}
}

async fn get_version(&self) -> PubsubClientResult<semver::Version> {
let (response_sender, response_receiver) = oneshot::channel();
self.request_sender
.send(("getVersion".to_string(), Value::Null, response_sender))
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
let result = response_receiver
.await
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
let node_version: RpcVersionInfo = serde_json::from_value(result)?;
let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| {
PubsubClientError::RequestFailed {
reason: format!("failed to parse cluster version: {e}"),
message: "getVersion".to_string(),
}
})?;
Ok(node_version)
}

async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
where
T: DeserializeOwned + Send + 'a,
Expand Down Expand Up @@ -426,22 +392,8 @@ impl PubsubClient {
pub async fn program_subscribe(
&self,
pubkey: &Pubkey,
mut config: Option<RpcProgramAccountsConfig>,
config: Option<RpcProgramAccountsConfig>,
) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
if let Some(ref mut config) = config {
if let Some(ref mut filters) = config.filters {
let node_version = self.get_node_version().await.ok();
// If node does not support the pubsub `getVersion` method, assume version is old
// and filters should be mapped (node_version.is_none()).
maybe_map_filters(node_version, filters).map_err(|e| {
PubsubClientError::RequestFailed {
reason: e,
message: "maybe_map_filters".to_string(),
}
})?;
}
}

let params = json!([pubkey.to_string(), config]);
self.subscribe("program", params).await
}
Expand Down
42 changes: 1 addition & 41 deletions pubsub-client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ use {
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
filter,
response::{
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
Expand Down Expand Up @@ -207,35 +206,6 @@ where
.map_err(|err| err.into())
}

fn get_version(
writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
) -> Result<semver::Version, PubsubClientError> {
writable_socket.write().unwrap().send(Message::Text(
json!({
"jsonrpc":"2.0","id":1,"method":"getVersion",
})
.to_string(),
))?;
let message = writable_socket.write().unwrap().read()?;
let message_text = &message.into_text()?;

if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
if let Some(Object(version_map)) = json_msg.get("result") {
if let Some(node_version) = version_map.get("solana-core") {
if let Some(node_version) = node_version.as_str() {
if let Ok(parsed) = semver::Version::parse(node_version) {
return Ok(parsed);
}
}
}
}
}

Err(PubsubClientError::UnexpectedGetVersionResponse(format!(
"msg={message_text}"
)))
}

fn read_message(
writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
) -> Result<Option<T>, PubsubClientError> {
Expand Down Expand Up @@ -523,7 +493,7 @@ impl PubsubClient {
pub fn program_subscribe(
url: &str,
pubkey: &Pubkey,
mut config: Option<RpcProgramAccountsConfig>,
config: Option<RpcProgramAccountsConfig>,
) -> Result<ProgramSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
Expand All @@ -534,16 +504,6 @@ impl PubsubClient {
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();

if let Some(ref mut config) = config {
if let Some(ref mut filters) = config.filters {
let node_version = PubsubProgramClientSubscription::get_version(&socket_clone).ok();
// If node does not support the pubsub `getVersion` method, assume version is old
// and filters should be mapped (node_version.is_none()).
filter::maybe_map_filters(node_version, filters)
.map_err(PubsubClientError::RequestError)?;
}
}

let body = json!({
"jsonrpc":"2.0",
"id":1,
Expand Down
29 changes: 0 additions & 29 deletions rpc-client-api/src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(deprecated)]
use {
crate::version_req::VersionReq,
solana_inline_spl::{token::GenericTokenAccount, token_2022::Account},
solana_sdk::account::{AccountSharedData, ReadableAccount},
std::borrow::Cow,
Expand Down Expand Up @@ -298,34 +297,6 @@ impl From<RpcMemcmp> for Memcmp {
}
}

pub fn maybe_map_filters(
node_version: Option<semver::Version>,
filters: &mut [RpcFilterType],
) -> Result<(), String> {
let version_reqs = VersionReq::from_strs(&["<1.11.2", "~1.13"])?;
let needs_mapping = node_version
.map(|version| version_reqs.matches_any(&version))
.unwrap_or(true);
if needs_mapping {
for filter in filters.iter_mut() {
if let RpcFilterType::Memcmp(memcmp) = filter {
match &memcmp.bytes {
MemcmpEncodedBytes::Base58(string) => {
memcmp.bytes = MemcmpEncodedBytes::Binary(string.clone());
}
MemcmpEncodedBytes::Base64(_) => {
return Err("RPC node on old version does not support base64 \
encoding for memcmp filters"
.to_string());
}
_ => {}
}
}
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion rpc-client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod error_object;
pub mod filter;
pub mod request;
pub mod response;
pub mod version_req;

#[macro_use]
extern crate serde_derive;
20 changes: 0 additions & 20 deletions rpc-client-api/src/version_req.rs

This file was deleted.

64 changes: 5 additions & 59 deletions rpc-client/src/nonblocking/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use {
Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult,
},
config::{RpcAccountInfoConfig, *},
filter::{self, RpcFilterType},
request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter},
response::*,
},
Expand All @@ -57,7 +56,7 @@ use {
str::FromStr,
time::{Duration, Instant},
},
tokio::{sync::RwLock, time::sleep},
tokio::time::sleep,
};

/// A client of a remote Solana node.
Expand Down Expand Up @@ -141,7 +140,6 @@ use {
pub struct RpcClient {
sender: Box<dyn RpcSender + Send + Sync + 'static>,
config: RpcClientConfig,
node_version: RwLock<Option<semver::Version>>,
}

impl RpcClient {
Expand All @@ -157,7 +155,6 @@ impl RpcClient {
) -> Self {
Self {
sender: Box::new(sender),
node_version: RwLock::new(None),
config,
}
}
Expand Down Expand Up @@ -509,30 +506,11 @@ impl RpcClient {
self.sender.url()
}

pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> {
let mut w_node_version = self.node_version.write().await;
*w_node_version = Some(version);
#[deprecated(since = "2.0.2", note = "RpcClient::node_version is no longer used")]
pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
Ok(())
}

async fn get_node_version(&self) -> Result<semver::Version, RpcError> {
let r_node_version = self.node_version.read().await;
if let Some(version) = &*r_node_version {
Ok(version.clone())
} else {
drop(r_node_version);
let mut w_node_version = self.node_version.write().await;
let node_version = self.get_version().await.map_err(|e| {
RpcError::RpcRequestError(format!("cluster version query failed: {e}"))
})?;
let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| {
RpcError::RpcRequestError(format!("failed to parse cluster version: {e}"))
})?;
*w_node_version = Some(node_version.clone());
Ok(node_version)
}
}

/// Get the configured default [commitment level][cl].
///
/// [cl]: https://solana.com/docs/rpc#configuring-state-commitment
Expand All @@ -550,17 +528,6 @@ impl RpcClient {
self.config.commitment_config
}

#[allow(deprecated)]
async fn maybe_map_filters(
&self,
mut filters: Vec<RpcFilterType>,
) -> Result<Vec<RpcFilterType>, RpcError> {
let node_version = self.get_node_version().await?;
filter::maybe_map_filters(Some(node_version), &mut filters)
.map_err(RpcError::RpcRequestError)?;
Ok(filters)
}

/// Submit a transaction and wait for confirmation.
///
/// Once this function returns successfully, the given transaction is
Expand Down Expand Up @@ -895,11 +862,7 @@ impl RpcClient {
transaction: &impl SerializableTransaction,
config: RpcSendTransactionConfig,
) -> ClientResult<Signature> {
let encoding = if let Some(encoding) = config.encoding {
encoding
} else {
self.default_cluster_transaction_encoding().await?
};
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64);
let preflight_commitment = CommitmentConfig {
commitment: config.preflight_commitment.unwrap_or_default(),
};
Expand Down Expand Up @@ -1185,16 +1148,6 @@ impl RpcClient {
}
}

async fn default_cluster_transaction_encoding(
&self,
) -> Result<UiTransactionEncoding, RpcError> {
if self.get_node_version().await? < semver::Version::new(1, 3, 16) {
Ok(UiTransactionEncoding::Base58)
} else {
Ok(UiTransactionEncoding::Base64)
}
}

/// Simulates sending a transaction.
///
/// If the transaction fails, then the [`err`] field of the returned
Expand Down Expand Up @@ -1344,11 +1297,7 @@ impl RpcClient {
transaction: &impl SerializableTransaction,
config: RpcSimulateTransactionConfig,
) -> RpcResult<RpcSimulateTransactionResult> {
let encoding = if let Some(encoding) = config.encoding {
encoding
} else {
self.default_cluster_transaction_encoding().await?
};
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64);
let commitment = config.commitment.unwrap_or_default();
let config = RpcSimulateTransactionConfig {
encoding: Some(encoding),
Expand Down Expand Up @@ -4046,9 +3995,6 @@ impl RpcClient {
.commitment
.unwrap_or_else(|| self.commitment());
config.account_config.commitment = Some(commitment);
if let Some(filters) = config.filters {
config.filters = Some(self.maybe_map_filters(filters).await?);
}

let accounts = self
.send::<OptionalContext<Vec<RpcKeyedAccount>>>(
Expand Down