Skip to content

Commit

Permalink
deadline adoption for multi-fragment message (#1510)
Browse files Browse the repository at this point in the history
* deadline adoption for multi-fragment message
* optimizations for batching deadline
* enable Drop tests
  • Loading branch information
yellowhatter authored Oct 10, 2024
1 parent bdb01fb commit 7811f8e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 19 deletions.
92 changes: 79 additions & 13 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{
ops::Add,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc, Mutex, MutexGuard,
Expand Down Expand Up @@ -127,6 +128,73 @@ impl StageInMutex {
}
}

enum DeadlineSetting {
Immediate,
Infinite,
Finite(Instant),
}

struct LazyDeadline {
deadline: Option<DeadlineSetting>,
wait_time: Option<Duration>,
}

impl LazyDeadline {
fn new(wait_time: Option<Duration>) -> Self {
Self {
deadline: None,
wait_time,
}
}

fn advance(&mut self) {
let wait_time = self.wait_time;
match &mut self.deadline() {
DeadlineSetting::Immediate => {}
DeadlineSetting::Infinite => {}
DeadlineSetting::Finite(instant) => {
*instant = instant.add(unsafe { wait_time.unwrap_unchecked() });
}
}
}

#[inline]
fn deadline(&mut self) -> &mut DeadlineSetting {
self.deadline.get_or_insert_with(|| match self.wait_time {
Some(wait_time) => match wait_time.is_zero() {
true => DeadlineSetting::Immediate,
false => DeadlineSetting::Finite(Instant::now().add(wait_time)),
},
None => DeadlineSetting::Infinite,
})
}
}

struct Deadline {
lazy_deadline: LazyDeadline,
}

impl Deadline {
fn new(wait_time: Option<Duration>) -> Self {
Self {
lazy_deadline: LazyDeadline::new(wait_time),
}
}

#[inline]
fn wait(&mut self, s_ref: &StageInRefill) -> bool {
match self.lazy_deadline.deadline() {
DeadlineSetting::Immediate => false,
DeadlineSetting::Infinite => s_ref.wait(),
DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant),
}
}

fn on_next_fragment(&mut self) {
self.lazy_deadline.advance();
}
}

// This is the initial stage of the pipeline where messages are serliazed on
struct StageIn {
s_ref: StageInRefill,
Expand All @@ -141,13 +209,13 @@ impl StageIn {
&mut self,
msg: &mut NetworkMessage,
priority: Priority,
deadline: Option<Option<Instant>>,
deadline: &mut Deadline,
) -> bool {
// Lock the current serialization batch.
let mut c_guard = self.mutex.current();

macro_rules! zgetbatch_rets {
($fragment:expr, $restore_sn:expr) => {
($restore_sn:expr) => {
loop {
match c_guard.take() {
Some(batch) => break batch,
Expand All @@ -163,11 +231,7 @@ impl StageIn {
None => {
drop(c_guard);
// Wait for an available batch until deadline
if !match deadline {
None => false,
Some(None) => self.s_ref.wait(),
Some(Some(deadline)) => self.s_ref.wait_deadline(deadline),
} {
if !deadline.wait(&self.s_ref) {
// Still no available batch.
// Restore the sequence number and drop the message
$restore_sn;
Expand Down Expand Up @@ -198,7 +262,7 @@ impl StageIn {
}

// Get the current serialization batch.
let mut batch = zgetbatch_rets!(false, {});
let mut batch = zgetbatch_rets!({});
// Attempt the serialization on the current batch
let e = match batch.encode(&*msg) {
Ok(_) => zretok!(batch, msg),
Expand Down Expand Up @@ -228,7 +292,7 @@ impl StageIn {
if !batch.is_empty() {
// Move out existing batch
self.s_out.move_batch(batch);
batch = zgetbatch_rets!(false, tch.sn.set(sn).unwrap());
batch = zgetbatch_rets!(tch.sn.set(sn).unwrap());
}

// Attempt a second serialization on fully empty batch
Expand Down Expand Up @@ -258,8 +322,7 @@ impl StageIn {
let mut reader = self.fragbuf.reader();
while reader.can_read() {
// Get the current serialization batch
// Treat all messages as non-droppable once we start fragmenting
batch = zgetbatch_rets!(true, tch.sn.set(sn).unwrap());
batch = zgetbatch_rets!(tch.sn.set(sn).unwrap());

// Serialize the message fragment
match batch.encode((&mut reader, &mut fragment)) {
Expand All @@ -281,6 +344,9 @@ impl StageIn {
break;
}
}

// adopt deadline for the next fragment
deadline.on_next_fragment();
}

// Clean the fragbuf
Expand Down Expand Up @@ -628,10 +694,10 @@ impl TransmissionPipelineProducer {
} else {
self.wait_before_close
};
let deadline = (!wait_time.is_zero()).then_some(Instant::now().checked_add(wait_time));
let mut deadline = Deadline::new(Some(wait_time));
// Lock the channel. We are the only one that will be writing on it.
let mut queue = zlock!(self.stage_in[idx]);
queue.push_network_message(&mut msg, priority, deadline)
queue.push_network_message(&mut msg, priority, &mut deadline)
}

#[inline]
Expand Down
10 changes: 4 additions & 6 deletions io/zenoh-transport/tests/unicast_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,10 @@ async fn test_transport(
"Sending {} messages... {:?} {}",
MSG_COUNT, channel, msg_size
);
// TODO: workaround for https://github.com/eclipse-zenoh/zenoh/issues/1494
//let cctrl = match channel.reliability {
// Reliability::Reliable => CongestionControl::Block,
// Reliability::BestEffort => CongestionControl::Drop,
//};
let cctrl = CongestionControl::Block;
let cctrl = match channel.reliability {
Reliability::Reliable => CongestionControl::Block,
Reliability::BestEffort => CongestionControl::Drop,
};

// Create the message to send
let message: NetworkMessage = Push {
Expand Down

0 comments on commit 7811f8e

Please sign in to comment.