Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/pool #3

Merged
merged 6 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ authors = ["Alb Ruiz G. <me@albruiz.dev>"]
[lib]
name = "hteapot"
path = "src/hteapot.rs"

[[bin]]
name = "hteapot"


164 changes: 123 additions & 41 deletions src/hteapot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
// Also provide utilities to parse the requests and build the responses

use std::collections::HashMap;
use std::io::{Read, Write};
use std::hash::Hash;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use std::sync::{Arc, Mutex, Condvar};


#[derive(Debug)]
Expand Down Expand Up @@ -148,6 +151,10 @@ pub struct HttpRequest {
pub struct Hteapot {
port: u16,
address: String,
cache: HashMap<String,String>,
pool: Arc<(Mutex<Vec<TcpStream>>, Condvar)>,
//pool2: Arc<(Mutex<Vec<TcpStream>>, Condvar)>

// this will store a map from path to their actions
// path_table: HashMap<HttpMethod, HashMap<String, HashMap<HttpMethod, fn(HttpRequest) -> String>>>,
}
Expand All @@ -159,12 +166,16 @@ impl Hteapot {
Hteapot {
port: port,
address: address.to_string(),
cache: HashMap::new(),
pool: Arc::new((Mutex::new(Vec::new()), Condvar::new())),
//pool2: Arc::new((Mutex::new(Vec::new()), Condvar::new())),

// path_table: HashMap::new(),
}
}

// Start the server
pub fn listen(&self, action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ){
pub fn listen(&mut self, action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ){
let addr = format!("{}:{}", self.address, self.port);
let listener = TcpListener::bind(addr);
let listener = match listener {
Expand All @@ -174,23 +185,58 @@ impl Hteapot {
return;
}
};
let action_clone = Arc::new(action);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let action_clone = action_clone.clone();
thread::spawn(move || {
Hteapot::handle_client(stream, |req| {
action_clone(req)
});
let arc_action = Arc::new(action);
listener.set_nonblocking(false).expect("set_nonblocking call failed");
let pool_clone = self.pool.clone();
let greeter_loop = thread::spawn(move || {
for stream in listener.incoming() {

if stream.is_err() {continue;}
let stream = stream.unwrap();
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");
pool.push(stream);
cvar.notify_one(); // Notify one waiting thread
}
});
let pool_clone = self.pool.clone();
//let pool2_clone = self.pool2.clone();
thread::spawn(move || {
loop {
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");

while pool.is_empty() {
pool = cvar.wait(pool).expect("Error waiting on cvar");
}
pool.retain(|stream| {
let action_clone = arc_action.clone();
Hteapot::handle_client(stream, move |request| {
action_clone(request)
})
});

}
Err(e) => {
println!("Error: {}", e);
}
}
}
});

// let pool2_clone = self.pool2.clone();
// thread::spawn(move || {
// loop {
// let (lock, cvar) = &*pool2_clone;
// let mut pool = lock.lock().expect("Error locking pool");

// while pool.is_empty() {
// pool = cvar.wait(pool).expect("Error waiting on cvar");
// }
// for stream in pool.iter() {
// let action_clone = arc_action.clone();
// Hteapot::handle_client(stream, move |request| {
// action_clone(request)
// });
// }
// pool.clear();
// }
// });
greeter_loop.join();
}


Expand All @@ -212,15 +258,29 @@ impl Hteapot {
}

// Parse the request
pub fn request_parser(request: &str) -> HttpRequest {
pub fn request_parser(request: String) -> Result<HttpRequest, String> {
let mut lines = request.lines();
let first_line = lines.next().unwrap();
let first_line = lines.next();
if first_line.is_none() {
return Err("Error parsng request".to_string());
}
let first_line = first_line.unwrap();
let mut words = first_line.split_whitespace();
let method = words.next().unwrap();
let mut path = words.next().unwrap().to_string();
let method = words.next();
if method.is_none() {
return Err("Error parsng request".to_string());
}
let method = method.unwrap();
let path = words.next();
if path.is_none() {
return Err("Error parsng request".to_string());
}
let mut path = path.unwrap().to_string();
let mut headers: HashMap<String, String> = HashMap::new();
loop {
let line = lines.next().unwrap();
let line = lines.next();
if line.is_none() {break;}
let line = line.unwrap();
if line.is_empty() {
break;
}
Expand All @@ -229,11 +289,7 @@ impl Hteapot {
let value = parts.next().unwrap();
headers.insert(key, value.to_string());
}
let remaining_lines: Vec<&str> = lines.collect();
let body = remaining_lines.join("");
let body = body.trim().trim_end();
//remove all traling zero bytes
let body = body.trim_matches(char::from(0));
let body = lines.collect::<Vec<&str>>().join("").trim().trim_end_matches(char::from(0)).to_string();
let mut args: HashMap<String, String> = HashMap::new();
//remove http or https from the path
if path.starts_with("http://") {
Expand Down Expand Up @@ -265,38 +321,64 @@ impl Hteapot {
}
}

HttpRequest {
Ok(HttpRequest {
method: HttpMethod::from_str(method),
path: path.to_string(),
args: args,
headers: headers,
body: body.trim_end().to_string(),
}
})
}

// Handle the client when a request is received
fn handle_client(mut stream: TcpStream , action: impl Fn(HttpRequest) -> String ) {
let mut request_buffer: String = String::new();
fn handle_client(stream: &TcpStream , action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ) -> bool{
stream.set_nodelay(true);
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream);
let mut request_buffer = Vec::new();
loop {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap_or_default();
match reader.read(&mut buffer) {
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
return true;
},
_ => {
return false;
},
}
},
Ok(m) => {
if m == 0 {
break;
}
},
};
request_buffer.append(&mut buffer.to_vec());
if buffer[0] == 0 {break};
let partial_request_buffer = String::from_utf8_lossy(&buffer).to_string();
request_buffer.push_str(&partial_request_buffer);
if *buffer.last().unwrap() == 0 {break;}
}

let request = Self::request_parser(&request_buffer);
//let response = Self::response_maker(HttpStatus::IAmATeapot, "Hello, World!");

let request_string = String::from_utf8(request_buffer).unwrap();
let request = Self::request_parser(request_string);
if request.is_err() {
eprintln!("{}", request.err().unwrap());
return false;
}
let request = request.unwrap();

let response = action(request);
let r = stream.write(response.as_bytes());
let r = writer.write_all(response.as_bytes());
if r.is_err() {
eprintln!("Error: {}", r.err().unwrap());
}
let r = stream.flush();
let r = writer.flush();
if r.is_err() {
eprintln!("Error: {}", r.err().unwrap());
}
true
}
}

Expand All @@ -306,7 +388,7 @@ impl Hteapot {
#[test]
fn test_http_parser() {
let request = "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n";
let parsed_request = Hteapot::request_parser(request);
let parsed_request = Hteapot::request_parser(request.to_string()).unwrap();
assert_eq!(parsed_request.method, HttpMethod::GET);
assert_eq!(parsed_request.path, "/");
assert_eq!(parsed_request.args.len(), 0);
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn main() {
} else {
config::Config::new_default()
};
let server = Hteapot::new(config.host.as_str(), config.port);
let mut server = Hteapot::new(config.host.as_str(), config.port);
println!("Server started at http://{}:{}", config.host, config.port);
server.listen(move |req| {
server.listen( move |req| {
println!("Request: {:?}", req.path);
let path = if req.path.ends_with("/") {
let mut path = req.path.clone();
Expand Down
Loading