Skip to content

Commit

Permalink
watch: fail sender if send error is item-specific
Browse files Browse the repository at this point in the history
Previously all send errors in a watch channel were silently ignored
as long as a receiver was connected. This also included errors
caused by the item being sent, such as serialization or excessive
size errors. This resulted in such errors to go mostly undetected,
although they were in the sole responsibility of the sender.

Now the watch sender fails when encountering an error caused
by the item being sent. The next send attempt will thus return
an error.
  • Loading branch information
surban committed Mar 13, 2024
1 parent 6998602 commit a4e92c4
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
10 changes: 10 additions & 0 deletions remoc/src/rch/base/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ impl SendErrorKind {
pub fn is_final(&self) -> bool {
matches!(self, Self::Send(_))
}

/// Returns whether the error is caused by the item to be sent.
pub fn is_item_specific(&self) -> bool {
matches!(self, Self::Serialize(_) | Self::MaxItemSizeExceeded)
}
}

impl<T> SendError<T> {
Expand All @@ -82,6 +87,11 @@ impl<T> SendError<T> {
self.kind.is_final()
}

/// Returns whether the error is caused by the item to be sent.
pub fn is_item_specific(&self) -> bool {
self.kind.is_item_specific()
}

/// Returns the error without the contained item.
pub fn without_item(self) -> SendError<()> {
SendError { kind: self.kind, item: () }
Expand Down
5 changes: 4 additions & 1 deletion remoc/src/rch/watch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,10 @@ async fn send_impl<T, Codec>(
Ok(()) => {
let value = rx.borrow_and_update().clone();
if let Err(err) = remote_tx.send(value).await {
let _ = remote_send_err_tx.try_send(RemoteSendError::Send(err.kind));
let _ = remote_send_err_tx.try_send(RemoteSendError::Send(err.kind.clone()));
if err.is_item_specific() {
break
}
}
}
Err(_) => break,
Expand Down
68 changes: 67 additions & 1 deletion remoc/tests/rch/watch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use futures::StreamExt;
use remoc::rch::base::SendErrorKind;
use std::time::Duration;
use tokio::time::sleep;

use crate::{droppable_loop_channel, loop_channel};
use remoc::rch::watch::{self, ReceiverStream};
use remoc::rch::watch::{self, ChangedError, ReceiverStream, SendError};

#[tokio::test]
async fn simple() {
Expand Down Expand Up @@ -164,3 +165,68 @@ async fn conn_failure() {
Err(err) => panic!("wrong error after close: {err}"),
}
}

#[tokio::test]
async fn max_item_size_exceeded() {
crate::init();
let ((mut a_tx, _), (_, mut b_rx)) = loop_channel::<watch::Receiver<Vec<u8>>>().await;

println!("Sending remote mpsc channel receiver");
let (mut tx, rx) = watch::channel(Vec::new());
a_tx.send(rx).await.unwrap();
println!("Receiving remote mpsc channel receiver");
let mut rx = b_rx.recv().await.unwrap().unwrap();

assert_eq!(tx.max_item_size(), rx.max_item_size());
let max_item_size = tx.max_item_size();
println!("Maximum send and recv item size is {max_item_size}");

{
let value = rx.borrow().unwrap();
println!("Initial value: {value:?}");
}

let recv_task = tokio::spawn(async move {
loop {
let res = rx.changed().await;
println!("RX changed result: {res:?}");
if res.is_err() {
break res;
}

let value = rx.borrow_and_update().unwrap().clone();
println!("Received value change: {} elements", value.len());
}
});

// Happy case: sent data size is under limit.
// JSON encoding will result in much larger transfer size.
let elems = max_item_size / 10;
println!("Sending {elems} elements");
let value = vec![100; elems];
tx.send(value.clone()).unwrap();
assert_eq!(*tx.borrow(), value);

sleep(Duration::from_millis(100)).await;

// Failure case: sent data size exceeds limits.
let elems = max_item_size * 10;
println!("Sending {elems} elements");
let value = vec![100; elems];
tx.send(value.clone()).unwrap();
assert_eq!(*tx.borrow(), value);

println!("Waiting for receive task");
assert!(matches!(recv_task.await.unwrap(), Err(ChangedError::Closed)));

// Send one more element to obtain error.
println!("Sending one more element to obtain error");
let res = tx.send(vec![1; 1]);
println!("Result: {res:?}");
assert!(matches!(res, Err(SendError::RemoteSend(SendErrorKind::MaxItemSizeExceeded))));

// Test error clearing.
assert!(matches!(tx.error(), Some(SendError::RemoteSend(SendErrorKind::MaxItemSizeExceeded))));
tx.clear_error();
assert!(matches!(tx.error(), None));
}

0 comments on commit a4e92c4

Please sign in to comment.