Skip to content

Commit

Permalink
utilize tokio notify to wake the driver when sending outgoing data
Browse files Browse the repository at this point in the history
  • Loading branch information
ozwaldorf committed Oct 5, 2023
1 parent 66f015c commit 241ab16
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
33 changes: 18 additions & 15 deletions server/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use str0m::channel::ChannelId;
use str0m::net::{Receive, Transmit};
use str0m::{Event, IceConnectionState, Input, Output, Rtc};
use tokio::net::UdpSocket;
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Notify;

use triomphe::Arc;

Expand Down Expand Up @@ -46,7 +48,7 @@ impl WebRtcDriver {
})
}

pub async fn run(self) {
pub async fn run(self, waker: Arc<Notify>) {
// UDP datagrams should be ~2KB max
let buf = &mut [0; 2 << 10];

Expand Down Expand Up @@ -76,21 +78,22 @@ impl WebRtcDriver {
.max(Duration::from_millis(1));

let now = Instant::now();

// Read incoming data from the socket
match tokio::time::timeout(timeout, self.socket.recv_from(buf)).await {
Ok(Ok((n, source))) => {
info!("read in {:?}", now.elapsed());
if let Err(e) = self.handle_input(source, &buf[..n]) {
error!("failed to handle socket input: {e}");
select! {
// Read incoming data from the socket
res = self.socket.recv_from(buf) => match res {
Ok((n, source)) => {
info!("read in {:?}", now.elapsed());
if let Err(e) = self.handle_input(source, &buf[..n]) {
error!("failed to handle socket input: {e}");
}
}
}
Ok(Err(e)) => {
error!("Failed to read from udp socket: {e}");
}
// timeout
Err(_) => {}
};
Err(_) => {}
},
// Waker, which is called after rtc.write()
_ = waker.notified() => {},
// Read timeout
_ = tokio::time::sleep(timeout) => {}
}

// Drive time forward in all clients.
let now = Instant::now();
Expand Down
12 changes: 9 additions & 3 deletions server/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use triomphe::Arc;

use anyhow::Result;
use dashmap::DashMap;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{mpsc::{channel, Receiver, Sender}, Notify};

use crate::{
driver::{ConnectionMap, WebRtcDriver},
Expand All @@ -23,6 +23,7 @@ pub struct WebRtcTransport {
conn_rx: Receiver<(HandshakeRequestFrame, IpAddr, Receiver<RequestFrame>)>,
/// Shared map of connections
conns: ConnectionMap,
waker: Arc<Notify>
}

impl WebRtcTransport {
Expand All @@ -44,9 +45,11 @@ impl WebRtcTransport {
.expect("Failed to setup server");
});

tokio::spawn(driver.run());
let waker = Arc::new(Notify::new());

Ok(Self { conn_rx, conns })
tokio::spawn(driver.run(waker.clone()));

Ok(Self { conn_rx, conns, waker })
}

pub async fn accept(
Expand All @@ -57,6 +60,7 @@ impl WebRtcTransport {
let sender = WebRtcSender {
addr,
conns: self.conns.clone(),
waker: self.waker.clone()
};
let receiver = WebRtcReceiver(receiver);

Expand All @@ -68,6 +72,7 @@ impl WebRtcTransport {
pub struct WebRtcSender {
addr: IpAddr,
conns: ConnectionMap,
waker: Arc<Notify>
}

macro_rules! webrtc_send {
Expand All @@ -76,6 +81,7 @@ macro_rules! webrtc_send {
if let Err(e) = conn.write($payload) {
warn!("failed to write outgoing payload to rtc instance: {e}");
}
$self.waker.notify_one();
}
};
}
Expand Down

0 comments on commit 241ab16

Please sign in to comment.