From 6b1a67f403ff15283c888bf5b844310497176839 Mon Sep 17 00:00:00 2001 From: Alberto Ruiz <17555470+Az107@users.noreply.github.com> Date: Mon, 24 Jun 2024 12:27:59 +0200 Subject: [PATCH] CPU usage optimization --- src/hteapot.rs | 75 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/src/hteapot.rs b/src/hteapot.rs index 3580cc1..1def0c7 100644 --- a/src/hteapot.rs +++ b/src/hteapot.rs @@ -9,7 +9,7 @@ use std::net::{TcpListener, TcpStream}; use std::thread::sleep; use std::time::Duration; use std::{str, thread}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Condvar}; #[derive(Debug)] @@ -152,7 +152,9 @@ pub struct Hteapot { port: u16, address: String, cache: HashMap, - pool: Arc>> + pool: Arc<(Mutex>, Condvar)>, + //pool2: Arc<(Mutex>, Condvar)> + // this will store a map from path to their actions // path_table: HashMap String>>>, } @@ -165,7 +167,9 @@ impl Hteapot { port: port, address: address.to_string(), cache: HashMap::new(), - pool: Arc::new(Mutex::new(Vec::new())), + pool: Arc::new((Mutex::new(Vec::new()), Condvar::new())), + //pool2: Arc::new((Mutex::new(Vec::new()), Condvar::new())), + // path_table: HashMap::new(), } } @@ -182,33 +186,56 @@ impl Hteapot { } }; let arc_action = Arc::new(action); - listener.set_nonblocking(true).expect("set_nonblocking call failed"); + listener.set_nonblocking(false).expect("set_nonblocking call failed"); let pool_clone = self.pool.clone(); let greeter_loop = thread::spawn(move || { - loop { - for stream in listener.incoming() { - if stream.is_err() {break;} - let stream = stream.unwrap(); - stream.set_nonblocking(true).expect("set_nonblocking call failed"); - stream.set_nodelay(true).expect("set_nodelay call failed"); - let mut pool = pool_clone.lock().expect("Error locking pool"); - pool.push(stream); - } - sleep(Duration::from_millis(10)); + for stream in listener.incoming() { + + if stream.is_err() {continue;} + let stream = stream.unwrap(); + let (lock, cvar) = &*pool_clone; + let mut pool = lock.lock().expect("Error locking pool"); + pool.push(stream); + cvar.notify_one(); // Notify one waiting thread } }); let pool_clone = self.pool.clone(); + //let pool2_clone = self.pool2.clone(); thread::spawn(move || { loop { - let mut pool = pool_clone.lock().expect("Error locking pool"); - pool.retain(|stream| { - let action_clone = arc_action.clone(); - Hteapot::handle_client(stream, move |request| { - action_clone(request) - }) - }); + let (lock, cvar) = &*pool_clone; + let mut pool = lock.lock().expect("Error locking pool"); + + while pool.is_empty() { + pool = cvar.wait(pool).expect("Error waiting on cvar"); + } + pool.retain(|stream| { + let action_clone = arc_action.clone(); + Hteapot::handle_client(stream, move |request| { + action_clone(request) + }) + }); } }); + + // let pool2_clone = self.pool2.clone(); + // thread::spawn(move || { + // loop { + // let (lock, cvar) = &*pool2_clone; + // let mut pool = lock.lock().expect("Error locking pool"); + + // while pool.is_empty() { + // pool = cvar.wait(pool).expect("Error waiting on cvar"); + // } + // for stream in pool.iter() { + // let action_clone = arc_action.clone(); + // Hteapot::handle_client(stream, move |request| { + // action_clone(request) + // }); + // } + // pool.clear(); + // } + // }); greeter_loop.join(); } @@ -305,6 +332,7 @@ impl Hteapot { // Handle the client when a request is received fn handle_client(stream: &TcpStream , action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ) -> bool{ + stream.set_nodelay(true); let mut reader = BufReader::new(stream); let mut writer = BufWriter::new(stream); let mut request_buffer = Vec::new(); @@ -314,10 +342,11 @@ impl Hteapot { Err(e) => { match e.kind() { io::ErrorKind::WouldBlock => { - println!("would have blocked"); return true; }, - _ => return false, + _ => { + return false; + }, } }, Ok(m) => {