Skip to content

Commit

Permalink
CPU usage optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
Az107 committed Jun 24, 2024
1 parent 597ddfb commit 6b1a67f
Showing 1 changed file with 52 additions and 23 deletions.
75 changes: 52 additions & 23 deletions src/hteapot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::net::{TcpListener, TcpStream};
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Condvar};


#[derive(Debug)]
Expand Down Expand Up @@ -152,7 +152,9 @@ pub struct Hteapot {
port: u16,
address: String,
cache: HashMap<String,String>,
pool: Arc<Mutex<Vec<TcpStream>>>
pool: Arc<(Mutex<Vec<TcpStream>>, Condvar)>,
//pool2: Arc<(Mutex<Vec<TcpStream>>, Condvar)>

// this will store a map from path to their actions
// path_table: HashMap<HttpMethod, HashMap<String, HashMap<HttpMethod, fn(HttpRequest) -> String>>>,
}
Expand All @@ -165,7 +167,9 @@ impl Hteapot {
port: port,
address: address.to_string(),
cache: HashMap::new(),
pool: Arc::new(Mutex::new(Vec::new())),
pool: Arc::new((Mutex::new(Vec::new()), Condvar::new())),
//pool2: Arc::new((Mutex::new(Vec::new()), Condvar::new())),

// path_table: HashMap::new(),
}
}
Expand All @@ -182,33 +186,56 @@ impl Hteapot {
}
};
let arc_action = Arc::new(action);
listener.set_nonblocking(true).expect("set_nonblocking call failed");
listener.set_nonblocking(false).expect("set_nonblocking call failed");
let pool_clone = self.pool.clone();
let greeter_loop = thread::spawn(move || {
loop {
for stream in listener.incoming() {
if stream.is_err() {break;}
let stream = stream.unwrap();
stream.set_nonblocking(true).expect("set_nonblocking call failed");
stream.set_nodelay(true).expect("set_nodelay call failed");
let mut pool = pool_clone.lock().expect("Error locking pool");
pool.push(stream);
}
sleep(Duration::from_millis(10));
for stream in listener.incoming() {

if stream.is_err() {continue;}
let stream = stream.unwrap();
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");
pool.push(stream);
cvar.notify_one(); // Notify one waiting thread
}
});
let pool_clone = self.pool.clone();
//let pool2_clone = self.pool2.clone();
thread::spawn(move || {
loop {
let mut pool = pool_clone.lock().expect("Error locking pool");
pool.retain(|stream| {
let action_clone = arc_action.clone();
Hteapot::handle_client(stream, move |request| {
action_clone(request)
})
});
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");

while pool.is_empty() {
pool = cvar.wait(pool).expect("Error waiting on cvar");
}
pool.retain(|stream| {
let action_clone = arc_action.clone();
Hteapot::handle_client(stream, move |request| {
action_clone(request)
})
});
}
});

// let pool2_clone = self.pool2.clone();
// thread::spawn(move || {
// loop {
// let (lock, cvar) = &*pool2_clone;
// let mut pool = lock.lock().expect("Error locking pool");

// while pool.is_empty() {
// pool = cvar.wait(pool).expect("Error waiting on cvar");
// }
// for stream in pool.iter() {
// let action_clone = arc_action.clone();
// Hteapot::handle_client(stream, move |request| {
// action_clone(request)
// });
// }
// pool.clear();
// }
// });
greeter_loop.join();
}

Expand Down Expand Up @@ -305,6 +332,7 @@ impl Hteapot {

// Handle the client when a request is received
fn handle_client(stream: &TcpStream , action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ) -> bool{
stream.set_nodelay(true);
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream);
let mut request_buffer = Vec::new();
Expand All @@ -314,10 +342,11 @@ impl Hteapot {
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
println!("would have blocked");
return true;
},
_ => return false,
_ => {
return false;
},
}
},
Ok(m) => {
Expand Down

0 comments on commit 6b1a67f

Please sign in to comment.