diff --git a/Cargo.toml b/Cargo.toml index 6e0d190a..6c7b105e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ chrono = { version = "0.4.19", default-features = false, features = [ "clock", ] } hex = { version = "0.4.3", default-features = false, features = ["alloc"] } -rand = { version = "0.8.4", default-features = false, features = ["getrandom"] } +rand = { version = "0.8.5", default-features = false, features = ["getrandom"] } serde = { version = "1.0.130", default-features = false, features = ["derive"] } serde_json = { version = "1.0.68", default-features = false, features = [ "alloc", @@ -54,26 +54,27 @@ serde_json = { version = "1.0.68", default-features = false, features = [ serde_with = "3.2.0" serde_repr = "0.1" zeroize = "1.5.7" -hashbrown = { version = "0.14.0", default-features = false, features = [ - "serde", -] } +hashbrown = { version = "0.14.5", features = ["serde"] } fnv = { version = "1.0.7", default-features = false } derive-new = { version = "0.5.9", default-features = false } thiserror-no-std = "2.0.2" anyhow = { version = "1.0.69", default-features = false } tokio = { version = "1.28.0", default-features = false, optional = true } url = { version = "2.2.2", default-features = false, optional = true } -futures = { version = "0.3.28", default-features = false, optional = true } +futures = { version = "0.3.28", default-features = false, features = [ + "alloc", +], optional = true } rand_core = { version = "0.6.4", default-features = false } tokio-tungstenite = { version = "0.20.0", optional = true } - -[dependencies.embedded-websocket] -# git version needed to use `framer_async` -git = "https://github.com/ninjasource/embedded-websocket" -version = "0.9.2" -rev = "8d87d46f46fa0c75e099ca8aad37e8d00c8854f8" -default-features = false -optional = true +embassy-sync = { version = "0.6.0", default-features = false } +embedded-io-async = "0.6.1" +futures-sink = { version = "0.3.30", default-features = false } +futures-core = { version = "0.3.30", default-features = false } +futures-util = { version = "0.3.30", optional = true } +tokio-util = { version = "0.7.7", features = ["codec"], optional = true } +bytes = { version = "1.4.0", default-features = false } +embassy-futures = "0.1.1" +embedded-websocket = { version = "0.9.3", optional = true } [dev-dependencies] criterion = "0.5.1" @@ -82,9 +83,7 @@ cargo-husky = { version = "1.5.0", default-features = false, features = [ ] } tokio = { version = "1.28.0", features = ["full"] } tokio-util = { version = "0.7.7", features = ["codec"] } -futures-util = "0.3.30" -bytes = { version = "1.4.0", default-features = false } -rand = { version = "0.8.4", default-features = false, features = [ +rand = { version = "0.8.5", default-features = false, features = [ "getrandom", "std", "std_rng", @@ -96,13 +95,20 @@ harness = false [features] default = ["std", "core", "models", "utils", "tungstenite"] -models = ["core", "transactions", "requests", "ledger"] +models = ["core", "transactions", "requests", "ledger", "results"] transactions = ["core", "amounts", "currencies"] requests = ["core", "amounts", "currencies"] +results = ["core", "amounts", "currencies"] ledger = ["core", "amounts", "currencies"] amounts = ["core"] currencies = ["core"] -tungstenite = ["url", "futures", "tokio/full", "tokio-tungstenite/native-tls"] +tungstenite = [ + "url", + "futures", + "tokio/net", + "tokio-tungstenite/native-tls", + "futures-util", +] embedded-ws = ["url", "futures", "embedded-websocket"] core = ["utils"] utils = [] @@ -119,4 +125,5 @@ std = [ "serde/std", "indexmap/std", "secp256k1/std", + "dep:tokio-util", ] diff --git a/src/asynch/clients/async_client.rs b/src/asynch/clients/async_client.rs new file mode 100644 index 00000000..57a776ab --- /dev/null +++ b/src/asynch/clients/async_client.rs @@ -0,0 +1,19 @@ +use super::client::Client; +use crate::models::{requests::Request, results::XRPLResponse}; +use anyhow::Result; +use serde::{Deserialize, Serialize}; + +#[allow(async_fn_in_trait)] +pub trait AsyncClient<'a>: Client<'a> { + async fn request< + Res: Serialize + for<'de> Deserialize<'de>, + Req: Serialize + for<'de> Deserialize<'de> + Request<'a>, + >( + &'a self, + request: Req, + ) -> Result> { + self.request_impl(request).await + } +} + +impl<'a, T: Client<'a>> AsyncClient<'a> for T {} diff --git a/src/asynch/clients/client.rs b/src/asynch/clients/client.rs new file mode 100644 index 00000000..7b7cd241 --- /dev/null +++ b/src/asynch/clients/client.rs @@ -0,0 +1,40 @@ +use crate::models::{requests::Request, results::XRPLResponse}; +#[cfg(feature = "std")] +use crate::utils::get_random_id; +use alloc::borrow::Cow; +use anyhow::Result; +use serde::{Deserialize, Serialize}; + +#[allow(async_fn_in_trait)] +pub trait Client<'a> { + async fn request_impl< + Res: Serialize + for<'de> Deserialize<'de>, + Req: Serialize + for<'de> Deserialize<'de> + Request<'a>, + >( + &'a self, + request: Req, + ) -> Result>; + fn set_request_id< + Res: Serialize + for<'de> Deserialize<'de>, + Req: Serialize + for<'de> Deserialize<'de> + Request<'a>, + >( + &'a self, + request: &mut Req, + ) -> Cow<'_, str> { + let common_fields = request.get_common_fields(); + let request_id: Cow<'_, str> = match common_fields.id.clone() { + Some(id) => id, + None => { + #[cfg(feature = "std")] + { + let mut rng = rand::thread_rng(); + Cow::Owned(get_random_id(&mut rng)) + } + #[cfg(not(feature = "std"))] + unimplemented!("get_random_id is not yet implemented for no_std. Please provide an `id` in the request."); + } + }; + request.get_common_fields_mut().id = Some(request_id.clone()); + request_id + } +} diff --git a/src/asynch/clients/embedded_ws.rs b/src/asynch/clients/embedded_ws.rs deleted file mode 100644 index 76106dee..00000000 --- a/src/asynch/clients/embedded_ws.rs +++ /dev/null @@ -1,168 +0,0 @@ -use super::{ - exceptions::XRPLWebsocketException, - {WebsocketClosed, WebsocketOpen}, -}; -use crate::Err; -use anyhow::Result; -use core::marker::PhantomData; -use core::{fmt::Debug, ops::Deref}; -pub use embedded_websocket::{ - framer_async::{ - Framer as EmbeddedWebsocketFramer, FramerError as EmbeddedWebsocketFramerError, - ReadResult as EmbeddedWebsocketReadMessageType, - }, - Client as EmbeddedWebsocketClient, Error as EmbeddedWebsocketError, - WebSocket as EmbeddedWebsocket, WebSocketCloseStatusCode as EmbeddedWebsocketCloseStatusCode, - WebSocketOptions as EmbeddedWebsocketOptions, - WebSocketSendMessageType as EmbeddedWebsocketSendMessageType, - WebSocketState as EmbeddedWebsocketState, -}; -use futures::{Sink, Stream}; -use rand_core::RngCore; - -pub struct AsyncWebsocketClient { - inner: EmbeddedWebsocketFramer, - status: PhantomData, -} - -impl AsyncWebsocketClient { - pub fn is_open(&self) -> bool { - core::any::type_name::() == core::any::type_name::() - } -} - -impl AsyncWebsocketClient { - /// Open a websocket connection. - pub async fn open<'a, B, E>( - stream: &mut (impl Stream> + Sink<&'a [u8], Error = E> + Unpin), - buffer: &'a mut [u8], - rng: Rng, - websocket_options: &EmbeddedWebsocketOptions<'_>, - ) -> Result> - where - B: AsRef<[u8]>, - E: Debug, - { - let websocket = EmbeddedWebsocket::::new_client(rng); - let mut framer = EmbeddedWebsocketFramer::new(websocket); - match framer.connect(stream, buffer, websocket_options).await { - Ok(Some(_)) => {} - Ok(None) => {} - Err(error) => return Err!(XRPLWebsocketException::from(error)), - } - - Ok(AsyncWebsocketClient { - inner: framer, - status: PhantomData::, - }) - } -} - -impl AsyncWebsocketClient { - /// Encode a message to be sent over the websocket. - pub fn encode( - &mut self, - message_type: EmbeddedWebsocketSendMessageType, - end_of_message: bool, - from: &[u8], - to: &mut [u8], - ) -> Result - where - E: Debug, - { - match self - .inner - .encode::(message_type, end_of_message, from, to) - { - Ok(bytes_written) => Ok(bytes_written), - Err(error) => Err!(XRPLWebsocketException::from(error)), - } - } - - /// Send a message over the websocket. - pub async fn send<'b, E, R: serde::Serialize>( - &mut self, - stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin), - stream_buf: &'b mut [u8], - end_of_message: bool, - frame_buf: R, - ) -> Result<()> - where - E: Debug, - { - match serde_json::to_vec(&frame_buf) { - Ok(frame_buf) => match self - .inner - .write( - stream, - stream_buf, - EmbeddedWebsocketSendMessageType::Text, - end_of_message, - frame_buf.as_slice(), - ) - .await - { - Ok(()) => Ok(()), - Err(error) => Err!(XRPLWebsocketException::from(error)), - }, - Err(error) => Err!(error), - } - } - - /// Close the websocket connection. - pub async fn close<'b, E>( - &mut self, - stream: &mut (impl Sink<&'b [u8], Error = E> + Unpin), - stream_buf: &'b mut [u8], - close_status: EmbeddedWebsocketCloseStatusCode, - status_description: Option<&str>, - ) -> Result<()> - where - E: Debug, - { - match self - .inner - .close(stream, stream_buf, close_status, status_description) - .await - { - Ok(()) => Ok(()), - Err(error) => Err!(XRPLWebsocketException::from(error)), - } - } - - /// Read a message from the websocket. - pub async fn next<'a, B: Deref, E>( - &'a mut self, - stream: &mut (impl Stream> + Sink<&'a [u8], Error = E> + Unpin), - buffer: &'a mut [u8], - ) -> Option>> - // TODO: Change to Response as soon as implemented - where - E: Debug, - { - match self.inner.read(stream, buffer).await { - Some(Ok(read_result)) => Some(Ok(read_result)), - Some(Err(error)) => Some(Err!(XRPLWebsocketException::from(error))), - None => None, - } - } - - /// Read a message from the websocket. - /// - /// This is similar to the `next` method, but returns a `Result>` rather than an `Option>`, making for easy use with the ? operator. - pub async fn try_next<'a, B: Deref, E>( - &'a mut self, - stream: &mut (impl Stream> + Sink<&'a [u8], Error = E> + Unpin), - buffer: &'a mut [u8], - ) -> Result>> - // TODO: Change to Response as soon as implemented - where - E: Debug, - { - match self.inner.read(stream, buffer).await { - Some(Ok(read_result)) => Ok(Some(read_result)), - Some(Err(error)) => Err!(XRPLWebsocketException::from(error)), - None => Ok(None), - } - } -} diff --git a/src/asynch/clients/mod.rs b/src/asynch/clients/mod.rs index 54eb0495..591a459f 100644 --- a/src/asynch/clients/mod.rs +++ b/src/asynch/clients/mod.rs @@ -1,14 +1,6 @@ -pub mod exceptions; +mod async_client; +mod client; +mod websocket; -pub struct WebsocketOpen; -pub struct WebsocketClosed; - -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] -mod embedded_websocket; -#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] -mod tungstenite; - -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] -pub use embedded_websocket::*; -#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] -pub use tungstenite::*; +pub use async_client::*; +pub use websocket::*; diff --git a/src/asynch/clients/tungstenite.rs b/src/asynch/clients/tungstenite.rs deleted file mode 100644 index c03bf281..00000000 --- a/src/asynch/clients/tungstenite.rs +++ /dev/null @@ -1,112 +0,0 @@ -use super::{ - exceptions::XRPLWebsocketException, - {WebsocketClosed, WebsocketOpen}, -}; -use crate::Err; - -use anyhow::Result; -use core::marker::PhantomData; -use core::{pin::Pin, task::Poll}; -use futures::{Sink, Stream}; -use tokio::net::TcpStream; -use tokio_tungstenite::{ - connect_async as tungstenite_connect_async, MaybeTlsStream as TungsteniteMaybeTlsStream, - WebSocketStream as TungsteniteWebsocketStream, -}; -use url::Url; - -pub use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; - -pub struct AsyncWebsocketClient { - inner: TungsteniteWebsocketStream>, - status: PhantomData, -} - -impl AsyncWebsocketClient { - pub fn is_open(&self) -> bool { - core::any::type_name::() == core::any::type_name::() - } -} - -impl Sink for AsyncWebsocketClient -where - I: serde::Serialize, -{ - type Error = anyhow::Error; - - fn poll_ready( - mut self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll> { - match Pin::new(&mut self.inner).poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(error)) => Poll::Ready(Err!(error)), - Poll::Pending => Poll::Pending, - } - } - - fn start_send(mut self: core::pin::Pin<&mut Self>, item: I) -> Result<()> { - match serde_json::to_string(&item) { - Ok(json) => { - match Pin::new(&mut self.inner).start_send(TungsteniteMessage::Text(json)) { - Ok(()) => Ok(()), - Err(error) => Err!(error), - } - } - Err(error) => Err!(error), - } - } - - fn poll_flush( - mut self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll> { - match Pin::new(&mut self.inner).poll_flush(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(error)) => Poll::Ready(Err!(error)), - Poll::Pending => Poll::Pending, - } - } - - fn poll_close( - mut self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll> { - match Pin::new(&mut self.inner).poll_close(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(error)) => Poll::Ready(Err!(error)), - Poll::Pending => Poll::Pending, - } - } -} - -impl Stream for AsyncWebsocketClient { - type Item = > as Stream>::Item; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> Poll> { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - -impl AsyncWebsocketClient { - pub async fn open(uri: Url) -> Result> { - match tungstenite_connect_async(uri).await { - Ok((websocket_stream, _)) => Ok(AsyncWebsocketClient { - inner: websocket_stream, - status: PhantomData::, - }), - Err(error) => { - Err!(XRPLWebsocketException::UnableToConnect::( - error - )) - } - } - } -} diff --git a/src/asynch/clients/websocket/codec.rs b/src/asynch/clients/websocket/codec.rs new file mode 100644 index 00000000..95e9b09b --- /dev/null +++ b/src/asynch/clients/websocket/codec.rs @@ -0,0 +1,30 @@ +use alloc::{io, vec::Vec}; +use bytes::{BufMut, BytesMut}; +use tokio_util::codec::{Decoder, Encoder}; + +pub struct Codec; + +impl Decoder for Codec { + type Item = Vec; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result>, io::Error> { + if !src.is_empty() { + let len = src.len(); + let data = src.split_to(len).to_vec(); + Ok(Some(data)) + } else { + Ok(None) + } + } +} + +impl Encoder<&[u8]> for Codec { + type Error = io::Error; + + fn encode(&mut self, data: &[u8], buf: &mut BytesMut) -> Result<(), io::Error> { + buf.reserve(data.len()); + buf.put(data); + Ok(()) + } +} diff --git a/src/asynch/clients/websocket/embedded_websocket.rs b/src/asynch/clients/websocket/embedded_websocket.rs new file mode 100644 index 00000000..77c48dd8 --- /dev/null +++ b/src/asynch/clients/websocket/embedded_websocket.rs @@ -0,0 +1,297 @@ +use core::{ + fmt::{Debug, Display}, + marker::PhantomData, + ops::{Deref, DerefMut}, +}; + +use alloc::{ + string::{String, ToString}, + sync::Arc, +}; +use anyhow::Result; +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::mutex::Mutex; +use embedded_io_async::{ErrorType, Read, Write}; +use embedded_websocket::{ + framer_async::{Framer, ReadResult}, + Client, WebSocketClient, WebSocketOptions, WebSocketSendMessageType, +}; +use futures_core::Stream; +use futures_sink::Sink; +use rand_core::RngCore; +use serde::{Deserialize, Serialize}; +use url::Url; + +use super::{SingleExecutorMutex, WebsocketClosed, WebsocketOpen}; +use crate::{ + asynch::clients::{ + client::Client as ClientTrait, + websocket::websocket_base::{MessageHandler, WebsocketBase}, + }, + models::{requests::Request, results::XRPLResponse}, + Err, +}; + +use super::exceptions::XRPLWebsocketException; + +pub struct AsyncWebsocketClient< + const BUF: usize, + Tcp, + B, + E, + Rng: RngCore, + M = SingleExecutorMutex, + Status = WebsocketClosed, +> where + M: RawMutex, + B: Deref + AsRef<[u8]>, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + tcp: Arc>, + websocket: Arc>>, + tx_buffer: [u8; BUF], + websocket_base: Arc>>, + status: PhantomData, +} + +impl + AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + pub async fn open( + rng: Rng, + tcp: Tcp, + url: Url, + ) -> Result> { + // replace the scheme with http or https + let scheme = match url.scheme() { + "wss" => "https", + "ws" => "http", + _ => url.scheme(), + }; + let port = match url.port() { + Some(port) => port, + None => match url.scheme() { + "wss" => 443, + "ws" => 80, + _ => 80, + }, + } + .to_string(); + let path = url.path(); + let host = match url.host_str() { + Some(host) => host, + None => return Err!(XRPLWebsocketException::::Disconnected), + }; + let origin = scheme.to_string() + "://" + host + ":" + &port + path; + let websocket_options = WebSocketOptions { + path, + host, + origin: &origin, + sub_protocols: None, + additional_headers: None, + }; + let websocket = Arc::new(Mutex::new(Framer::new(WebSocketClient::new_client(rng)))); + let tcp = Arc::new(Mutex::new(tcp)); + let mut buffer = [0; BUF]; + if let Err(error) = websocket + .lock() + .await + .connect( + tcp.lock().await.deref_mut(), + &mut buffer, + &websocket_options, + ) + .await + { + match error { + // FramerError::WebSocket(embedded_websocket::Error::HttpResponseCodeInvalid( + // Some(308), + // )) => (), + error => return Err!(XRPLWebsocketException::from(error)), + } + } + + Ok(AsyncWebsocketClient { + tcp, + websocket, + tx_buffer: buffer, + websocket_base: Arc::new(Mutex::new(WebsocketBase::new())), + status: PhantomData::, + }) + } +} + +impl + AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + async fn do_write(&self, buf: &[u8]) -> Result::Error> { + let mut inner = self.websocket.lock().await; + let mut tcp = self.tcp.lock().await; + let mut buffer = self.tx_buffer; + match inner + .write( + tcp.deref_mut(), + &mut buffer, + WebSocketSendMessageType::Text, + false, + buf, + ) + .await + { + Ok(()) => Ok(buf.len()), + Err(error) => Err(XRPLWebsocketException::::from(error)), + } + } + + async fn do_read(&self, buf: &mut [u8]) -> Result::Error> { + let mut inner = self.websocket.lock().await; + let mut tcp = self.tcp.lock().await; + match inner.read(tcp.deref_mut(), buf).await { + Some(Ok(ReadResult::Text(t))) => Ok(t.len()), + Some(Ok(ReadResult::Binary(b))) => Ok(b.len()), + Some(Ok(ReadResult::Ping(_))) => Ok(0), + Some(Ok(ReadResult::Pong(_))) => Ok(0), + Some(Ok(ReadResult::Close(_))) => Err(XRPLWebsocketException::::Disconnected), + Some(Err(error)) => Err(XRPLWebsocketException::::from(error)), + None => Err(XRPLWebsocketException::::Disconnected), + } + } +} + +impl ErrorType + for AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + type Error = XRPLWebsocketException; +} + +impl Write + for AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + async fn write(&mut self, buf: &[u8]) -> Result { + self.do_write(buf).await + } +} + +impl Read + for AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + async fn read(&mut self, buf: &mut [u8]) -> Result { + self.do_read(buf).await + } +} + +impl MessageHandler + for AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'a> Sink<&'a [u8], Error = E> + Unpin, +{ + async fn setup_request_future(&mut self, id: String) { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.setup_request_future(id).await; + } + + async fn handle_message(&mut self, message: String) -> Result<()> { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.handle_message(message).await + } + + async fn pop_message(&mut self) -> String { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.pop_message().await + } + + async fn try_recv_request(&mut self, id: String) -> Result> { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.try_recv_request(id).await + } +} + +impl<'a, const BUF: usize, M, Tcp, B, E, Rng: RngCore> ClientTrait<'a> + for AsyncWebsocketClient +where + M: RawMutex, + B: Deref + AsRef<[u8]>, + E: Debug + Display, + Tcp: Stream> + for<'b> Sink<&'b [u8], Error = E> + Unpin, +{ + async fn request_impl< + Res: Serialize + for<'de> Deserialize<'de>, + Req: Serialize + for<'de> Deserialize<'de> + Request<'a>, + >( + &'a self, + mut request: Req, + ) -> Result> { + // setup request future + let request_id = self.set_request_id::(&mut request); + let mut websocket_base = self.websocket_base.lock().await; + websocket_base + .setup_request_future(request_id.to_string()) + .await; + // send request + let request_string = match serde_json::to_string(&request) { + Ok(request_string) => request_string, + Err(error) => return Err!(error), + }; + if let Err(error) = self.do_write(request_string.as_bytes()).await { + return Err!(error); + } + // wait for response + loop { + let mut rx_buffer = [0; 1024]; + match self.do_read(&mut rx_buffer).await { + Ok(u_size) => { + // If the buffer is empty, continue to the next iteration. + if u_size == 0 { + continue; + } + let message_str = match core::str::from_utf8(&rx_buffer[..u_size]) { + Ok(response_str) => response_str, + Err(error) => return Err!(XRPLWebsocketException::::Utf8(error)), + }; + websocket_base + .handle_message(message_str.to_string()) + .await?; + let message_opt = websocket_base + .try_recv_request(request_id.to_string()) + .await?; + if let Some(message) = message_opt { + let response = match serde_json::from_str(&message) { + Ok(response) => response, + Err(error) => return Err!(error), + }; + return Ok(response); + } + } + Err(error) => return Err!(error), + } + } + } +} diff --git a/src/asynch/clients/exceptions.rs b/src/asynch/clients/websocket/exceptions.rs similarity index 68% rename from src/asynch/clients/exceptions.rs rename to src/asynch/clients/websocket/exceptions.rs index 220084cb..88b1161a 100644 --- a/src/asynch/clients/exceptions.rs +++ b/src/asynch/clients/websocket/exceptions.rs @@ -1,6 +1,8 @@ use core::fmt::Debug; use core::str::Utf8Error; #[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +use embedded_io_async::{Error as EmbeddedIoError, ErrorKind}; +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] use embedded_websocket::framer_async::FramerError; use thiserror_no_std::Error; @@ -25,6 +27,17 @@ pub enum XRPLWebsocketException { Disconnected, #[error("Read buffer is too small (size: {0:?})")] RxBufferTooSmall(usize), + #[error("Unexpected message type")] + UnexpectedMessageType, + #[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] + #[error("Embedded I/O error: {0:?}")] + EmbeddedIoError(ErrorKind), + #[error("Missing request channel sender.")] + MissingRequestSender, + #[error("Missing request channel receiver.")] + MissingRequestReceiver, + #[error("Invalid message.")] + InvalidMessage, } #[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] @@ -42,5 +55,15 @@ impl From> for XRPLWebsocketException { } } +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +impl EmbeddedIoError for XRPLWebsocketException { + fn kind(&self) -> ErrorKind { + match self { + XRPLWebsocketException::EmbeddedIoError(e) => e.kind(), + _ => ErrorKind::Other, + } + } +} + #[cfg(feature = "std")] impl alloc::error::Error for XRPLWebsocketException {} diff --git a/src/asynch/clients/websocket/mod.rs b/src/asynch/clients/websocket/mod.rs new file mode 100644 index 00000000..6cc1d398 --- /dev/null +++ b/src/asynch/clients/websocket/mod.rs @@ -0,0 +1,137 @@ +use core::fmt::{Debug, Display}; + +use crate::{models::results::XRPLResponse, Err}; +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +use alloc::string::String; +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +use alloc::string::ToString; +use anyhow::Result; +use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +use embedded_io_async::{ErrorType, Read as EmbeddedIoRead, Write as EmbeddedIoWrite}; +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +use exceptions::XRPLWebsocketException; +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +use futures::{Sink, SinkExt, Stream, StreamExt}; +use serde::{Deserialize, Serialize}; + +mod websocket_base; +use websocket_base::MessageHandler; + +#[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] +pub mod codec; +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +mod embedded_websocket; +pub mod exceptions; +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +mod tungstenite; + +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +pub use embedded_websocket::AsyncWebsocketClient; +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +pub use tungstenite::AsyncWebsocketClient; + +pub struct WebsocketOpen; +pub struct WebsocketClosed; + +pub type MultiExecutorMutex = CriticalSectionRawMutex; +pub type SingleExecutorMutex = NoopRawMutex; + +#[allow(async_fn_in_trait)] +pub trait XRPLWebsocketIO { + async fn xrpl_send(&mut self, message: Req) -> Result<()>; + async fn xrpl_receive< + Res: Serialize + for<'de> Deserialize<'de> + Debug, + Req: Serialize + for<'de> Deserialize<'de> + Debug, + >( + &mut self, + ) -> Result>>; +} + +#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +impl XRPLWebsocketIO for T +where + ::Error: Display, +{ + async fn xrpl_send(&mut self, message: Req) -> Result<()> { + let message = match serde_json::to_string(&message) { + Ok(message) => message, + Err(error) => return Err!(error), + }; + let message_buffer = message.as_bytes(); + match self.write(message_buffer).await { + Ok(_) => Ok(()), + Err(e) => Err!(e), + } + } + + async fn xrpl_receive< + Res: Serialize + for<'de> Deserialize<'de> + Debug, + Req: Serialize + for<'de> Deserialize<'de> + Debug, + >( + &mut self, + ) -> Result>> { + let mut buffer = [0; 1024]; + loop { + match self.read(&mut buffer).await { + Ok(u_size) => { + // If the buffer is empty, continue to the next iteration. + if u_size == 0 { + continue; + } + let response_str = match core::str::from_utf8(&buffer[..u_size]) { + Ok(response_str) => response_str, + Err(error) => { + return Err!(XRPLWebsocketException::::Utf8(error)) + } + }; + self.handle_message(response_str.to_string()).await?; + let message = self.pop_message().await; + match serde_json::from_str(&message) { + Ok(response) => return Ok(response), + Err(error) => return Err!(error), + } + } + Err(error) => return Err!(error), + } + } + } +} + +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +impl XRPLWebsocketIO for T +where + T: Stream> + Sink + MessageHandler + Unpin, + >::Error: Debug + Display, +{ + async fn xrpl_send(&mut self, message: Req) -> Result<()> { + let message = match serde_json::to_string(&message) { + Ok(message) => message, + Err(error) => return Err!(error), + }; + match self.send(message).await { + Ok(()) => Ok(()), + Err(error) => Err!(error), + } + } + + async fn xrpl_receive< + Res: Serialize + for<'de> Deserialize<'de> + Debug, + Req: Serialize + for<'de> Deserialize<'de> + Debug, + >( + &mut self, + ) -> Result>> { + match self.next().await { + Some(Ok(item)) => { + self.handle_message(item).await?; + let message = self.pop_message().await; + match serde_json::from_str(&message) { + Ok(response) => Ok(response), + Err(error) => Err!(error), + } + } + Some(Err(error)) => Err!(error), + None => Ok(None), + } + } +} diff --git a/src/asynch/clients/websocket/tungstenite.rs b/src/asynch/clients/websocket/tungstenite.rs new file mode 100644 index 00000000..0f85ceba --- /dev/null +++ b/src/asynch/clients/websocket/tungstenite.rs @@ -0,0 +1,265 @@ +use super::exceptions::XRPLWebsocketException; +use super::{SingleExecutorMutex, WebsocketClosed, WebsocketOpen}; +use crate::asynch::clients::client::Client; +use crate::asynch::clients::websocket::websocket_base::{MessageHandler, WebsocketBase}; +use crate::models::requests::Request; +use crate::models::results::XRPLResponse; +use crate::Err; + +use alloc::string::{String, ToString}; +use alloc::sync::Arc; +use anyhow::Result; +use core::marker::PhantomData; +use core::{pin::Pin, task::Poll}; +use embassy_futures::block_on; +use embassy_sync::blocking_mutex::raw::RawMutex; +use embassy_sync::mutex::Mutex; +use futures::{Sink, Stream, StreamExt}; +use futures_util::SinkExt; +use serde::{Deserialize, Serialize}; +use tokio::net::TcpStream; +use tokio_tungstenite::{ + connect_async as tungstenite_connect_async, MaybeTlsStream as TungsteniteMaybeTlsStream, + WebSocketStream as TungsteniteWebsocketStream, +}; +use url::Url; + +pub use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; + +pub type AsyncWebsocketConnection = + Arc>>>; + +pub struct AsyncWebsocketClient +where + M: RawMutex, +{ + websocket: AsyncWebsocketConnection, + websocket_base: Arc>>, + status: PhantomData, +} + +impl Sink for AsyncWebsocketClient +where + M: RawMutex, + Self: Unpin, +{ + type Error = anyhow::Error; + + fn poll_ready( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let mut guard = block_on(self.websocket.lock()); + match Pin::new(&mut *guard).poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(error)) => Poll::Ready(Err!(error)), + Poll::Pending => Poll::Pending, + } + } + + fn start_send(self: core::pin::Pin<&mut Self>, item: String) -> Result<()> { + let mut guard = block_on(self.websocket.lock()); + match Pin::new(&mut *guard).start_send(TungsteniteMessage::Text(item)) { + Ok(()) => Ok(()), + Err(error) => Err!(error), + } + } + + fn poll_flush( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let mut guard = block_on(self.websocket.lock()); + match Pin::new(&mut *guard).poll_flush(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(error)) => Poll::Ready(Err!(error)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_close( + self: core::pin::Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> core::task::Poll> { + let mut guard = block_on(self.websocket.lock()); + match Pin::new(&mut *guard).poll_close(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(error)) => Poll::Ready(Err!(error)), + Poll::Pending => Poll::Pending, + } + } +} + +impl Stream for AsyncWebsocketClient +where + M: RawMutex, +{ + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut core::task::Context<'_>, + ) -> Poll> { + let mut guard = block_on(self.websocket.lock()); + match Pin::new(&mut *guard).poll_next(cx) { + Poll::Ready(Some(item)) => match item { + Ok(message) => match message { + TungsteniteMessage::Text(response) => Poll::Ready(Some(Ok(response))), + TungsteniteMessage::Binary(response) => { + let response_string = match String::from_utf8(response) { + Ok(string) => string, + Err(error) => { + return Poll::Ready(Some(Err!(XRPLWebsocketException::< + anyhow::Error, + >::Utf8( + error.utf8_error() + )))); + } + }; + Poll::Ready(Some(Ok(response_string))) + } + TungsteniteMessage::Close(_) => Poll::Ready(Some(Err!( + XRPLWebsocketException::::Disconnected + ))), + _ => Poll::Ready(Some(Err!( + XRPLWebsocketException::::UnexpectedMessageType + ))), + }, + Err(error) => Poll::Ready(Some(Err!(error))), + }, + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsyncWebsocketClient +where + M: RawMutex, +{ + pub async fn open(uri: Url) -> Result> { + match tungstenite_connect_async(uri).await { + Ok((websocket_stream, _)) => Ok(AsyncWebsocketClient { + websocket: Arc::new(Mutex::new(websocket_stream)), + websocket_base: Arc::new(Mutex::new(WebsocketBase::new())), + status: PhantomData::, + }), + Err(error) => { + Err!(XRPLWebsocketException::UnableToConnect::( + error + )) + } + } + } +} + +impl MessageHandler for AsyncWebsocketClient +where + M: RawMutex, +{ + async fn setup_request_future(&mut self, id: String) { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.setup_request_future(id).await; + } + + async fn handle_message(&mut self, message: String) -> Result<()> { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.handle_message(message).await + } + + async fn pop_message(&mut self) -> String { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.pop_message().await + } + + async fn try_recv_request(&mut self, id: String) -> Result> { + let mut websocket_base = self.websocket_base.lock().await; + websocket_base.try_recv_request(id).await + } +} + +impl<'a, M> Client<'a> for AsyncWebsocketClient +where + M: RawMutex, +{ + async fn request_impl< + Res: Serialize + for<'de> Deserialize<'de>, + Req: Serialize + for<'de> Deserialize<'de> + Request<'a>, + >( + &'a self, + mut request: Req, + ) -> Result> { + // setup request future + let request_id = self.set_request_id::(&mut request); + let mut websocket_base = self.websocket_base.lock().await; + websocket_base + .setup_request_future(request_id.to_string()) + .await; + // send request + let mut websocket = self.websocket.lock().await; + let request_string = match serde_json::to_string(&request) { + Ok(request_string) => request_string, + Err(error) => return Err!(error), + }; + if let Err(error) = websocket + .send(TungsteniteMessage::Text(request_string)) + .await + { + return Err!(error); + } + // wait for response + loop { + let message = websocket.next().await; + match message { + Some(Ok(TungsteniteMessage::Text(message))) => { + websocket_base.handle_message(message).await?; + let message_opt = websocket_base + .try_recv_request(request_id.to_string()) + .await?; + if let Some(message) = message_opt { + let response = match serde_json::from_str(&message) { + Ok(response) => response, + Err(error) => return Err!(error), + }; + return Ok(response); + } + } + Some(Ok(TungsteniteMessage::Binary(response))) => { + let message = match String::from_utf8(response) { + Ok(string) => string, + Err(error) => { + return Err!(XRPLWebsocketException::::Utf8( + error.utf8_error() + )); + } + }; + match serde_json::from_str(&message) { + Ok(response) => return Ok(response), + Err(error) => return Err!(error), + } + } + Some(Ok(TungsteniteMessage::Close(_))) => { + return Err!(XRPLWebsocketException::::Disconnected) + } + Some(Ok(_)) => { + return Err!(XRPLWebsocketException::::UnexpectedMessageType); + } + Some(Err(error)) => return Err!(error), + None => continue, + } + } + } + + // async fn get_common_fields(&self) -> Result> { + // let server_state = self + // .request::(requests::ServerState::new(None)) + // .await?; + // let state = server_state.result.state; + // let common_fields = CommonFields { + // network_id: state.network_id, + // build_version: Some(state.build_version), + // }; + + // Ok(common_fields) + // } +} diff --git a/src/asynch/clients/websocket/websocket_base.rs b/src/asynch/clients/websocket/websocket_base.rs new file mode 100644 index 00000000..ab43e3a8 --- /dev/null +++ b/src/asynch/clients/websocket/websocket_base.rs @@ -0,0 +1,112 @@ +use alloc::string::{String, ToString}; +use anyhow::Result; +use embassy_sync::{blocking_mutex::raw::RawMutex, channel::Channel}; +use futures::channel::oneshot::{self, Receiver, Sender}; +use hashbrown::HashMap; +use serde_json::Value; + +use crate::{asynch::clients::exceptions::XRPLWebsocketException, Err}; + +const _MAX_CHANNEL_MSG_CNT: usize = 10; + +/// A struct that handles futures of websocket messages. +pub struct WebsocketBase +where + M: RawMutex, +{ + /// The messages the user requests, which means he is waiting for a specific `id`. + pending_requests: HashMap>, + request_senders: HashMap>, + /// The messages the user waits for when sending and receiving normally. + messages: Channel, +} + +impl WebsocketBase +where + M: RawMutex, +{ + pub fn new() -> Self { + Self { + pending_requests: HashMap::new(), + request_senders: HashMap::new(), + messages: Channel::new(), + } + } + + pub fn close(&mut self) { + self.pending_requests.clear(); + self.request_senders.clear(); + self.messages.clear(); + } +} + +pub(crate) trait MessageHandler { + /// Setup an empty future for a request. + async fn setup_request_future(&mut self, id: String); + async fn handle_message(&mut self, message: String) -> Result<()>; + async fn pop_message(&mut self) -> String; + async fn try_recv_request(&mut self, id: String) -> Result>; +} + +impl MessageHandler for WebsocketBase +where + M: RawMutex, +{ + async fn setup_request_future(&mut self, id: String) { + if self.pending_requests.contains_key(&id) { + return; + } + let (sender, receiver) = oneshot::channel::(); + self.pending_requests.insert(id.clone(), receiver); + self.request_senders.insert(id, sender); + } + + async fn handle_message(&mut self, message: String) -> Result<()> { + let message_value: Value = match serde_json::from_str(&message) { + Ok(value) => value, + Err(error) => return Err!(error), + }; + let id = match message_value.get("id") { + Some(id) => match id.as_str() { + Some(id) => id.to_string(), + None => return Err!(XRPLWebsocketException::::InvalidMessage), + }, + None => String::new(), + }; + if let Some(_receiver) = self.pending_requests.get(&id) { + let sender = match self.request_senders.remove(&id) { + Some(sender) => sender, + None => return Err!(XRPLWebsocketException::::MissingRequestSender), + }; + match sender.send(message) { + Ok(()) => (), + Err(error) => return Err!(error), + }; + } else { + self.messages.send(message).await; + } + Ok(()) + } + + async fn pop_message(&mut self) -> String { + self.messages.receive().await + } + + async fn try_recv_request(&mut self, id: String) -> Result> { + let fut = match self.pending_requests.get_mut(&id) { + Some(fut) => fut, + None => return Err!(XRPLWebsocketException::::MissingRequestReceiver), + }; + match fut.try_recv() { + Ok(Some(message)) => { + // Remove the future from the hashmap. + self.pending_requests.remove(&id); + Ok(Some(message)) + } + Ok(None) => Ok(None), + Err(error) => { + return Err!(error); + } + } + } +} diff --git a/src/models/mod.rs b/src/models/mod.rs index 7cd5e1dc..6d9c5ebd 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -14,6 +14,8 @@ pub mod model; #[cfg(feature = "requests")] #[allow(clippy::too_many_arguments)] pub mod requests; +#[cfg(feature = "results")] +pub mod results; #[cfg(feature = "transactions")] #[allow(clippy::too_many_arguments)] pub mod transactions; diff --git a/src/models/requests/account_channels.rs b/src/models/requests/account_channels.rs index 5965eb52..df0753ed 100644 --- a/src/models/requests/account_channels.rs +++ b/src/models/requests/account_channels.rs @@ -4,7 +4,7 @@ use serde_with::skip_serializing_none; use crate::models::{requests::RequestMethod, Model}; -use super::CommonFields; +use super::{CommonFields, Request}; /// This request returns information about an account's Payment /// Channels. This includes only channels where the specified @@ -85,3 +85,13 @@ impl<'a> AccountChannels<'a> { } } } + +impl<'a> Request<'a> for AccountChannels<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields + } +} diff --git a/src/models/requests/account_currencies.rs b/src/models/requests/account_currencies.rs index c6307543..4ff2e373 100644 --- a/src/models/requests/account_currencies.rs +++ b/src/models/requests/account_currencies.rs @@ -37,9 +37,13 @@ pub struct AccountCurrencies<'a> { impl<'a> Model for AccountCurrencies<'a> {} -impl<'a> Request for AccountCurrencies<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountCurrencies<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/account_info.rs b/src/models/requests/account_info.rs index 1db06aa4..84d8f42d 100644 --- a/src/models/requests/account_info.rs +++ b/src/models/requests/account_info.rs @@ -44,9 +44,13 @@ pub struct AccountInfo<'a> { impl<'a> Model for AccountInfo<'a> {} -impl<'a> Request for AccountInfo<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountInfo<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/account_lines.rs b/src/models/requests/account_lines.rs index 432f74d4..3571cf2a 100644 --- a/src/models/requests/account_lines.rs +++ b/src/models/requests/account_lines.rs @@ -38,9 +38,13 @@ pub struct AccountLines<'a> { impl<'a> Model for AccountLines<'a> {} -impl<'a> Request for AccountLines<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountLines<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/account_nfts.rs b/src/models/requests/account_nfts.rs index 2447f3af..a9e79047 100644 --- a/src/models/requests/account_nfts.rs +++ b/src/models/requests/account_nfts.rs @@ -29,9 +29,13 @@ pub struct AccountNfts<'a> { impl<'a> Model for AccountNfts<'a> {} -impl<'a> Request for AccountNfts<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountNfts<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/account_objects.rs b/src/models/requests/account_objects.rs index c51bcc4f..d78c4381 100644 --- a/src/models/requests/account_objects.rs +++ b/src/models/requests/account_objects.rs @@ -63,9 +63,13 @@ pub struct AccountObjects<'a> { impl<'a> Model for AccountObjects<'a> {} -impl<'a> Request for AccountObjects<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountObjects<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/account_offers.rs b/src/models/requests/account_offers.rs index 5e2cfb02..647d3f3e 100644 --- a/src/models/requests/account_offers.rs +++ b/src/models/requests/account_offers.rs @@ -40,9 +40,13 @@ pub struct AccountOffers<'a> { impl<'a> Model for AccountOffers<'a> {} -impl<'a> Request for AccountOffers<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountOffers<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/account_tx.rs b/src/models/requests/account_tx.rs index 48da5afd..1f16e80a 100644 --- a/src/models/requests/account_tx.rs +++ b/src/models/requests/account_tx.rs @@ -53,9 +53,13 @@ pub struct AccountTx<'a> { impl<'a> Model for AccountTx<'a> {} -impl<'a> Request for AccountTx<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for AccountTx<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/book_offers.rs b/src/models/requests/book_offers.rs index 57887d38..ca27a3d5 100644 --- a/src/models/requests/book_offers.rs +++ b/src/models/requests/book_offers.rs @@ -46,9 +46,13 @@ pub struct BookOffers<'a> { impl<'a> Model for BookOffers<'a> {} -impl<'a> Request for BookOffers<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for BookOffers<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/channel_authorize.rs b/src/models/requests/channel_authorize.rs index 835cdd7e..466fa425 100644 --- a/src/models/requests/channel_authorize.rs +++ b/src/models/requests/channel_authorize.rs @@ -80,9 +80,13 @@ impl<'a> Model for ChannelAuthorize<'a> { } } -impl<'a> Request for ChannelAuthorize<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for ChannelAuthorize<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/channel_verify.rs b/src/models/requests/channel_verify.rs index 949b6ad1..59e814a9 100644 --- a/src/models/requests/channel_verify.rs +++ b/src/models/requests/channel_verify.rs @@ -30,9 +30,13 @@ pub struct ChannelVerify<'a> { impl<'a> Model for ChannelVerify<'a> {} -impl<'a> Request for ChannelVerify<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for ChannelVerify<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/deposit_authorize.rs b/src/models/requests/deposit_authorize.rs index 5204bc69..2c58e473 100644 --- a/src/models/requests/deposit_authorize.rs +++ b/src/models/requests/deposit_authorize.rs @@ -30,9 +30,13 @@ pub struct DepositAuthorized<'a> { impl<'a> Model for DepositAuthorized<'a> {} -impl<'a> Request for DepositAuthorized<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for DepositAuthorized<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/fee.rs b/src/models/requests/fee.rs index 2ab0ee8a..05052d96 100644 --- a/src/models/requests/fee.rs +++ b/src/models/requests/fee.rs @@ -23,9 +23,13 @@ pub struct Fee<'a> { impl<'a> Model for Fee<'a> {} -impl<'a> Request for Fee<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Fee<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/gateway_balances.rs b/src/models/requests/gateway_balances.rs index 58dc61e7..e5f38386 100644 --- a/src/models/requests/gateway_balances.rs +++ b/src/models/requests/gateway_balances.rs @@ -36,9 +36,13 @@ pub struct GatewayBalances<'a> { impl<'a> Model for GatewayBalances<'a> {} -impl<'a> Request for GatewayBalances<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for GatewayBalances<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ledger.rs b/src/models/requests/ledger.rs index 7ba5faec..d567cd41 100644 --- a/src/models/requests/ledger.rs +++ b/src/models/requests/ledger.rs @@ -58,9 +58,13 @@ pub struct Ledger<'a> { impl<'a> Model for Ledger<'a> {} -impl<'a> Request for Ledger<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Ledger<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ledger_closed.rs b/src/models/requests/ledger_closed.rs index c65f486c..7b601ea6 100644 --- a/src/models/requests/ledger_closed.rs +++ b/src/models/requests/ledger_closed.rs @@ -22,9 +22,13 @@ pub struct LedgerClosed<'a> { impl<'a> Model for LedgerClosed<'a> {} -impl<'a> Request for LedgerClosed<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for LedgerClosed<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ledger_current.rs b/src/models/requests/ledger_current.rs index 9b408ee6..aec482f6 100644 --- a/src/models/requests/ledger_current.rs +++ b/src/models/requests/ledger_current.rs @@ -22,9 +22,13 @@ pub struct LedgerCurrent<'a> { impl<'a> Model for LedgerCurrent<'a> {} -impl<'a> Request for LedgerCurrent<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for LedgerCurrent<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ledger_data.rs b/src/models/requests/ledger_data.rs index c71e012c..c30bb753 100644 --- a/src/models/requests/ledger_data.rs +++ b/src/models/requests/ledger_data.rs @@ -36,9 +36,13 @@ pub struct LedgerData<'a> { impl<'a> Model for LedgerData<'a> {} -impl<'a> Request for LedgerData<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for LedgerData<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ledger_entry.rs b/src/models/requests/ledger_entry.rs index 1dbc4c1d..0022437c 100644 --- a/src/models/requests/ledger_entry.rs +++ b/src/models/requests/ledger_entry.rs @@ -159,9 +159,13 @@ impl<'a> LedgerEntryError for LedgerEntry<'a> { } } -impl<'a> Request for LedgerEntry<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for LedgerEntry<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/manifest.rs b/src/models/requests/manifest.rs index d2615195..e9f38d94 100644 --- a/src/models/requests/manifest.rs +++ b/src/models/requests/manifest.rs @@ -27,9 +27,13 @@ pub struct Manifest<'a> { impl<'a> Model for Manifest<'a> {} -impl<'a> Request for Manifest<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Manifest<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/mod.rs b/src/models/requests/mod.rs index 860b7504..3d81058f 100644 --- a/src/models/requests/mod.rs +++ b/src/models/requests/mod.rs @@ -148,14 +148,9 @@ pub struct CommonFields<'a> { pub id: Option>, } -impl Request for CommonFields<'_> { - fn get_command(&self) -> RequestMethod { - self.command.clone() - } -} - /// The base trait for all request models. /// Used to identify the model as a request. -pub trait Request { - fn get_command(&self) -> RequestMethod; +pub trait Request<'a> { + fn get_common_fields(&self) -> &CommonFields<'a>; + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a>; } diff --git a/src/models/requests/nft_buy_offers.rs b/src/models/requests/nft_buy_offers.rs index 3e9ce276..d8a4c4c5 100644 --- a/src/models/requests/nft_buy_offers.rs +++ b/src/models/requests/nft_buy_offers.rs @@ -31,9 +31,13 @@ pub struct NftBuyOffers<'a> { impl<'a> Model for NftBuyOffers<'a> {} -impl<'a> Request for NftBuyOffers<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for NftBuyOffers<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/nft_sell_offers.rs b/src/models/requests/nft_sell_offers.rs index 7404d59c..fa4dd6f9 100644 --- a/src/models/requests/nft_sell_offers.rs +++ b/src/models/requests/nft_sell_offers.rs @@ -19,9 +19,13 @@ pub struct NftSellOffers<'a> { impl<'a> Model for NftSellOffers<'a> {} -impl<'a> Request for NftSellOffers<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for NftSellOffers<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/no_ripple_check.rs b/src/models/requests/no_ripple_check.rs index f2467efa..21bffb7f 100644 --- a/src/models/requests/no_ripple_check.rs +++ b/src/models/requests/no_ripple_check.rs @@ -58,9 +58,13 @@ pub struct NoRippleCheck<'a> { impl<'a> Model for NoRippleCheck<'a> {} -impl<'a> Request for NoRippleCheck<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for NoRippleCheck<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/path_find.rs b/src/models/requests/path_find.rs index e2ab7074..55b34865 100644 --- a/src/models/requests/path_find.rs +++ b/src/models/requests/path_find.rs @@ -90,9 +90,13 @@ pub struct PathFind<'a> { impl<'a> Model for PathFind<'a> {} -impl<'a> Request for PathFind<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for PathFind<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ping.rs b/src/models/requests/ping.rs index 079b81be..5c70e863 100644 --- a/src/models/requests/ping.rs +++ b/src/models/requests/ping.rs @@ -21,9 +21,13 @@ pub struct Ping<'a> { impl<'a> Model for Ping<'a> {} -impl<'a> Request for Ping<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Ping<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/random.rs b/src/models/requests/random.rs index 8bc0029f..19c7e986 100644 --- a/src/models/requests/random.rs +++ b/src/models/requests/random.rs @@ -22,9 +22,13 @@ pub struct Random<'a> { impl<'a> Model for Random<'a> {} -impl<'a> Request for Random<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Random<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/ripple_path_find.rs b/src/models/requests/ripple_path_find.rs index ac923963..4f7f5cfd 100644 --- a/src/models/requests/ripple_path_find.rs +++ b/src/models/requests/ripple_path_find.rs @@ -62,9 +62,13 @@ pub struct RipplePathFind<'a> { impl<'a> Model for RipplePathFind<'a> {} -impl<'a> Request for RipplePathFind<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for RipplePathFind<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/server_info.rs b/src/models/requests/server_info.rs index 378fc0f0..7044ee5d 100644 --- a/src/models/requests/server_info.rs +++ b/src/models/requests/server_info.rs @@ -22,9 +22,13 @@ pub struct ServerInfo<'a> { impl<'a> Model for ServerInfo<'a> {} -impl<'a> Request for ServerInfo<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for ServerInfo<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/server_state.rs b/src/models/requests/server_state.rs index 4a0b7306..85c910c5 100644 --- a/src/models/requests/server_state.rs +++ b/src/models/requests/server_state.rs @@ -27,9 +27,13 @@ pub struct ServerState<'a> { impl<'a> Model for ServerState<'a> {} -impl<'a> Request for ServerState<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for ServerState<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/submit.rs b/src/models/requests/submit.rs index 132a9910..e68dec41 100644 --- a/src/models/requests/submit.rs +++ b/src/models/requests/submit.rs @@ -49,9 +49,13 @@ pub struct Submit<'a> { impl<'a> Model for Submit<'a> {} -impl<'a> Request for Submit<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Submit<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/submit_multisigned.rs b/src/models/requests/submit_multisigned.rs index 82fdc1cc..913539c9 100644 --- a/src/models/requests/submit_multisigned.rs +++ b/src/models/requests/submit_multisigned.rs @@ -30,9 +30,13 @@ pub struct SubmitMultisigned<'a> { impl<'a> Model for SubmitMultisigned<'a> {} -impl<'a> Request for SubmitMultisigned<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for SubmitMultisigned<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/subscribe.rs b/src/models/requests/subscribe.rs index 0487ca22..fb501f87 100644 --- a/src/models/requests/subscribe.rs +++ b/src/models/requests/subscribe.rs @@ -77,9 +77,13 @@ pub struct Subscribe<'a> { impl<'a> Model for Subscribe<'a> {} -impl<'a> Request for Subscribe<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Subscribe<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/transaction_entry.rs b/src/models/requests/transaction_entry.rs index 24ad02cb..8f3c9885 100644 --- a/src/models/requests/transaction_entry.rs +++ b/src/models/requests/transaction_entry.rs @@ -31,9 +31,13 @@ pub struct TransactionEntry<'a> { impl<'a> Model for TransactionEntry<'a> {} -impl<'a> Request for TransactionEntry<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for TransactionEntry<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/tx.rs b/src/models/requests/tx.rs index 0dd547ee..29b88373 100644 --- a/src/models/requests/tx.rs +++ b/src/models/requests/tx.rs @@ -34,9 +34,13 @@ pub struct Tx<'a> { impl<'a> Model for Tx<'a> {} -impl<'a> Request for Tx<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Tx<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/requests/unsubscribe.rs b/src/models/requests/unsubscribe.rs index 90f123d0..b056b688 100644 --- a/src/models/requests/unsubscribe.rs +++ b/src/models/requests/unsubscribe.rs @@ -62,9 +62,13 @@ pub struct Unsubscribe<'a> { impl<'a> Model for Unsubscribe<'a> {} -impl<'a> Request for Unsubscribe<'a> { - fn get_command(&self) -> RequestMethod { - self.common_fields.command.clone() +impl<'a> Request<'a> for Unsubscribe<'a> { + fn get_common_fields(&self) -> &CommonFields<'a> { + &self.common_fields + } + + fn get_common_fields_mut(&mut self) -> &mut CommonFields<'a> { + &mut self.common_fields } } diff --git a/src/models/results/fee.rs b/src/models/results/fee.rs new file mode 100644 index 00000000..59284566 --- /dev/null +++ b/src/models/results/fee.rs @@ -0,0 +1,16 @@ +use serde::{Deserialize, Serialize}; + +use crate::models::amount::XRPAmount; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Fee<'a> { + pub drops: Drops<'a>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Drops<'a> { + pub base_fee: XRPAmount<'a>, + pub median_fee: XRPAmount<'a>, + pub minimum_fee: XRPAmount<'a>, + pub open_ledger_fee: XRPAmount<'a>, +} diff --git a/src/models/results/mod.rs b/src/models/results/mod.rs new file mode 100644 index 00000000..5c74b564 --- /dev/null +++ b/src/models/results/mod.rs @@ -0,0 +1,103 @@ +use alloc::{borrow::Cow, string::ToString, vec::Vec}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +mod fee; +pub use fee::{Fee as FeeResult, *}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ResponseStatus { + Success, + Error, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum ResponseType { + Response, + LedgerClosed, + Transaction, +} + +#[derive(Debug, Clone, Serialize)] +pub struct XRPLResponse<'a, Res, Req> { + pub id: Option>, + pub error: Option>, + pub error_code: Option, + pub error_message: Option>, + pub forwarded: Option, + pub request: Option, + pub result: Option, + pub status: Option, + pub r#type: Option, + pub warning: Option>, + pub warnings: Option>>, +} + +impl<'a, 'de, Res, Req> Deserialize<'de> for XRPLResponse<'a, Res, Req> +where + Res: DeserializeOwned, + Req: DeserializeOwned, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + // TODO: add validation for fields that can not coexist in the same response + let mut map = serde_json::Map::deserialize(deserializer)?; + if map.is_empty() { + return Err(serde::de::Error::custom("Empty response")); + } + Ok(XRPLResponse { + id: map.remove("id").map(|item| match item.as_str() { + Some(item_str) => Cow::Owned(item_str.to_string()), + None => Cow::Borrowed(""), + }), + error: map.remove("error").map(|item| match item.as_str() { + Some(item_str) => Cow::Owned(item_str.to_string()), + None => Cow::Borrowed(""), + }), + error_code: map + .remove("error_code") + .and_then(|v| v.as_i64()) + .map(|v| v as i32), + error_message: map.remove("error_message").map(|item| match item.as_str() { + Some(item_str) => Cow::Owned(item_str.to_string()), + None => Cow::Borrowed(""), + }), + forwarded: map.remove("forwarded").and_then(|v| v.as_bool()), + request: map + .remove("request") + .map(|v| serde_json::from_value(v).unwrap()), + result: map + .remove("result") + .map(|v| serde_json::from_value(v).unwrap()), + status: map + .remove("status") + .map(|v| serde_json::from_value(v).unwrap()), + r#type: map + .remove("type") + .map(|v| serde_json::from_value(v).unwrap()), + warning: map.remove("warning").map(|item| match item.as_str() { + Some(item_str) => Cow::Owned(item_str.to_string()), + None => Cow::Borrowed(""), + }), + warnings: map + .remove("warnings") + .and_then(|v| serde_json::from_value(v).ok()), + }) + } +} + +impl<'a, Res, Req> XRPLResponse<'a, Res, Req> { + pub fn is_success(&self) -> bool { + self.status == Some(ResponseStatus::Success) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct XRPLWarning<'a> { + pub id: Cow<'a, str>, + pub message: Cow<'a, str>, + pub forwarded: Option, +} diff --git a/src/models/transactions/escrow_finish.rs b/src/models/transactions/escrow_finish.rs index 36d4b9a1..2648498a 100644 --- a/src/models/transactions/escrow_finish.rs +++ b/src/models/transactions/escrow_finish.rs @@ -158,6 +158,8 @@ mod test_escrow_finish_errors { #[cfg(test)] mod tests { + use serde_json::Value; + use super::*; #[test] @@ -182,9 +184,9 @@ mod tests { ); let default_json_str = r#"{"Account":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn","TransactionType":"EscrowFinish","Owner":"rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn","OfferSequence":7,"Condition":"A0258020E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855810100","Fulfillment":"A0028000"}"#; // Serialize - let default_json_value = serde_json::to_value(default_json_str).unwrap(); - let serialized_string = serde_json::to_string(&default_txn).unwrap(); - let serialized_value = serde_json::to_value(&serialized_string).unwrap(); + let default_json_value: Value = serde_json::from_str(default_json_str).unwrap(); + // let serialized_string = serde_json::to_string(&default_txn).unwrap(); + let serialized_value = serde_json::to_value(&default_txn).unwrap(); assert_eq!(serialized_value, default_json_value); // Deserialize diff --git a/src/models/transactions/mod.rs b/src/models/transactions/mod.rs index 57ad9163..2ee57b1b 100644 --- a/src/models/transactions/mod.rs +++ b/src/models/transactions/mod.rs @@ -54,10 +54,7 @@ pub use ticket_create::*; pub use trust_set::*; use crate::models::amount::XRPAmount; -use crate::{ - _serde::txn_flags, - serde_with_tag, -}; +use crate::{_serde::txn_flags, serde_with_tag}; use alloc::borrow::Cow; use alloc::string::String; use alloc::vec::Vec; @@ -156,6 +153,7 @@ where pub fee: Option>, /// Set of bit-flags for this transaction. #[serde(with = "txn_flags")] + #[serde(default = "optional_flag_collection_default")] pub flags: Option>, /// Highest ledger index this transaction can appear in. /// Specifying this field places a strict upper limit on how long @@ -234,6 +232,13 @@ where } } +fn optional_flag_collection_default() -> Option> +where + T: IntoEnumIterator + Serialize + core::fmt::Debug, +{ + None +} + serde_with_tag! { /// An arbitrary piece of data attached to a transaction. A /// transaction can have multiple Memo objects as an array diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 41508f7c..9fff8a72 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -8,7 +8,10 @@ pub use self::time_conversion::*; pub use self::xrpl_conversion::*; use crate::constants::*; +use alloc::string::String; +use alloc::string::ToString; use alloc::vec::Vec; +use rand::Rng; use regex::Regex; /// Determine if the address string is a hex address. @@ -65,6 +68,12 @@ pub fn is_iso_hex(value: &str) -> bool { regex.is_match(value) } +/// Generate a random id. +pub fn get_random_id(rng: &mut T) -> String { + let id: u32 = rng.gen(); + id.to_string() +} + /// Converter to byte array with endianness. pub trait ToBytes { /// Return the byte array of self. diff --git a/tests/common/constants.rs b/tests/common/constants.rs index fb33485a..732059d2 100644 --- a/tests/common/constants.rs +++ b/tests/common/constants.rs @@ -1,2 +1,6 @@ -pub const ECHO_WS_SERVER: &'static str = "ws://ws.vi-server.org/mirror/"; -pub const ECHO_WSS_SERVER: &'static str = "wss://ws.vi-server.org/mirror/"; +pub const ECHO_WS_SERVER: &'static str = "ws://ws.vi-server.org/mirror"; +pub const ECHO_WSS_SERVER: &'static str = "wss://ws.vi-server.org/mirror"; + +// pub const XRPL_TEST_NET: &'static str = "ws://s2.livenet.ripple.com/"; +pub const XRPL_WSS_TEST_NET: &'static str = "wss://testnet.xrpl-labs.com/"; +pub const XRPL_WS_TEST_NET: &'static str = "wss://s.altnet.rippletest.net:51233/"; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 1c406bcd..4c451809 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,54 +1,64 @@ -pub mod codec; - -use anyhow::anyhow; -use anyhow::Result; +mod constants; -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] -use tokio::net::TcpStream; -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] -use tokio_util::codec::Framed; #[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] -use xrpl::asynch::clients::AsyncWebsocketClient; -use xrpl::asynch::clients::WebsocketOpen; -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] -use xrpl::asynch::clients::{AsyncWebsocketClient, EmbeddedWebsocketOptions}; +mod tungstenite_clients { + use super::constants::*; + use anyhow::anyhow; + use anyhow::Result; + use xrpl::asynch::clients::AsyncWebsocketClient; + use xrpl::asynch::clients::{SingleExecutorMutex, WebsocketOpen}; -mod constants; -pub use constants::*; + pub async fn connect_to_wss_tungstinite_test_net( + ) -> Result> { + match XRPL_WSS_TEST_NET.parse() { + Ok(url) => match AsyncWebsocketClient::open(url).await { + Ok(websocket) => { + // assert!(websocket.is_open()); + Ok(websocket) + } + Err(err) => Err(anyhow!("Error connecting to websocket: {:?}", err)), + }, + Err(err) => Err(anyhow!("Error parsing url: {:?}", err)), + } + } +} -#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] -pub async fn connect_to_wss_tungstinite_echo() -> Result> { - match ECHO_WSS_SERVER.parse() { - Ok(url) => match AsyncWebsocketClient::open(url).await { +#[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] +mod embedded_ws_clients { + use super::constants::*; + use anyhow::anyhow; + use anyhow::Result; + use std::io; + use tokio::net::TcpStream; + use tokio_util::codec::Framed; + use xrpl::asynch::clients::codec::Codec; + + pub async fn connect_to_ws_embedded_websocket_tokio_echo( + stream: Framed, + ) -> Result< + AsyncWebsocketClient< + 4096, + Framed, + Vec, + io::Error, + rand_core::OsRng, + SingleExecutorMutex, + WebsocketOpen, + >, + > { + let rng = rand_core::OsRng; + let url = ECHO_WS_SERVER.parse().unwrap(); + match AsyncWebsocketClient::open(rng, stream, url).await { Ok(websocket) => { - assert!(websocket.is_open()); + // assert!(websocket.is_open()); Ok(websocket) } Err(err) => Err(anyhow!("Error connecting to websocket: {:?}", err)), - }, - Err(err) => Err(anyhow!("Error parsing url: {:?}", err)), - } -} - -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] -pub async fn connect_to_ws_embedded_websocket_tokio_echo( - stream: &mut Framed, - buffer: &mut [u8], -) -> Result> { - let rng = rand::thread_rng(); - let websocket_options = EmbeddedWebsocketOptions { - path: "/mirror", - host: "ws.vi-server.org", - origin: "http://ws.vi-server.org:80", - sub_protocols: None, - additional_headers: None, - }; - - match AsyncWebsocketClient::open(stream, buffer, rng, &websocket_options).await { - Ok(websocket) => { - assert!(websocket.is_open()); - Ok(websocket) } - Err(err) => Err(anyhow!("Error connecting to websocket: {:?}", err)), } } + +#[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] +pub use embedded_ws_clients::*; +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +pub use tungstenite_clients::*; diff --git a/tests/integration/clients/mod.rs b/tests/integration/clients/mod.rs index 9a76a87d..d66e9d9d 100644 --- a/tests/integration/clients/mod.rs +++ b/tests/integration/clients/mod.rs @@ -1,100 +1,75 @@ -use anyhow::anyhow; use anyhow::Result; #[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] -pub async fn test_websocket_tungstenite_echo() -> Result<()> { - use super::common::connect_to_wss_tungstinite_echo; - use futures_util::{SinkExt, TryStreamExt}; - use xrpl::asynch::clients::TungsteniteMessage; - use xrpl::models::requests::AccountInfo; +pub async fn test_websocket_tungstenite_test_net() -> Result<()> { + use crate::common::connect_to_wss_tungstinite_test_net; + use xrpl::{ + asynch::clients::XRPLWebsocketIO, models::requests::Fee, models::results::FeeResult, + }; - let mut websocket = connect_to_wss_tungstinite_echo().await?; - let account_info = AccountInfo::new( - None, - "rJumr5e1HwiuV543H7bqixhtFreChWTaHH".into(), - None, - None, - None, - None, - None, - ); + let mut websocket = connect_to_wss_tungstinite_test_net().await?; + let fee = Fee::new(None); - websocket.send(&account_info).await?; - while let Ok(Some(TungsteniteMessage::Text(response))) = websocket.try_next().await { - let account_info_echo = serde_json::from_str::(response.as_str()); - match account_info_echo { - Ok(account_info_echo) => { - assert_eq!(account_info, account_info_echo); - return Ok(()); - } - Err(err) => { - return Err(anyhow!("Error parsing response: {:?}", err)); - } - }; - } + websocket.xrpl_send(fee).await.unwrap(); + let message = websocket + .xrpl_receive::, Fee<'_>>() + .await + .unwrap(); + assert!(message.unwrap().result.is_some()); + Ok(()) +} +#[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] +pub async fn test_websocket_tungstenite_request() -> Result<()> { + use crate::common::connect_to_wss_tungstinite_test_net; + use xrpl::{asynch::clients::AsyncClient, models::requests::Fee, models::results::FeeResult}; + + let websocket = connect_to_wss_tungstinite_test_net().await?; + let fee = Fee::new(None); + + let message = websocket.request::, _>(fee).await.unwrap(); + assert!(message.result.is_some()); Ok(()) } -#[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] +#[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] pub async fn test_embedded_websocket_echo() -> Result<()> { - use super::common::{codec::Codec, connect_to_ws_embedded_websocket_tokio_echo}; + use crate::common::connect_to_ws_embedded_websocket_tokio_echo; use tokio_util::codec::Framed; - use xrpl::asynch::clients::EmbeddedWebsocketReadMessageType; - use xrpl::models::requests::AccountInfo; + use xrpl::asynch::clients::codec::Codec; + use xrpl::asynch::clients::XRPLWebsocketIO; + use xrpl::models::requests::Fee; + use xrpl::models::results::FeeResult; let tcp_stream = tokio::net::TcpStream::connect("ws.vi-server.org:80") .await - .map_err(|_| anyhow!("Error connecting to websocket"))?; - let mut framed = Framed::new(tcp_stream, Codec::new()); - let mut buffer = [0u8; 4096]; - let mut websocket = - connect_to_ws_embedded_websocket_tokio_echo(&mut framed, &mut buffer).await?; - let account_info = AccountInfo::new( - None, - "rJumr5e1HwiuV543H7bqixhtFreChWTaHH".into(), - None, - None, - None, - None, - None, - ); - websocket - .send(&mut framed, &mut buffer, false, &account_info) - .await?; + .unwrap(); + let framed = Framed::new(tcp_stream, Codec); + let mut websocket = connect_to_ws_embedded_websocket_tokio_echo(framed).await?; + let fee = Fee::new(None); + websocket.xrpl_send(fee).await?; + let _ = websocket + .xrpl_receive::, Fee<'_>>() + .await + .unwrap(); + Ok(()) +} + +#[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] +pub async fn test_embedded_websocket_request() -> Result<()> { + use crate::common::connect_to_ws_embedded_websocket_tokio_echo; + use tokio_util::codec::Framed; + use xrpl::asynch::clients::codec::Codec; + use xrpl::asynch::clients::AsyncClient; + use xrpl::models::requests::Fee; + use xrpl::models::results::FeeResult; - let mut ping_counter = 0; - loop { - match websocket.try_next(&mut framed, &mut buffer).await? { - Some(message) => match message { - EmbeddedWebsocketReadMessageType::Ping(_) => { - ping_counter += 1; - if ping_counter > 1 { - return Err(anyhow!("Expected only one ping")); - } - } - EmbeddedWebsocketReadMessageType::Text(text) => { - match serde_json::from_str::(text) { - Ok(account_info_echo) => { - assert_eq!(account_info, account_info_echo); - return Ok(()); - } - Err(err) => { - return Err(anyhow!("Error parsing response: {:?}", err)); - } - } - } - EmbeddedWebsocketReadMessageType::Binary(_) => { - panic!("Expected text message found binary") - } - EmbeddedWebsocketReadMessageType::Pong(_) => { - panic!("Expected text message found pong") - } - EmbeddedWebsocketReadMessageType::Close(_) => { - panic!("Expected text message found close") - } - }, - None => return Err(anyhow!("No message received")), - } - } + let tcp_stream = tokio::net::TcpStream::connect("ws.vi-server.org:80") + .await + .unwrap(); + let framed = Framed::new(tcp_stream, Codec); + let websocket = connect_to_ws_embedded_websocket_tokio_echo(framed).await?; + let fee = Fee::new(None); + let _res = websocket.request::(fee).await?; + Ok(()) } diff --git a/tests/integration/mod.rs b/tests/integration/mod.rs index eb5264ce..705f46db 100644 --- a/tests/integration/mod.rs +++ b/tests/integration/mod.rs @@ -1,3 +1 @@ -use super::common; - pub mod clients; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 179d20bf..73d75eeb 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -5,12 +5,24 @@ mod integration; use anyhow::Result; +#[cfg(any(feature = "tungstenite", all(feature = "embedded-ws", feature = "std")))] #[tokio::test] async fn test_asynch_clients() -> Result<()> { #[cfg(all(feature = "tungstenite", not(feature = "embedded-ws")))] - return integration::clients::test_websocket_tungstenite_echo().await; - #[cfg(all(feature = "embedded-ws", not(feature = "tungstenite")))] + return integration::clients::test_websocket_tungstenite_test_net().await; + #[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] return integration::clients::test_embedded_websocket_echo().await; - #[cfg(all(feature = "tungstenite", feature = "embedded-ws"))] + #[allow(unreachable_code)] + Ok(()) +} + +#[cfg(any(feature = "tungstenite", feature = "embedded-ws", feature = "std"))] +#[tokio::test] +async fn test_asynch_clients_request() -> Result<()> { + #[cfg(all(feature = "tungstenite", feature = "std", not(feature = "embedded-ws")))] + return integration::clients::test_websocket_tungstenite_request().await; + #[cfg(all(feature = "embedded-ws", feature = "std", not(feature = "tungstenite")))] + return integration::clients::test_embedded_websocket_request().await; + #[allow(unreachable_code)] Ok(()) }