Skip to content

Commit

Permalink
Refine the TODO comments
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Mar 4, 2024
1 parent 290547f commit 39befc3
Show file tree
Hide file tree
Showing 13 changed files with 26 additions and 39 deletions.
1 change: 0 additions & 1 deletion commons/zenoh-sync/src/object_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl<T> From<T> for RecyclingObject<T> {
}
}

// TODO: Check this necessity
impl<T> Drop for RecyclingObject<T> {
fn drop(&mut self) {
if let Some(pool) = self.pool.upgrade() {
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-link-commons/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl ListenerUnicastIP {
}

pub struct ListenersUnicastIP {
// TODO: should we change this to AsyncRwLock?
// TODO(yuyuan): should we change this to AsyncRwLock?
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastIP>>>,
pub token: CancellationToken,
}
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ impl UnicastPipeListener {

let token = CancellationToken::new();
let c_token = token.clone();

// WARN: The spawn_blocking is mandatory verified by the ping/pong test
// create listening task
// TODO: Check the necessity of this spawn_blocking
tokio::task::spawn_blocking(move || {
ZRuntime::Acceptor.block_on(async move {
loop {
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ impl TransportManager {
lsu
}

// TODO: Can we make this async as above?
// TODO(yuyuan): Can we make this async as above?
pub fn get_locators(&self) -> Vec<Locator> {
let mut lsu = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_unicast());
let mut lsm = zenoh_runtime::ZRuntime::TX.block_in_place(self.get_locators_multicast());
Expand Down
3 changes: 1 addition & 2 deletions io/zenoh-transport/src/multicast/establishment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ pub(crate) async fn open_link(
w_guard.insert(locator.clone(), ti.clone());
drop(w_guard);

// TODO: resolve the structure entanglement below
// TODO(yuyuan): resolve the structure entanglement below
// Notify the transport event handler
let transport: TransportMulticast = (&ti).into();

// TODO: also check the dyn trait implementation in callback
let callback = match manager.config.handler.new_multicast(transport.clone()) {
Ok(c) => c,
Err(e) => {
Expand Down
5 changes: 2 additions & 3 deletions io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ pub(super) struct TransportLinkMulticastConfigUniversal {
pub(super) batch_size: BatchSize,
}

// TODO: Introduce TaskTracker and retire handle_tx, handle_rx, and signal_rx.
// TODO(yuyuan): Introduce TaskTracker or JoinSet and retire handle_tx, handle_rx, and signal_rx.
#[derive(Clone)]
pub(super) struct TransportLinkMulticastUniversal {
// The underlying link
Expand Down Expand Up @@ -329,6 +329,7 @@ impl TransportLinkMulticastUniversal {
// Spawn the TX task
let c_link = self.link.clone();
let c_transport = self.transport.clone();

let handle = zenoh_runtime::ZRuntime::TX.spawn(async move {
let res = tx_task(
consumer,
Expand All @@ -343,7 +344,6 @@ impl TransportLinkMulticastUniversal {
log::debug!("{}", e);
// Spawn a task to avoid a deadlock waiting for this same task
// to finish in the close() joining its handle
// TODO: check which ZRuntime should be used
zenoh_runtime::ZRuntime::Net.spawn(async move { c_transport.delete().await });
}
});
Expand Down Expand Up @@ -380,7 +380,6 @@ impl TransportLinkMulticastUniversal {
log::debug!("{}", e);
// Spawn a task to avoid a deadlock waiting for this same task
// to finish in the close() joining its handle
// TODO: check which ZRuntime should be used
zenoh_runtime::ZRuntime::Net.spawn(async move { c_transport.delete().await });
}
});
Expand Down
10 changes: 5 additions & 5 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl TransportMulticastInner {
cb.closed();
}

// TODO: unify the termination with the above
// TODO(yuyuan): use CancellationToken to unify the termination with the above
self.token.cancel();

Ok(())
Expand Down Expand Up @@ -366,7 +366,7 @@ impl TransportMulticastInner {
);

// Create lease event
// TODO: refine the clone behaviors
// TODO(yuyuan): refine the clone behaviors
let is_active = Arc::new(AtomicBool::new(false));
let c_is_active = is_active.clone();
let token = self.token.child_token();
Expand All @@ -389,10 +389,10 @@ impl TransportMulticastInner {
let _ = c_self.del_peer(&c_locator, close::reason::EXPIRED);
};

// TODO: Put it into TaskTracker properly
// TODO(yuyuan): Put it into TaskTracker or store as JoinHandle
zenoh_runtime::ZRuntime::Acceptor.spawn(task);

// TODO: Integrate the above async task into TransportMulticastPeer
// TODO(yuyuan): Integrate the above async task into TransportMulticastPeer
// Store the new peer
let peer = TransportMulticastPeer {
version: join.version,
Expand Down Expand Up @@ -423,7 +423,7 @@ impl TransportMulticastInner {
reason
);

// TODO: Unify the termination
// TODO(yuyuan): Unify the termination
peer.token.cancel();
peer.handler.closing();
drop(guard);
Expand Down
17 changes: 7 additions & 10 deletions io/zenoh-transport/src/unicast/lowlatency/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ use tokio_util::sync::CancellationToken;
use zenoh_buffers::{writer::HasWriter, ZSlice};
use zenoh_codec::*;
use zenoh_core::{zasyncread, zasyncwrite};
use zenoh_link::LinkUnicast;
use zenoh_protocol::transport::TransportMessageLowLatency;
use zenoh_protocol::transport::{KeepAlive, TransportBodyLowLatency};
use zenoh_result::{zerror, ZResult};
use zenoh_runtime::ZRuntime;

pub(crate) async fn send_with_link(
link: &TransportLinkUnicast,
link: &LinkUnicast,
msg: TransportMessageLowLatency,
#[cfg(feature = "stats")] stats: &Arc<TransportStats>,
) -> ZResult<()> {
let len;
let codec = Zenoh080::new();
if link.link.is_streamed() {
if link.is_streamed() {
let mut buffer = vec![0, 0, 0, 0];
let mut writer = buffer.writer();
codec
Expand All @@ -47,7 +48,7 @@ pub(crate) async fn send_with_link(

buffer[0..4].copy_from_slice(&le);

link.link.write_all(&buffer).await?;
link.write_all(&buffer).await?;
} else {
let mut buffer = vec![];
let mut writer = buffer.writer();
Expand All @@ -59,7 +60,7 @@ pub(crate) async fn send_with_link(
{
len = buffer.len() as u32;
}
link.link.write_all(&buffer).await?;
link.write_all(&buffer).await?;
}
log::trace!("Sent: {:?}", msg);

Expand Down Expand Up @@ -99,7 +100,7 @@ impl TransportUnicastLowlatency {

pub(super) async fn send_async(&self, msg: TransportMessageLowLatency) -> ZResult<()> {
let guard = zasyncwrite!(self.link);
let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?;
let link = &guard.as_ref().ok_or_else(|| zerror!("No link"))?.link;
send_with_link(
link,
msg,
Expand Down Expand Up @@ -139,18 +140,15 @@ impl TransportUnicastLowlatency {
}

pub(super) fn internal_start_rx(&self, lease: Duration) {
// TODO: Tidy the complex dependencies
let rx_buffer_size = self.manager.config.link_rx_buffer_size;
let token = self.token.child_token();

// TODO: This can be improved to minimal
let c_transport = self.clone();
let task = async move {
let guard = zasyncread!(c_transport.link);
let link_rx = guard.as_ref().unwrap().rx();
drop(guard);

// TODO: link_rx.link and link.link
let is_streamed = link_rx.link.is_streamed();

// The pool of buffers
Expand Down Expand Up @@ -221,7 +219,6 @@ async fn keepalive_task(
token: CancellationToken,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
) -> ZResult<()> {
// TODO: check this necessity
let mut interval =
tokio::time::interval_at(tokio::time::Instant::now() + keep_alive, keep_alive);

Expand All @@ -233,7 +230,7 @@ async fn keepalive_task(
};

let guard = zasyncwrite!(link);
let link = guard.as_ref().ok_or_else(|| zerror!("No link"))?;
let link = &guard.as_ref().ok_or_else(|| zerror!("No link"))?.link;
let _ = send_with_link(
link,
keepailve,
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ impl TransportUnicastTrait for TransportUnicastLowlatency {
/*************************************/
/* LINK */
/*************************************/
// TODO: Check the correctness: is this called at most once?
async fn add_link(
&self,
link: LinkUnicastWithOpenAck,
Expand Down
13 changes: 5 additions & 8 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ impl TransportLinkUnicastUniversal {
(result, consumer)
}

// TODO: Not yet guaranteed is called at most once
pub(super) fn start_tx(
&mut self,
transport: TransportUnicastUniversal,
Expand All @@ -96,20 +95,19 @@ impl TransportLinkUnicastUniversal {
)
.await;

// TODO: improve this callback
if let Err(e) = res {
log::debug!("{}", e);
// Spawn a task to avoid a deadlock waiting for this same task
// to finish in the close() joining its handle
// TODO: check which ZRuntime should be used
// TODO(yuyuan): do more study to check which ZRuntime should be used or refine the
// termination
zenoh_runtime::ZRuntime::TX
.spawn(async move { transport.del_link(tx.inner.link()).await });
}
};
self.tracker.spawn_on(task, &zenoh_runtime::ZRuntime::TX);
}

// TODO: Not yet guaranteed is called at most once
pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) {
let mut rx = self.link.rx();
let token = self.token.clone();
Expand All @@ -124,21 +122,21 @@ impl TransportLinkUnicastUniversal {
)
.await;

// TODO: improve this callback
// TODO(yuyuan): improve this callback
if let Err(e) = res {
log::debug!("{}", e);

// Spawn a task to avoid a deadlock waiting for this same task
// to finish in the close() joining its handle
// TODO: check which ZRuntime should be used
// WARN: Must be spawned on RX
zenoh_runtime::ZRuntime::RX
.spawn(async move { transport.del_link((&rx.link).into()).await });

// // WARN: This ZRuntime blocks
// zenoh_runtime::ZRuntime::Net
// .spawn(async move { transport.del_link((&rx.link).into()).await });

// // WARN: Don't worry. This fix doesn't work
// // WARN: This cloud block
// transport.del_link((&rx.link).into()).await;
}
};
Expand Down Expand Up @@ -168,7 +166,6 @@ async fn tx_task(
token: CancellationToken,
#[cfg(feature = "stats")] stats: Arc<TransportStats>,
) -> ZResult<()> {
// TODO: check this necessity
let mut interval =
tokio::time::interval_at(tokio::time::Instant::now() + keep_alive, keep_alive);
loop {
Expand Down
4 changes: 1 addition & 3 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,8 @@ fn router_data(context: &AdminContext, query: Query) {
}
json
};
// TODO: Avoid this block_on
// TODO: Check this block_in_place
let transports: Vec<serde_json::Value> = zenoh_runtime::ZRuntime::Net
.block_on(transport_mgr.get_transports_unicast())
.block_in_place(transport_mgr.get_transports_unicast())
.iter()
.map(transport_to_json)
.collect();
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,6 @@ impl Session {
_ => 1,
};

// TODO: check which ZRuntime should be used
zenoh_runtime::ZRuntime::Net.spawn({
let state = self.state.clone();
let zid = self.runtime.zid();
Expand Down
3 changes: 1 addition & 2 deletions zenoh/tests/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Task {
tokio::select! {
_ = token.cancelled() => break,

// TODO: this won't yield after a timeout raised from recipe
// WARN: this won't yield after a timeout since the put is a blocking call
res = tokio::time::timeout(std::time::Duration::from_secs(1), session
.put(ke, value.clone())
.congestion_control(CongestionControl::Block)
Expand Down Expand Up @@ -599,7 +599,6 @@ async fn three_node_combination() -> Result<()> {
)
.collect();

// TODO: It should be able to run concurrently
for chunks in recipe_list.chunks(4).map(|x| x.to_vec()) {
let mut join_set = tokio::task::JoinSet::new();
for (pubsub, getqueryable) in chunks {
Expand Down

0 comments on commit 39befc3

Please sign in to comment.