diff --git a/remoc/src/rch/base/sender.rs b/remoc/src/rch/base/sender.rs index 32846ab..e2b4b7f 100644 --- a/remoc/src/rch/base/sender.rs +++ b/remoc/src/rch/base/sender.rs @@ -60,6 +60,11 @@ impl SendErrorKind { pub fn is_final(&self) -> bool { matches!(self, Self::Send(_)) } + + /// Returns whether the error is caused by the item to be sent. + pub fn is_item_specific(&self) -> bool { + matches!(self, Self::Serialize(_) | Self::MaxItemSizeExceeded) + } } impl SendError { @@ -82,6 +87,11 @@ impl SendError { self.kind.is_final() } + /// Returns whether the error is caused by the item to be sent. + pub fn is_item_specific(&self) -> bool { + self.kind.is_item_specific() + } + /// Returns the error without the contained item. pub fn without_item(self) -> SendError<()> { SendError { kind: self.kind, item: () } diff --git a/remoc/src/rch/watch/mod.rs b/remoc/src/rch/watch/mod.rs index ae4b59a..e248936 100644 --- a/remoc/src/rch/watch/mod.rs +++ b/remoc/src/rch/watch/mod.rs @@ -137,7 +137,10 @@ async fn send_impl( Ok(()) => { let value = rx.borrow_and_update().clone(); if let Err(err) = remote_tx.send(value).await { - let _ = remote_send_err_tx.try_send(RemoteSendError::Send(err.kind)); + let _ = remote_send_err_tx.try_send(RemoteSendError::Send(err.kind.clone())); + if err.is_item_specific() { + break + } } } Err(_) => break, diff --git a/remoc/tests/rch/watch.rs b/remoc/tests/rch/watch.rs index 3c7e645..2449eb7 100644 --- a/remoc/tests/rch/watch.rs +++ b/remoc/tests/rch/watch.rs @@ -1,9 +1,10 @@ use futures::StreamExt; +use remoc::rch::base::SendErrorKind; use std::time::Duration; use tokio::time::sleep; use crate::{droppable_loop_channel, loop_channel}; -use remoc::rch::watch::{self, ReceiverStream}; +use remoc::rch::watch::{self, ChangedError, ReceiverStream, SendError}; #[tokio::test] async fn simple() { @@ -164,3 +165,68 @@ async fn conn_failure() { Err(err) => panic!("wrong error after close: {err}"), } } + +#[tokio::test] +async fn max_item_size_exceeded() { + crate::init(); + let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::>>().await; + + println!("Sending remote mpsc channel receiver"); + let (mut tx, rx) = watch::channel(Vec::new()); + a_tx.send(rx).await.unwrap(); + println!("Receiving remote mpsc channel receiver"); + let mut rx = b_rx.recv().await.unwrap().unwrap(); + + assert_eq!(tx.max_item_size(), rx.max_item_size()); + let max_item_size = tx.max_item_size(); + println!("Maximum send and recv item size is {max_item_size}"); + + { + let value = rx.borrow().unwrap(); + println!("Initial value: {value:?}"); + } + + let recv_task = tokio::spawn(async move { + loop { + let res = rx.changed().await; + println!("RX changed result: {res:?}"); + if res.is_err() { + break res; + } + + let value = rx.borrow_and_update().unwrap().clone(); + println!("Received value change: {} elements", value.len()); + } + }); + + // Happy case: sent data size is under limit. + // JSON encoding will result in much larger transfer size. + let elems = max_item_size / 10; + println!("Sending {elems} elements"); + let value = vec![100; elems]; + tx.send(value.clone()).unwrap(); + assert_eq!(*tx.borrow(), value); + + sleep(Duration::from_millis(100)).await; + + // Failure case: sent data size exceeds limits. + let elems = max_item_size * 10; + println!("Sending {elems} elements"); + let value = vec![100; elems]; + tx.send(value.clone()).unwrap(); + assert_eq!(*tx.borrow(), value); + + println!("Waiting for receive task"); + assert!(matches!(recv_task.await.unwrap(), Err(ChangedError::Closed))); + + // Send one more element to obtain error. + println!("Sending one more element to obtain error"); + let res = tx.send(vec![1; 1]); + println!("Result: {res:?}"); + assert!(matches!(res, Err(SendError::RemoteSend(SendErrorKind::MaxItemSizeExceeded)))); + + // Test error clearing. + assert!(matches!(tx.error(), Some(SendError::RemoteSend(SendErrorKind::MaxItemSizeExceeded)))); + tx.clear_error(); + assert!(matches!(tx.error(), None)); +}