Skip to content

Commit

Permalink
config: timeout code was reverted at some point - resolves #137
Browse files Browse the repository at this point in the history
  • Loading branch information
naim94a committed Aug 20, 2024
1 parent 80d7829 commit 59df2ea
Showing 1 changed file with 51 additions and 51 deletions.
102 changes: 51 additions & 51 deletions lumen/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
use std::{
borrow::Cow,
collections::HashMap,
mem::discriminant,
process::exit,
sync::Arc,
time::{Duration, Instant},
borrow::Cow, collections::HashMap, mem::discriminant, process::exit, sync::Arc, time::Instant,
};

use common::{
Expand Down Expand Up @@ -33,21 +28,22 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>(
let server_name = state.server_name.as_str();

trace!("waiting for command..");
let req = match timeout(Duration::from_secs(3600), rpc::read_packet(&mut stream)).await {
Ok(res) => match res {
Ok(v) => v,
Err(e) => return Err(e),
},
Err(_) => {
_ = RpcMessage::Fail(RpcFail {
code: 0,
message: &format!("{server_name} client idle for too long.\n"),
})
.async_write(&mut stream)
.await;
return Err(Error::Timeout);
},
};
let req =
match timeout(state.config.limits.command_timeout, rpc::read_packet(&mut stream)).await {
Ok(res) => match res {
Ok(v) => v,
Err(e) => return Err(e),
},
Err(_) => {
_ = RpcMessage::Fail(RpcFail {
code: 0,
message: &format!("{server_name} client idle for too long.\n"),
})
.async_write(&mut stream)
.await;
return Err(Error::Timeout);
},
};
trace!("got command!");
let req = match RpcMessage::deserialize(&req) {
Ok(v) => v,
Expand All @@ -67,33 +63,34 @@ async fn handle_transaction<'a, S: AsyncRead + AsyncWrite + Unpin>(
match req {
RpcMessage::PullMetadata(md) => {
let start = Instant::now();
let funcs = match timeout(Duration::from_secs(4 * 60), db.get_funcs(&md.funcs)).await {
Ok(r) => match r {
Ok(v) => v,
Err(e) => {
error!("pull failed, db: {}", e);
rpc::RpcMessage::Fail(rpc::RpcFail {
let funcs =
match timeout(state.config.limits.pull_md_timeout, db.get_funcs(&md.funcs)).await {
Ok(r) => match r {
Ok(v) => v,
Err(e) => {
error!("pull failed, db: {}", e);
rpc::RpcMessage::Fail(rpc::RpcFail {
code: 0,
message: &format!(
"{server_name}: db error; please try again later..\n"
),
})
.async_write(&mut stream)
.await?;
return Ok(());
},
},
Err(_) => {
RpcMessage::Fail(RpcFail {
code: 0,
message: &format!(
"{server_name}: db error; please try again later..\n"
),
message: &format!("{server_name}: query took too long to execute.\n"),
})
.async_write(&mut stream)
.await?;
return Ok(());
debug!("pull query timeout");
return Err(Error::Timeout);
},
},
Err(_) => {
RpcMessage::Fail(RpcFail {
code: 0,
message: &format!("{server_name}: query took too long to execute.\n"),
})
.async_write(&mut stream)
.await?;
debug!("pull query timeout");
return Err(Error::Timeout);
},
};
};
let pulled_funcs = funcs.iter().filter(|v| v.is_some()).count();
state.metrics.pulls.inc_by(pulled_funcs as _);
state.metrics.queried_funcs.inc_by(md.funcs.len() as _);
Expand Down Expand Up @@ -259,13 +256,14 @@ async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
state: &SharedState, mut stream: S,
) -> Result<(), rpc::Error> {
let server_name = &state.server_name;
let hello = match timeout(Duration::from_secs(15), rpc::read_packet(&mut stream)).await {
Ok(v) => v?,
Err(_) => {
debug!("didn't get hello in time.");
return Ok(());
},
};
let hello =
match timeout(state.config.limits.hello_timeout, rpc::read_packet(&mut stream)).await {
Ok(v) => v?,
Err(_) => {
debug!("didn't get hello in time.");
return Ok(());
},
};

let (hello, creds) = match RpcMessage::deserialize(&hello) {
Ok(RpcMessage::Hello(v, creds)) => {
Expand Down Expand Up @@ -394,7 +392,9 @@ async fn serve(
debug!("Connection from {:?}{}: {} active connections", &addr, protocol, count);
match accpt {
Some(accpt) => {
match timeout(Duration::from_secs(10), accpt.accept(client)).await {
match timeout(state.config.limits.tls_handshake_timeout, accpt.accept(client))
.await
{
Ok(r) => match r {
Ok(s) => {
handle_connection(&state, s).await;
Expand Down

0 comments on commit 59df2ea

Please sign in to comment.