Skip to content

Commit

Permalink
Merge pull request #3 from Az107/feature/pool
Browse files Browse the repository at this point in the history
Feature/pool
  • Loading branch information
Az107 authored Jun 24, 2024
2 parents 2f25c97 + 6b1a67f commit 3a16269
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 43 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ authors = ["Alb Ruiz G. <me@albruiz.dev>"]
[lib]
name = "hteapot"
path = "src/hteapot.rs"

[[bin]]
name = "hteapot"


164 changes: 123 additions & 41 deletions src/hteapot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
// Also provide utilities to parse the requests and build the responses

use std::collections::HashMap;
use std::io::{Read, Write};
use std::hash::Hash;
use std::io::{self, BufRead, BufReader, BufWriter, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use std::sync::{Arc, Mutex, Condvar};


#[derive(Debug)]
Expand Down Expand Up @@ -148,6 +151,10 @@ pub struct HttpRequest {
pub struct Hteapot {
port: u16,
address: String,
cache: HashMap<String,String>,
pool: Arc<(Mutex<Vec<TcpStream>>, Condvar)>,
//pool2: Arc<(Mutex<Vec<TcpStream>>, Condvar)>

// this will store a map from path to their actions
// path_table: HashMap<HttpMethod, HashMap<String, HashMap<HttpMethod, fn(HttpRequest) -> String>>>,
}
Expand All @@ -159,12 +166,16 @@ impl Hteapot {
Hteapot {
port: port,
address: address.to_string(),
cache: HashMap::new(),
pool: Arc::new((Mutex::new(Vec::new()), Condvar::new())),
//pool2: Arc::new((Mutex::new(Vec::new()), Condvar::new())),

// path_table: HashMap::new(),
}
}

// Start the server
pub fn listen(&self, action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ){
pub fn listen(&mut self, action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ){
let addr = format!("{}:{}", self.address, self.port);
let listener = TcpListener::bind(addr);
let listener = match listener {
Expand All @@ -174,23 +185,58 @@ impl Hteapot {
return;
}
};
let action_clone = Arc::new(action);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let action_clone = action_clone.clone();
thread::spawn(move || {
Hteapot::handle_client(stream, |req| {
action_clone(req)
});
let arc_action = Arc::new(action);
listener.set_nonblocking(false).expect("set_nonblocking call failed");
let pool_clone = self.pool.clone();
let greeter_loop = thread::spawn(move || {
for stream in listener.incoming() {

if stream.is_err() {continue;}
let stream = stream.unwrap();
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");
pool.push(stream);
cvar.notify_one(); // Notify one waiting thread
}
});
let pool_clone = self.pool.clone();
//let pool2_clone = self.pool2.clone();
thread::spawn(move || {
loop {
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");

while pool.is_empty() {
pool = cvar.wait(pool).expect("Error waiting on cvar");
}
pool.retain(|stream| {
let action_clone = arc_action.clone();
Hteapot::handle_client(stream, move |request| {
action_clone(request)
})
});

}
Err(e) => {
println!("Error: {}", e);
}
}
}
});

// let pool2_clone = self.pool2.clone();
// thread::spawn(move || {
// loop {
// let (lock, cvar) = &*pool2_clone;
// let mut pool = lock.lock().expect("Error locking pool");

// while pool.is_empty() {
// pool = cvar.wait(pool).expect("Error waiting on cvar");
// }
// for stream in pool.iter() {
// let action_clone = arc_action.clone();
// Hteapot::handle_client(stream, move |request| {
// action_clone(request)
// });
// }
// pool.clear();
// }
// });
greeter_loop.join();
}


Expand All @@ -212,15 +258,29 @@ impl Hteapot {
}

// Parse the request
pub fn request_parser(request: &str) -> HttpRequest {
pub fn request_parser(request: String) -> Result<HttpRequest, String> {
let mut lines = request.lines();
let first_line = lines.next().unwrap();
let first_line = lines.next();
if first_line.is_none() {
return Err("Error parsng request".to_string());
}
let first_line = first_line.unwrap();
let mut words = first_line.split_whitespace();
let method = words.next().unwrap();
let mut path = words.next().unwrap().to_string();
let method = words.next();
if method.is_none() {
return Err("Error parsng request".to_string());
}
let method = method.unwrap();
let path = words.next();
if path.is_none() {
return Err("Error parsng request".to_string());
}
let mut path = path.unwrap().to_string();
let mut headers: HashMap<String, String> = HashMap::new();
loop {
let line = lines.next().unwrap();
let line = lines.next();
if line.is_none() {break;}
let line = line.unwrap();
if line.is_empty() {
break;
}
Expand All @@ -229,11 +289,7 @@ impl Hteapot {
let value = parts.next().unwrap();
headers.insert(key, value.to_string());
}
let remaining_lines: Vec<&str> = lines.collect();
let body = remaining_lines.join("");
let body = body.trim().trim_end();
//remove all traling zero bytes
let body = body.trim_matches(char::from(0));
let body = lines.collect::<Vec<&str>>().join("").trim().trim_end_matches(char::from(0)).to_string();
let mut args: HashMap<String, String> = HashMap::new();
//remove http or https from the path
if path.starts_with("http://") {
Expand Down Expand Up @@ -265,38 +321,64 @@ impl Hteapot {
}
}

HttpRequest {
Ok(HttpRequest {
method: HttpMethod::from_str(method),
path: path.to_string(),
args: args,
headers: headers,
body: body.trim_end().to_string(),
}
})
}

// Handle the client when a request is received
fn handle_client(mut stream: TcpStream , action: impl Fn(HttpRequest) -> String ) {
let mut request_buffer: String = String::new();
fn handle_client(stream: &TcpStream , action: impl Fn(HttpRequest) -> String + Send + Sync + 'static ) -> bool{
stream.set_nodelay(true);
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream);
let mut request_buffer = Vec::new();
loop {
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap_or_default();
match reader.read(&mut buffer) {
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
return true;
},
_ => {
return false;
},
}
},
Ok(m) => {
if m == 0 {
break;
}
},
};
request_buffer.append(&mut buffer.to_vec());
if buffer[0] == 0 {break};
let partial_request_buffer = String::from_utf8_lossy(&buffer).to_string();
request_buffer.push_str(&partial_request_buffer);
if *buffer.last().unwrap() == 0 {break;}
}

let request = Self::request_parser(&request_buffer);
//let response = Self::response_maker(HttpStatus::IAmATeapot, "Hello, World!");

let request_string = String::from_utf8(request_buffer).unwrap();
let request = Self::request_parser(request_string);
if request.is_err() {
eprintln!("{}", request.err().unwrap());
return false;
}
let request = request.unwrap();

let response = action(request);
let r = stream.write(response.as_bytes());
let r = writer.write_all(response.as_bytes());
if r.is_err() {
eprintln!("Error: {}", r.err().unwrap());
}
let r = stream.flush();
let r = writer.flush();
if r.is_err() {
eprintln!("Error: {}", r.err().unwrap());
}
true
}
}

Expand All @@ -306,7 +388,7 @@ impl Hteapot {
#[test]
fn test_http_parser() {
let request = "GET / HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.68.0\r\nAccept: */*\r\n\r\n";
let parsed_request = Hteapot::request_parser(request);
let parsed_request = Hteapot::request_parser(request.to_string()).unwrap();
assert_eq!(parsed_request.method, HttpMethod::GET);
assert_eq!(parsed_request.path, "/");
assert_eq!(parsed_request.args.len(), 0);
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn main() {
} else {
config::Config::new_default()
};
let server = Hteapot::new(config.host.as_str(), config.port);
let mut server = Hteapot::new(config.host.as_str(), config.port);
println!("Server started at http://{}:{}", config.host, config.port);
server.listen(move |req| {
server.listen( move |req| {
println!("Request: {:?}", req.path);
let path = if req.path.ends_with("/") {
let mut path = req.path.clone();
Expand Down

0 comments on commit 3a16269

Please sign in to comment.