Skip to content

Commit

Permalink
Remove rpc- and pubsub-client version querying (#2045)
Browse files Browse the repository at this point in the history
* Remove filter mapping

* Remove unused internal module

* Remove unused pubsub-client stuff

* Remove unused RpcClient encoding maping, node_version
  • Loading branch information
CriesofCarrots authored Jul 9, 2024
1 parent 69a1e86 commit 69d2eb8
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 207 deletions.
66 changes: 9 additions & 57 deletions pubsub-client/src/nonblocking/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,18 +183,17 @@ use {
RpcTransactionLogsFilter,
},
error_object::RpcErrorObject,
filter::maybe_map_filters,
response::{
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
RpcSignatureResult, RpcVersionInfo, RpcVote, SlotInfo, SlotUpdate,
RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
},
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
std::collections::BTreeMap,
thiserror::Error,
tokio::{
net::TcpStream,
sync::{mpsc, oneshot, RwLock},
sync::{mpsc, oneshot},
task::JoinHandle,
time::{sleep, Duration},
},
Expand Down Expand Up @@ -265,9 +264,8 @@ type RequestMsg = (
#[derive(Debug)]
pub struct PubsubClient {
subscribe_sender: mpsc::UnboundedSender<SubscribeRequestMsg>,
request_sender: mpsc::UnboundedSender<RequestMsg>,
_request_sender: mpsc::UnboundedSender<RequestMsg>,
shutdown_sender: oneshot::Sender<()>,
node_version: RwLock<Option<semver::Version>>,
ws: JoinHandle<PubsubClientResult>,
}

Expand All @@ -279,14 +277,14 @@ impl PubsubClient {
.map_err(PubsubClientError::ConnectionError)?;

let (subscribe_sender, subscribe_receiver) = mpsc::unbounded_channel();
let (request_sender, request_receiver) = mpsc::unbounded_channel();
let (_request_sender, request_receiver) = mpsc::unbounded_channel();
let (shutdown_sender, shutdown_receiver) = oneshot::channel();

#[allow(clippy::used_underscore_binding)]
Ok(Self {
subscribe_sender,
request_sender,
_request_sender,
shutdown_sender,
node_version: RwLock::new(None),
ws: tokio::spawn(PubsubClient::run_ws(
ws,
subscribe_receiver,
Expand All @@ -301,43 +299,11 @@ impl PubsubClient {
self.ws.await.unwrap() // WS future should not be cancelled or panicked
}

pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> {
let mut w_node_version = self.node_version.write().await;
*w_node_version = Some(version);
#[deprecated(since = "2.0.2", note = "PubsubClient::node_version is no longer used")]
pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
Ok(())
}

async fn get_node_version(&self) -> PubsubClientResult<semver::Version> {
let r_node_version = self.node_version.read().await;
if let Some(version) = &*r_node_version {
Ok(version.clone())
} else {
drop(r_node_version);
let mut w_node_version = self.node_version.write().await;
let node_version = self.get_version().await?;
*w_node_version = Some(node_version.clone());
Ok(node_version)
}
}

async fn get_version(&self) -> PubsubClientResult<semver::Version> {
let (response_sender, response_receiver) = oneshot::channel();
self.request_sender
.send(("getVersion".to_string(), Value::Null, response_sender))
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))?;
let result = response_receiver
.await
.map_err(|err| PubsubClientError::ConnectionClosed(err.to_string()))??;
let node_version: RpcVersionInfo = serde_json::from_value(result)?;
let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| {
PubsubClientError::RequestFailed {
reason: format!("failed to parse cluster version: {e}"),
message: "getVersion".to_string(),
}
})?;
Ok(node_version)
}

async fn subscribe<'a, T>(&self, operation: &str, params: Value) -> SubscribeResult<'a, T>
where
T: DeserializeOwned + Send + 'a,
Expand Down Expand Up @@ -426,22 +392,8 @@ impl PubsubClient {
pub async fn program_subscribe(
&self,
pubkey: &Pubkey,
mut config: Option<RpcProgramAccountsConfig>,
config: Option<RpcProgramAccountsConfig>,
) -> SubscribeResult<'_, RpcResponse<RpcKeyedAccount>> {
if let Some(ref mut config) = config {
if let Some(ref mut filters) = config.filters {
let node_version = self.get_node_version().await.ok();
// If node does not support the pubsub `getVersion` method, assume version is old
// and filters should be mapped (node_version.is_none()).
maybe_map_filters(node_version, filters).map_err(|e| {
PubsubClientError::RequestFailed {
reason: e,
message: "maybe_map_filters".to_string(),
}
})?;
}
}

let params = json!([pubkey.to_string(), config]);
self.subscribe("program", params).await
}
Expand Down
42 changes: 1 addition & 41 deletions pubsub-client/src/pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ use {
RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig,
RpcTransactionLogsFilter,
},
filter,
response::{
Response as RpcResponse, RpcBlockUpdate, RpcKeyedAccount, RpcLogsResponse,
RpcSignatureResult, RpcVote, SlotInfo, SlotUpdate,
Expand Down Expand Up @@ -207,35 +206,6 @@ where
.map_err(|err| err.into())
}

fn get_version(
writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
) -> Result<semver::Version, PubsubClientError> {
writable_socket.write().unwrap().send(Message::Text(
json!({
"jsonrpc":"2.0","id":1,"method":"getVersion",
})
.to_string(),
))?;
let message = writable_socket.write().unwrap().read()?;
let message_text = &message.into_text()?;

if let Ok(json_msg) = serde_json::from_str::<Map<String, Value>>(message_text) {
if let Some(Object(version_map)) = json_msg.get("result") {
if let Some(node_version) = version_map.get("solana-core") {
if let Some(node_version) = node_version.as_str() {
if let Ok(parsed) = semver::Version::parse(node_version) {
return Ok(parsed);
}
}
}
}
}

Err(PubsubClientError::UnexpectedGetVersionResponse(format!(
"msg={message_text}"
)))
}

fn read_message(
writable_socket: &Arc<RwLock<WebSocket<MaybeTlsStream<TcpStream>>>>,
) -> Result<Option<T>, PubsubClientError> {
Expand Down Expand Up @@ -523,7 +493,7 @@ impl PubsubClient {
pub fn program_subscribe(
url: &str,
pubkey: &Pubkey,
mut config: Option<RpcProgramAccountsConfig>,
config: Option<RpcProgramAccountsConfig>,
) -> Result<ProgramSubscription, PubsubClientError> {
let url = Url::parse(url)?;
let socket = connect_with_retry(url)?;
Expand All @@ -534,16 +504,6 @@ impl PubsubClient {
let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone();

if let Some(ref mut config) = config {
if let Some(ref mut filters) = config.filters {
let node_version = PubsubProgramClientSubscription::get_version(&socket_clone).ok();
// If node does not support the pubsub `getVersion` method, assume version is old
// and filters should be mapped (node_version.is_none()).
filter::maybe_map_filters(node_version, filters)
.map_err(PubsubClientError::RequestError)?;
}
}

let body = json!({
"jsonrpc":"2.0",
"id":1,
Expand Down
29 changes: 0 additions & 29 deletions rpc-client-api/src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(deprecated)]
use {
crate::version_req::VersionReq,
solana_inline_spl::{token::GenericTokenAccount, token_2022::Account},
solana_sdk::account::{AccountSharedData, ReadableAccount},
std::borrow::Cow,
Expand Down Expand Up @@ -298,34 +297,6 @@ impl From<RpcMemcmp> for Memcmp {
}
}

pub fn maybe_map_filters(
node_version: Option<semver::Version>,
filters: &mut [RpcFilterType],
) -> Result<(), String> {
let version_reqs = VersionReq::from_strs(&["<1.11.2", "~1.13"])?;
let needs_mapping = node_version
.map(|version| version_reqs.matches_any(&version))
.unwrap_or(true);
if needs_mapping {
for filter in filters.iter_mut() {
if let RpcFilterType::Memcmp(memcmp) = filter {
match &memcmp.bytes {
MemcmpEncodedBytes::Base58(string) => {
memcmp.bytes = MemcmpEncodedBytes::Binary(string.clone());
}
MemcmpEncodedBytes::Base64(_) => {
return Err("RPC node on old version does not support base64 \
encoding for memcmp filters"
.to_string());
}
_ => {}
}
}
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 0 additions & 1 deletion rpc-client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod error_object;
pub mod filter;
pub mod request;
pub mod response;
pub mod version_req;

#[macro_use]
extern crate serde_derive;
20 changes: 0 additions & 20 deletions rpc-client-api/src/version_req.rs

This file was deleted.

64 changes: 5 additions & 59 deletions rpc-client/src/nonblocking/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use {
Error as ClientError, ErrorKind as ClientErrorKind, Result as ClientResult,
},
config::{RpcAccountInfoConfig, *},
filter::{self, RpcFilterType},
request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter},
response::*,
},
Expand All @@ -57,7 +56,7 @@ use {
str::FromStr,
time::{Duration, Instant},
},
tokio::{sync::RwLock, time::sleep},
tokio::time::sleep,
};

/// A client of a remote Solana node.
Expand Down Expand Up @@ -141,7 +140,6 @@ use {
pub struct RpcClient {
sender: Box<dyn RpcSender + Send + Sync + 'static>,
config: RpcClientConfig,
node_version: RwLock<Option<semver::Version>>,
}

impl RpcClient {
Expand All @@ -157,7 +155,6 @@ impl RpcClient {
) -> Self {
Self {
sender: Box::new(sender),
node_version: RwLock::new(None),
config,
}
}
Expand Down Expand Up @@ -509,30 +506,11 @@ impl RpcClient {
self.sender.url()
}

pub async fn set_node_version(&self, version: semver::Version) -> Result<(), ()> {
let mut w_node_version = self.node_version.write().await;
*w_node_version = Some(version);
#[deprecated(since = "2.0.2", note = "RpcClient::node_version is no longer used")]
pub async fn set_node_version(&self, _version: semver::Version) -> Result<(), ()> {
Ok(())
}

async fn get_node_version(&self) -> Result<semver::Version, RpcError> {
let r_node_version = self.node_version.read().await;
if let Some(version) = &*r_node_version {
Ok(version.clone())
} else {
drop(r_node_version);
let mut w_node_version = self.node_version.write().await;
let node_version = self.get_version().await.map_err(|e| {
RpcError::RpcRequestError(format!("cluster version query failed: {e}"))
})?;
let node_version = semver::Version::parse(&node_version.solana_core).map_err(|e| {
RpcError::RpcRequestError(format!("failed to parse cluster version: {e}"))
})?;
*w_node_version = Some(node_version.clone());
Ok(node_version)
}
}

/// Get the configured default [commitment level][cl].
///
/// [cl]: https://solana.com/docs/rpc#configuring-state-commitment
Expand All @@ -550,17 +528,6 @@ impl RpcClient {
self.config.commitment_config
}

#[allow(deprecated)]
async fn maybe_map_filters(
&self,
mut filters: Vec<RpcFilterType>,
) -> Result<Vec<RpcFilterType>, RpcError> {
let node_version = self.get_node_version().await?;
filter::maybe_map_filters(Some(node_version), &mut filters)
.map_err(RpcError::RpcRequestError)?;
Ok(filters)
}

/// Submit a transaction and wait for confirmation.
///
/// Once this function returns successfully, the given transaction is
Expand Down Expand Up @@ -895,11 +862,7 @@ impl RpcClient {
transaction: &impl SerializableTransaction,
config: RpcSendTransactionConfig,
) -> ClientResult<Signature> {
let encoding = if let Some(encoding) = config.encoding {
encoding
} else {
self.default_cluster_transaction_encoding().await?
};
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64);
let preflight_commitment = CommitmentConfig {
commitment: config.preflight_commitment.unwrap_or_default(),
};
Expand Down Expand Up @@ -1185,16 +1148,6 @@ impl RpcClient {
}
}

async fn default_cluster_transaction_encoding(
&self,
) -> Result<UiTransactionEncoding, RpcError> {
if self.get_node_version().await? < semver::Version::new(1, 3, 16) {
Ok(UiTransactionEncoding::Base58)
} else {
Ok(UiTransactionEncoding::Base64)
}
}

/// Simulates sending a transaction.
///
/// If the transaction fails, then the [`err`] field of the returned
Expand Down Expand Up @@ -1344,11 +1297,7 @@ impl RpcClient {
transaction: &impl SerializableTransaction,
config: RpcSimulateTransactionConfig,
) -> RpcResult<RpcSimulateTransactionResult> {
let encoding = if let Some(encoding) = config.encoding {
encoding
} else {
self.default_cluster_transaction_encoding().await?
};
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base64);
let commitment = config.commitment.unwrap_or_default();
let config = RpcSimulateTransactionConfig {
encoding: Some(encoding),
Expand Down Expand Up @@ -4046,9 +3995,6 @@ impl RpcClient {
.commitment
.unwrap_or_else(|| self.commitment());
config.account_config.commitment = Some(commitment);
if let Some(filters) = config.filters {
config.filters = Some(self.maybe_map_filters(filters).await?);
}

let accounts = self
.send::<OptionalContext<Vec<RpcKeyedAccount>>>(
Expand Down

0 comments on commit 69d2eb8

Please sign in to comment.