From 307a4205d47bfd13c6bd3ca7e8015b1848904ee9 Mon Sep 17 00:00:00 2001 From: Junyeong Jeong Date: Sat, 27 Mar 2021 14:33:49 +0900 Subject: [PATCH] Upgrade version of Tokio to ^1.0.1 --- cargo-bpf/Cargo.toml | 2 +- cargo-bpf/src/load.rs | 9 +++-- examples/example-userspace/Cargo.toml | 2 +- .../example-userspace/examples/biolatpcts.rs | 6 ++-- examples/example-userspace/examples/echo.rs | 8 ++--- .../examples/mallocstacks.rs | 8 +++-- .../examples/tcp-lifetime.rs | 2 +- .../example-userspace/examples/vfsreadlat.rs | 14 +++++--- redbpf-macros/Cargo.toml | 2 +- redbpf-probes/Cargo.toml | 2 +- redbpf-tools/Cargo.toml | 2 +- redbpf-tools/src/bin/redbpf-iotop.rs | 13 ++++--- redbpf-tools/src/bin/redbpf-tcp-knock.rs | 9 +++-- redbpf/Cargo.toml | 2 +- redbpf/src/load/map_io.rs | 36 ++++++++++++------- 15 files changed, 72 insertions(+), 45 deletions(-) diff --git a/cargo-bpf/Cargo.toml b/cargo-bpf/Cargo.toml index d69ca8f3..01d588c5 100644 --- a/cargo-bpf/Cargo.toml +++ b/cargo-bpf/Cargo.toml @@ -24,7 +24,7 @@ toml_edit = { version = "0.2", optional = true } bpf-sys = { version = "^1.3.0", path = "../bpf-sys", optional = true } redbpf = { version = "^1.3.0", path = "../redbpf", default-features = false, optional = true } futures = { version = "0.3", optional = true } -tokio = { version = "^0.2.4", features = ["rt-core", "io-driver", "macros", "signal"], optional = true } +tokio = { version = "^1.0.1", features = ["rt", "macros", "signal"], optional = true } hexdump = { version = "0.1", optional = true } libc = {version = "0.2.66", optional = true} llvm-sys = { version = "110", optional = true} diff --git a/cargo-bpf/src/load.rs b/cargo-bpf/src/load.rs index 2e19b0d9..215b8323 100644 --- a/cargo-bpf/src/load.rs +++ b/cargo-bpf/src/load.rs @@ -12,7 +12,7 @@ use hexdump::hexdump; use redbpf::xdp; use redbpf::{load::Loader, Program::*}; use std::path::PathBuf; -use tokio::runtime::Runtime; +use tokio::runtime; use tokio::signal; pub fn load( @@ -21,8 +21,11 @@ pub fn load( uprobe_path: Option<&str>, pid: Option, ) -> Result<(), CommandError> { - let mut runtime = Runtime::new().unwrap(); - runtime.block_on(async { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { // Load all the programs and maps included in the program let mut loader = Loader::load_file(&program).expect("error loading file"); diff --git a/examples/example-userspace/Cargo.toml b/examples/example-userspace/Cargo.toml index 9b688b77..a2e11278 100644 --- a/examples/example-userspace/Cargo.toml +++ b/examples/example-userspace/Cargo.toml @@ -10,6 +10,6 @@ cargo-bpf = { version = "", path = "../../cargo-bpf", default-features = false, [dependencies] probes = { path = "../example-probes", package = "example-probes" } libc = "0.2" -tokio = { version = "^0.2.4", features = ["signal", "time", "io-util", "tcp", "rt-util", "sync"] } +tokio = { version = "^1.0.1", features = ["rt", "signal", "time", "io-util", "net", "sync"] } redbpf = { version = "", path = "../../redbpf", features = ["load"] } futures = "0.3" diff --git a/examples/example-userspace/examples/biolatpcts.rs b/examples/example-userspace/examples/biolatpcts.rs index 1d653ca8..1e898994 100644 --- a/examples/example-userspace/examples/biolatpcts.rs +++ b/examples/example-userspace/examples/biolatpcts.rs @@ -4,7 +4,7 @@ // find another half, a BPF program, at example-probes/biolatpcts/main.rs use std::cmp; use std::time::Duration; -use tokio::time::delay_for; +use tokio::time::sleep; use redbpf::load::Loader; use redbpf::PerCpuArray; @@ -64,7 +64,7 @@ fn calc_lat_pct( return pcts; } -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() -> ! { if unsafe { libc::getuid() != 0 } { eprintln!("You must be root to use eBPF!"); @@ -102,7 +102,7 @@ async fn main() -> ! { let mut lat_10us = [0; 100]; loop { - delay_for(Duration::from_secs(3)).await; + sleep(Duration::from_secs(3)).await; let mut lat_total = 0; diff --git a/examples/example-userspace/examples/echo.rs b/examples/example-userspace/examples/echo.rs index 049b5a85..166ce6a7 100644 --- a/examples/example-userspace/examples/echo.rs +++ b/examples/example-userspace/examples/echo.rs @@ -19,7 +19,7 @@ enum Command { Delete { key: IdxMapKey }, } -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() { if unsafe { libc::getuid() != 0 } { eprintln!("You must be root to use eBPF!"); @@ -33,10 +33,10 @@ async fn main() { .parse() .expect("invalid port number"); - let (mut tx, mut rx) = mpsc::channel(128); + let (tx, mut rx) = mpsc::channel(128); let local = task::LocalSet::new(); local.spawn_local(async move { - let mut listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))) + let listener = TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))) .await .unwrap(); loop { @@ -51,7 +51,7 @@ async fn main() { port: u32::to_be(client_addr.port().into()), }; tx.send(Command::Set { fd, key }).await.unwrap(); - let mut tx = tx.clone(); + let tx = tx.clone(); task::spawn_local(async move { let mut buf = [0; 0]; // Even though it awaits for something to read, it only diff --git a/examples/example-userspace/examples/mallocstacks.rs b/examples/example-userspace/examples/mallocstacks.rs index 3665a8d4..2efe8a1b 100644 --- a/examples/example-userspace/examples/mallocstacks.rs +++ b/examples/example-userspace/examples/mallocstacks.rs @@ -7,7 +7,7 @@ use std::process; use std::ptr; use std::sync::{Arc, Mutex}; use tokio; -use tokio::runtime::Runtime; +use tokio::runtime; use tokio::signal; use redbpf::load::{Loaded, Loader}; @@ -76,7 +76,11 @@ fn main() { }); let acc: Acc = Arc::new(Mutex::new(HashMap::new())); - let _ = Runtime::new().unwrap().block_on(async { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _ = rt.block_on(async { let mut loaded = Loader::load(probe_code()).expect("error loading BPF program"); for prb in loaded.uprobes_mut() { diff --git a/examples/example-userspace/examples/tcp-lifetime.rs b/examples/example-userspace/examples/tcp-lifetime.rs index 4ca11595..09ad4c11 100644 --- a/examples/example-userspace/examples/tcp-lifetime.rs +++ b/examples/example-userspace/examples/tcp-lifetime.rs @@ -23,7 +23,7 @@ use redbpf::HashMap; use probes::tcp_lifetime::{SocketAddr, TCPLifetime}; -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() { if unsafe { libc::getuid() != 0 } { eprintln!("You must be root to use eBPF!"); diff --git a/examples/example-userspace/examples/vfsreadlat.rs b/examples/example-userspace/examples/vfsreadlat.rs index a4f7c8c9..369981b9 100644 --- a/examples/example-userspace/examples/vfsreadlat.rs +++ b/examples/example-userspace/examples/vfsreadlat.rs @@ -8,9 +8,9 @@ use std::ptr; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio; -use tokio::runtime::Runtime; +use tokio::runtime; use tokio::signal; -use tokio::time::delay_for; +use tokio::time::sleep; const UNDER_ONE: &str = "~ 0"; const ONE_TO_TEN: &str = "1 ~ 10"; @@ -31,7 +31,7 @@ fn start_reporter(counts: Counts) { println!("{:>8} ms\t{}", range, cnt); *cnt = 0; } - delay_for(Duration::from_secs(1)).await + sleep(Duration::from_secs(1)).await } }); } @@ -44,7 +44,7 @@ fn start_perf_event_handler(mut loaded: Loaded, counts: Counts) { match name.as_str() { "pid" => { let vev = unsafe { ptr::read(event.as_ptr() as *const VFSEvent) }; - let latency = vev.latency / 1000_0000; + let latency = vev.latency / 1_000_000; let range = if latency < 1 { UNDER_ONE } else if 1 <= latency && latency < 10 { @@ -80,7 +80,11 @@ fn main() { .cloned() .collect(), )); - let _ = Runtime::new().unwrap().block_on(async { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _ = rt.block_on(async { let mut loaded = Loader::load(probe_code()).expect("error loading BPF program"); for kp in loaded.kprobes_mut() { kp.attach_kprobe(&kp.name(), 0) diff --git a/redbpf-macros/Cargo.toml b/redbpf-macros/Cargo.toml index 104019db..ac7ab556 100644 --- a/redbpf-macros/Cargo.toml +++ b/redbpf-macros/Cargo.toml @@ -24,4 +24,4 @@ rustc_version = "0.3.0" # redbpf-probes is needed by doctests redbpf-probes = { version = "^1.3.0", path = "../redbpf-probes" } # memoffset is needed by doctests -memoffset = "0.5" +memoffset = "0.6" diff --git a/redbpf-probes/Cargo.toml b/redbpf-probes/Cargo.toml index 68ff817a..bb9f98ea 100644 --- a/redbpf-probes/Cargo.toml +++ b/redbpf-probes/Cargo.toml @@ -21,7 +21,7 @@ quote = "1.0" glob = "0.3.0" [dev-dependencies] -memoffset = "0.5" +memoffset = "0.6" [features] default = [] diff --git a/redbpf-tools/Cargo.toml b/redbpf-tools/Cargo.toml index d1fd8b31..58e50c63 100644 --- a/redbpf-tools/Cargo.toml +++ b/redbpf-tools/Cargo.toml @@ -10,7 +10,7 @@ cargo-bpf = { version = "^1.3.0", path = "../cargo-bpf", default-features = fals [dependencies] probes = { path = "./probes" } redbpf = { version = "^1.3.0", path = "../redbpf", features = ["load"] } -tokio = { version = "^0.2.4", features = ["rt-core", "io-driver", "macros", "signal", "time"] } +tokio = { version = "^1.0.1", features = ["rt", "macros", "signal", "time"] } futures = "0.3" getopts = "0.2" libc = "0.2" diff --git a/redbpf-tools/src/bin/redbpf-iotop.rs b/redbpf-tools/src/bin/redbpf-iotop.rs index b9ee2d0f..52072770 100644 --- a/redbpf-tools/src/bin/redbpf-iotop.rs +++ b/redbpf-tools/src/bin/redbpf-iotop.rs @@ -13,9 +13,9 @@ use std::os::raw::c_char; use std::process; use std::time::Duration; use tokio; -use tokio::runtime::Runtime; +use tokio::runtime; use tokio::signal; -use tokio::time::delay_for; +use tokio::time::sleep; use probes::iotop::{Counter, CounterKey}; @@ -25,8 +25,11 @@ fn main() { process::exit(-1); } - let mut runtime = Runtime::new().unwrap(); - let _ = runtime.block_on(async { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _ = rt.block_on(async { // load the BPF programs and maps let mut loader = Loader::load(probe_code()).expect("error loading probe"); @@ -43,7 +46,7 @@ fn main() { let disks = parse_diskstats().unwrap(); loop { - delay_for(Duration::from_millis(1000)).await; + sleep(Duration::from_millis(1000)).await; println!( "{:6} {:16} {:1} {:3} {:3} {:8} {:>5} {:>7} {:>6}", diff --git a/redbpf-tools/src/bin/redbpf-tcp-knock.rs b/redbpf-tools/src/bin/redbpf-tcp-knock.rs index bd1b5c82..c5a92739 100644 --- a/redbpf-tools/src/bin/redbpf-tcp-knock.rs +++ b/redbpf-tools/src/bin/redbpf-tcp-knock.rs @@ -12,7 +12,7 @@ use std::net::Ipv4Addr; use std::process; use std::ptr; use tokio; -use tokio::runtime::Runtime; +use tokio::runtime; use tokio::signal; use probes::knock::{Connection, KnockAttempt, PortSequence, MAX_SEQ_LEN}; @@ -28,8 +28,11 @@ fn main() { None => process::exit(1), }; - let mut runtime = Runtime::new().unwrap(); - let _ = runtime.block_on(async { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let _ = rt.block_on(async { let mut loader = Loader::load(probe_code()).expect("error loading probe"); // attach the xdp program diff --git a/redbpf/Cargo.toml b/redbpf/Cargo.toml index dfdcafaa..d1635e39 100644 --- a/redbpf/Cargo.toml +++ b/redbpf/Cargo.toml @@ -30,7 +30,7 @@ ring = { version = "0.16", optional = true } futures = { version = "0.3", optional = true } mio = { version = "0.6", optional = true } -tokio = { version = "^0.2.4", features = ["rt-core", "io-driver", "macros", "signal"], optional = true } +tokio = { version = "^1.0.1", features = ["rt", "macros", "signal", "net"], optional = true } [features] default = [] diff --git a/redbpf/src/load/map_io.rs b/redbpf/src/load/map_io.rs index 52857d96..e6cf0a7b 100644 --- a/redbpf/src/load/map_io.rs +++ b/redbpf/src/load/map_io.rs @@ -13,12 +13,18 @@ use std::os::unix::io::RawFd; use std::pin::Pin; use std::slice; use std::task::{Context, Poll}; -use tokio::io::PollEvented; +use tokio::io::unix::AsyncFd; +use tokio::io::Interest; use crate::{Event, PerfMap}; +// TODO Remove MapIo and upgrade mio. +// It is pub-visibility so removing this is semver breaking change. Since mio +// v0.7, `Evented` is not provided, new version of mio can not be used now. +#[deprecated] pub struct MapIo(RawFd); +#[allow(deprecated)] impl Evented for MapIo { fn register( &self, @@ -46,18 +52,19 @@ impl Evented for MapIo { } pub struct PerfMessageStream { - poll: PollEvented, + poll: AsyncFd, map: PerfMap, name: String, } impl PerfMessageStream { pub fn new(name: String, map: PerfMap) -> Self { - let io = MapIo(map.fd); - let poll = PollEvented::new(io).unwrap(); + let poll = AsyncFd::with_interest(map.fd, Interest::READABLE).unwrap(); PerfMessageStream { poll, map, name } } + // Note that all messages should be consumed. Because ready flag is + // cleared, the remaining messages will not be read soon. fn read_messages(&mut self) -> Vec> { let mut ret = Vec::new(); while let Some(ev) = self.map.read() { @@ -82,15 +89,18 @@ impl PerfMessageStream { impl Stream for PerfMessageStream { type Item = Vec>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let ready = Ready::readable(); - if let Poll::Pending = self.poll.poll_read_ready(cx, ready) { - return Poll::Pending; - } - - let messages = self.read_messages(); - self.poll.clear_read_ready(cx, ready).unwrap(); - Poll::Ready(Some(messages)) + match self.poll.poll_read_ready(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => { + // it should never happen + eprintln!("PerfMessageStream error: {:?}", e); + return Poll::Ready(None); + } + Poll::Ready(Ok(mut rg)) => rg.clear_ready(), + }; + // Must read all messages because AsyncFdReadyGuard::clear_ready is + // already called. + Some(self.read_messages()).into() } }