Skip to content

Commit

Permalink
Fix/7 (#11)
Browse files Browse the repository at this point in the history
* fix large file sending

Return socket context when woudl block error raises

* improvements

* minor fixes

* minor changes

* improve test
  • Loading branch information
Az107 authored Aug 5, 2024
1 parent d4c3b22 commit 3e733a1
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 40 deletions.
File renamed without changes.
128 changes: 89 additions & 39 deletions src/hteapot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ use std::collections::{HashMap, VecDeque};
use std::hash::Hash;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::{str, thread};
use std::{str, thread, vec};
use std::sync::{Arc, Mutex, Condvar};

const VERSION: &str = env!("CARGO_PKG_VERSION");


#[derive(Debug)]
#[derive(PartialEq)]
Expand Down Expand Up @@ -155,6 +157,14 @@ pub struct Hteapot {

}

#[derive(Clone,Debug)]
struct SocketStatus {
reading: bool,
data_readed: Vec<u8>,
data_write: Vec<u8>,
index_writed: usize
}

impl Hteapot {

// Constructor
Expand Down Expand Up @@ -191,6 +201,7 @@ impl Hteapot {
}
};
let pool: Arc<(Mutex<VecDeque<TcpStream>>, Condvar)> = Arc::new((Mutex::new(VecDeque::new()), Condvar::new()));
//let statusPool = Arc::new(Mutex::new(HashMap::<String, socketStatus>::new()));
let arc_action = Arc::new(action);


Expand All @@ -199,6 +210,7 @@ impl Hteapot {
let action_clone = arc_action.clone();
thread::spawn(move || {
let mut streams_to_handle = Vec::new();
let mut streams_data: HashMap<String, SocketStatus> = HashMap::new();
loop {
{
let (lock, cvar) = &*pool_clone;
Expand All @@ -216,10 +228,29 @@ impl Hteapot {

streams_to_handle.retain(|stream| {
//println!("Handling request by {}", tn);
let local_addr = stream.local_addr().unwrap().to_string();
let action_clone = action_clone.clone();
Hteapot::handle_client(stream, move |request| {
let status = match streams_data.get(&local_addr) {
Some(d) => d.clone(),
None => SocketStatus {
reading: true,
data_readed: vec![],
data_write: vec![],
index_writed: 0
}
};

let r = Hteapot::handle_client(stream,status, move |request| {
action_clone(request)
})
});
if r.is_some() {
streams_data.insert(local_addr, r.unwrap());
return true;
} else {
streams_data.remove(&local_addr);
return false;
}

});
}
});
Expand All @@ -233,7 +264,7 @@ impl Hteapot {
}
let (stream, _) = stream.unwrap();
stream.set_nonblocking(true).expect("Error seting non blocking");
stream.set_nodelay(true).expect("Error seting no delay");
//stream.set_nodelay(true).expect("Error seting no delay");
{
let (lock, cvar) = &*pool_clone;
let mut pool = lock.lock().expect("Error locking pool");
Expand All @@ -257,6 +288,7 @@ impl Hteapot {
HashMap::new()
};
headers.insert("Content-Length".to_string(), content.len().to_string());
headers.insert("Server".to_string(), format!("HTeaPot/{}",VERSION).to_string());
for (key, value) in headers.iter() {
headers_text.push_str(&format!("{}: {}\r\n", key, value));
}
Expand Down Expand Up @@ -343,54 +375,70 @@ impl Hteapot {
}

// Handle the client when a request is received
fn handle_client(stream: &TcpStream , action: impl Fn(HttpRequest) -> Vec<u8> + Send + Sync + 'static ) -> bool{
fn handle_client(stream: &TcpStream, socket_status: SocketStatus , action: impl Fn(HttpRequest) -> Vec<u8> + Send + Sync + 'static ) -> Option<SocketStatus>{
let mut reader = BufReader::new(stream);
let mut writer = BufWriter::new(stream);
let mut request_buffer = Vec::new();
loop {
let mut buffer = [0; 1024];
match reader.read(&mut buffer) {
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
return true;
},
_ => {
println!("{:?}",e);
return false;
},
}
},
Ok(m) => {
if m == 0 {
break;
}
},
};
request_buffer.append(&mut buffer.to_vec());
if buffer[0] == 0 {break};
if *buffer.last().unwrap() == 0 {break;}
let mut socket_status = socket_status.clone();
if socket_status.reading {
loop {
let mut buffer = [0; 1024];
match reader.read(&mut buffer) {
Err(e) => {
match e.kind() {
io::ErrorKind::WouldBlock => {
return Some(socket_status);
},
_ => {
println!("R Error{:?}",e);
return None;
},
}
},
Ok(m) => {
if m == 0 {break;}
},
};
socket_status.data_readed.append(&mut buffer.to_vec());
//socket_status
if buffer[0] == 0 {break};
if *buffer.last().unwrap() == 0 {break;}
}
socket_status.reading = false;
}

let request_string = String::from_utf8(request_buffer).unwrap();
let request_string = String::from_utf8(socket_status.data_readed.clone()).unwrap();
// let request_string = "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n".to_string();
let request = Self::request_parser(request_string);
if request.is_err() {
eprintln!("{}", request.err().unwrap());
return false;
eprintln!("Request parse error {:?}", request.err().unwrap());
return None;
}
let request = request.unwrap();
let response = action(request);
let r = writer.write_all(&response);
if r.is_err() {
eprintln!("Error1: {}", r.err().unwrap());
if socket_status.data_write.len() == 0 {
socket_status.data_write = action(request);
}
for n in socket_status.index_writed..socket_status.data_write.len() {
let r = writer.write(&[socket_status.data_write[n]]);
if r.is_err() {
let error = r.err().unwrap();
if error.kind() == io::ErrorKind::WouldBlock {
return Some(socket_status);
} else {
eprintln!("W error: {:?}",error);
return None;
}
}
socket_status.index_writed+=r.unwrap();
}


let r = writer.flush();
if r.is_err() {
eprintln!("Error2: {}", r.err().unwrap());
return Some(socket_status);
}
let _ = stream.shutdown(Shutdown::Both);
false
None
}
}

Expand All @@ -412,7 +460,9 @@ fn test_http_parser() {
fn test_http_response_maker() {
let response = Hteapot::response_maker(HttpStatus::IAmATeapot, "Hello, World!", None);
let response = String::from_utf8(response).unwrap();
let expected_response = "HTTP/1.1 418 I'm a teapot\r\nContent-Length: 13\r\n\r\nHello, World!\r\n";
assert_eq!(response, expected_response);
let expected_response = "HTTP/1.1 418 I'm a teapot\r\nContent-Length: 13\r\nServer: HTeaPot/0.2.5\r\n\r\nHello, World!\r\n".split("\r\n");
for item in expected_response.into_iter() {
assert!(response.contains(item));
}
}

13 changes: 12 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,21 @@ use hteapot::HttpStatus;
use brew::fetch;
use logger::Logger;

const VERSION: &str = env!("CARGO_PKG_VERSION");


fn main() {
let args = std::env::args().collect::<Vec<String>>();
if args[1] == "--version" ||args[1] == "-v" {
println!("Hteapot {}",VERSION);
return;
}
if args[1] == "--help" ||args[1] == "-h" {
println!("Hteapot {}",VERSION);
println!("usage: {} <config file>",args[0]);
return;
}

let config = if args.len() > 1 {
config::Config::load_config(&args[1])
} else {
Expand Down Expand Up @@ -115,7 +127,6 @@ fn main() {
};
match content {
Ok(content) => {

if config.cache {
let cache = cache.lock();
if cache.is_ok() && is_cache {
Expand Down

0 comments on commit 3e733a1

Please sign in to comment.