Skip to content

Commit

Permalink
Switch to using Duration instead of u64
Browse files Browse the repository at this point in the history
  • Loading branch information
stefan-mysten committed Jun 17, 2024
1 parent 2ce5c61 commit ce0210c
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use axum::http::{HeaderMap, StatusCode};
use axum::middleware::{self};
use axum::response::IntoResponse;
use axum::routing::{get, post, MethodRouter, Route};
use axum::Extension;
use axum::{headers::Header, Router};
use chrono::Utc;
use http::{HeaderValue, Method, Request};
Expand All @@ -54,6 +55,7 @@ use mysten_network::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandl
use std::convert::Infallible;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;
use std::{any::Any, net::SocketAddr, time::Instant};
use sui_graphql_rpc_headers::{LIMITS_HEADER, VERSION_HEADER};
use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
Expand All @@ -66,8 +68,8 @@ use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, warn};
use uuid::Uuid;

/// The default maximum lag between the current timestamp and the checkpoint timestamp.
const DEFAULT_MAX_CHECKPOINT_TS_LAG: u64 = 300_000;
/// The default allowed maximum lag between the current timestamp and the checkpoint timestamp.
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);

pub(crate) struct Server {
pub server: HyperServer<HyperAddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>,
Expand Down Expand Up @@ -509,8 +511,8 @@ pub fn export_schema() -> String {
/// if set in the request headers, and the watermark as set by the background task.
async fn graphql_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
schema: axum::Extension<SuiGraphQLSchema>,
axum::Extension(watermark_lock): axum::Extension<WatermarkLock>,
schema: Extension<SuiGraphQLSchema>,
Extension(watermark_lock): Extension<WatermarkLock>,
headers: HeaderMap,
req: GraphQLRequest,
) -> (axum::http::Extensions, GraphQLResponse) {
Expand Down Expand Up @@ -610,35 +612,41 @@ async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCo

#[derive(serde::Deserialize)]
struct HealthParam {
max_checkpoint_ts_lag: Option<u64>,
max_checkpoint_lag_ms: Option<u64>,
}

/// Endpoint for querying the health of the service.
/// It returns 500 for any internal error, including not connecting to the DB,
/// and 503 if the checkpoint timestamp is too far behind the current timestamp as per the
/// and 504 if the checkpoint timestamp is too far behind the current timestamp as per the
/// max checkpoint timestamp lag query parameter, or the default value if not provided.
async fn health_check(
State(connection): State<ConnectionConfig>,
axum::Extension(watermark_lock): axum::Extension<WatermarkLock>,
query_params: AxumQuery<HealthParam>,
Extension(watermark_lock): Extension<WatermarkLock>,
AxumQuery(query_params): AxumQuery<HealthParam>,
) -> StatusCode {
let db_health_check = db_health_check(axum::extract::State(connection)).await;
if db_health_check != StatusCode::OK {
return db_health_check;
}

let checkpoint_timestamp = watermark_lock.read().await.checkpoint_timestamp_ms;
let now: u64 = if let Ok(now) = Utc::now().timestamp_millis().try_into() {
now
} else {
return StatusCode::INTERNAL_SERVER_ERROR;
let max_checkpoint_lag_ms = query_params
.max_checkpoint_lag_ms
.map(Duration::from_millis)
.unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);

let checkpoint_timestamp =
Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);

let now_millis = Utc::now().timestamp_millis();

// Check for negative timestamp or conversion failure
let now: Duration = match u64::try_from(now_millis) {
Ok(val) => Duration::from_millis(val),
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
};
let max_checkpoint_ts_lag = query_params
.max_checkpoint_ts_lag
.unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_TS_LAG);

if (now - checkpoint_timestamp) > max_checkpoint_ts_lag {
return StatusCode::SERVICE_UNAVAILABLE;
if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
return StatusCode::GATEWAY_TIMEOUT;
}

db_health_check
Expand Down Expand Up @@ -1067,17 +1075,8 @@ pub mod tests {
let resp = reqwest::get(&url).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);

let url_with_param = format!("{}?max_checkpoint_ts_lag=10", url);
let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);

let url_with_param = format!("{}?max_checkpoint_ts_lag=50000", url);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);

let now: u64 = Utc::now().timestamp_millis().try_into().unwrap();
let url_with_param = format!("{}?max_checkpoint_ts_lag={}", url, now);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
assert_eq!(resp.status(), StatusCode::GATEWAY_TIMEOUT);
}
}

0 comments on commit ce0210c

Please sign in to comment.