diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 8dfa92b34..a89b89bd9 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + ops::Add, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, Arc, Mutex, MutexGuard, @@ -127,6 +128,73 @@ impl StageInMutex { } } +enum DeadlineSetting { + Immediate, + Infinite, + Finite(Instant), +} + +struct LazyDeadline { + deadline: Option, + wait_time: Option, +} + +impl LazyDeadline { + fn new(wait_time: Option) -> Self { + Self { + deadline: None, + wait_time, + } + } + + fn advance(&mut self) { + let wait_time = self.wait_time; + match &mut self.deadline() { + DeadlineSetting::Immediate => {} + DeadlineSetting::Infinite => {} + DeadlineSetting::Finite(instant) => { + *instant = instant.add(unsafe { wait_time.unwrap_unchecked() }); + } + } + } + + #[inline] + fn deadline(&mut self) -> &mut DeadlineSetting { + self.deadline.get_or_insert_with(|| match self.wait_time { + Some(wait_time) => match wait_time.is_zero() { + true => DeadlineSetting::Immediate, + false => DeadlineSetting::Finite(Instant::now().add(wait_time)), + }, + None => DeadlineSetting::Infinite, + }) + } +} + +struct Deadline { + lazy_deadline: LazyDeadline, +} + +impl Deadline { + fn new(wait_time: Option) -> Self { + Self { + lazy_deadline: LazyDeadline::new(wait_time), + } + } + + #[inline] + fn wait(&mut self, s_ref: &StageInRefill) -> bool { + match self.lazy_deadline.deadline() { + DeadlineSetting::Immediate => false, + DeadlineSetting::Infinite => s_ref.wait(), + DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant), + } + } + + fn on_next_fragment(&mut self) { + self.lazy_deadline.advance(); + } +} + // This is the initial stage of the pipeline where messages are serliazed on struct StageIn { s_ref: StageInRefill, @@ -141,13 +209,13 @@ impl StageIn { &mut self, msg: &mut NetworkMessage, priority: Priority, - deadline: Option>, + deadline: &mut Deadline, ) -> bool { // Lock the current serialization batch. let mut c_guard = self.mutex.current(); macro_rules! zgetbatch_rets { - ($fragment:expr, $restore_sn:expr) => { + ($restore_sn:expr) => { loop { match c_guard.take() { Some(batch) => break batch, @@ -163,11 +231,7 @@ impl StageIn { None => { drop(c_guard); // Wait for an available batch until deadline - if !match deadline { - None => false, - Some(None) => self.s_ref.wait(), - Some(Some(deadline)) => self.s_ref.wait_deadline(deadline), - } { + if !deadline.wait(&self.s_ref) { // Still no available batch. // Restore the sequence number and drop the message $restore_sn; @@ -198,7 +262,7 @@ impl StageIn { } // Get the current serialization batch. - let mut batch = zgetbatch_rets!(false, {}); + let mut batch = zgetbatch_rets!({}); // Attempt the serialization on the current batch let e = match batch.encode(&*msg) { Ok(_) => zretok!(batch, msg), @@ -228,7 +292,7 @@ impl StageIn { if !batch.is_empty() { // Move out existing batch self.s_out.move_batch(batch); - batch = zgetbatch_rets!(false, tch.sn.set(sn).unwrap()); + batch = zgetbatch_rets!(tch.sn.set(sn).unwrap()); } // Attempt a second serialization on fully empty batch @@ -258,8 +322,7 @@ impl StageIn { let mut reader = self.fragbuf.reader(); while reader.can_read() { // Get the current serialization batch - // Treat all messages as non-droppable once we start fragmenting - batch = zgetbatch_rets!(true, tch.sn.set(sn).unwrap()); + batch = zgetbatch_rets!(tch.sn.set(sn).unwrap()); // Serialize the message fragment match batch.encode((&mut reader, &mut fragment)) { @@ -281,6 +344,9 @@ impl StageIn { break; } } + + // adopt deadline for the next fragment + deadline.on_next_fragment(); } // Clean the fragbuf @@ -628,10 +694,10 @@ impl TransmissionPipelineProducer { } else { self.wait_before_close }; - let deadline = (!wait_time.is_zero()).then_some(Instant::now().checked_add(wait_time)); + let mut deadline = Deadline::new(Some(wait_time)); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); - queue.push_network_message(&mut msg, priority, deadline) + queue.push_network_message(&mut msg, priority, &mut deadline) } #[inline] diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 3a39a9059..3ccd8dd8c 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -463,12 +463,10 @@ async fn test_transport( "Sending {} messages... {:?} {}", MSG_COUNT, channel, msg_size ); - // TODO: workaround for https://github.com/eclipse-zenoh/zenoh/issues/1494 - //let cctrl = match channel.reliability { - // Reliability::Reliable => CongestionControl::Block, - // Reliability::BestEffort => CongestionControl::Drop, - //}; - let cctrl = CongestionControl::Block; + let cctrl = match channel.reliability { + Reliability::Reliable => CongestionControl::Block, + Reliability::BestEffort => CongestionControl::Drop, + }; // Create the message to send let message: NetworkMessage = Push {