From 7104e5f5b7a3aa435ca9f5c383c6e5cebcae289c Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Fri, 14 Jun 2024 16:14:34 -0700 Subject: [PATCH 1/2] Add checkpoint_timestamp_ms to the watermark and use it in a new health check endpoint function --- crates/sui-graphql-rpc/src/server/builder.rs | 76 ++++++++++++++++++- .../src/server/watermark_task.rs | 15 ++-- crates/sui-graphql-rpc/tests/e2e_tests.rs | 17 +++++ 3 files changed, 99 insertions(+), 9 deletions(-) diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index 5dc9bbeb3f30b..a15f86d165c74 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -36,12 +36,15 @@ use async_graphql::EmptySubscription; use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; use axum::extract::FromRef; -use axum::extract::{connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, State}; +use axum::extract::{ + connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, Query as AxumQuery, State, +}; use axum::http::{HeaderMap, StatusCode}; use axum::middleware::{self}; use axum::response::IntoResponse; -use axum::routing::{post, MethodRouter, Route}; +use axum::routing::{get, post, MethodRouter, Route}; use axum::{headers::Header, Router}; +use chrono::Utc; use http::{HeaderValue, Method, Request}; use hyper::server::conn::AddrIncoming as HyperAddrIncoming; use hyper::Body; @@ -63,6 +66,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{info, warn}; use uuid::Uuid; +/// The default maximum lag between the current timestamp and the checkpoint timestamp. +const DEFAULT_MAX_CHECKPOINT_TS_LAG: u64 = 300_000; + pub(crate) struct Server { pub server: HyperServer>, watermark_task: WatermarkTask, @@ -248,7 +254,7 @@ impl ServerBuilder { .route("/:version", post(graphql_handler)) .route("/graphql", post(graphql_handler)) .route("/graphql/:version", post(graphql_handler)) - .route("/health", axum::routing::get(health_checks)) + .route("/health", get(health_check)) .with_state(self.state.clone()) .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler { metrics: self.state.metrics.clone(), @@ -579,7 +585,7 @@ impl Drop for MetricsCallbackHandler { struct GraphqlErrors(std::sync::Arc>); /// Connect via a TCPStream to the DB to check if it is alive -async fn health_checks(State(connection): State) -> StatusCode { +async fn db_health_check(State(connection): State) -> StatusCode { let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else { return StatusCode::INTERNAL_SERVER_ERROR; }; @@ -601,6 +607,42 @@ async fn health_checks(State(connection): State) -> StatusCode } } +#[derive(serde::Deserialize)] +struct HealthParam { + max_checkpoint_ts_lag: Option, +} + +/// Endpoint for querying the health of the service. +/// It returns 500 for any internal error, including not connecting to the DB, +/// and 503 if the checkpoint timestamp is too far behind the current timestamp as per the +/// max checkpoint timestamp lag query parameter, or the default value if not provided. +async fn health_check( + State(connection): State, + axum::Extension(watermark_lock): axum::Extension, + query_params: AxumQuery, +) -> StatusCode { + let db_health_check = db_health_check(axum::extract::State(connection)).await; + if db_health_check != StatusCode::OK { + return db_health_check; + } + + let checkpoint_timestamp = watermark_lock.read().await.checkpoint_timestamp_ms; + let now: u64 = if let Ok(now) = Utc::now().timestamp_millis().try_into() { + now + } else { + return StatusCode::INTERNAL_SERVER_ERROR; + }; + let max_checkpoint_ts_lag = query_params + .max_checkpoint_ts_lag + .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_TS_LAG); + + if (now - checkpoint_timestamp) > max_checkpoint_ts_lag { + return StatusCode::SERVICE_UNAVAILABLE; + } + + db_health_check +} + // One server per proc, so this is okay async fn get_or_init_server_start_time() -> &'static Instant { static ONCE: OnceCell = OnceCell::const_new(); @@ -651,6 +693,7 @@ pub mod tests { let cancellation_token = CancellationToken::new(); let watermark = Watermark { checkpoint: 1, + checkpoint_timestamp_ms: 1, epoch: 0, }; let state = AppState::new( @@ -1019,4 +1062,29 @@ pub mod tests { assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.); assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.); } + + pub async fn test_health_check_impl() { + let server_builder = prep_schema(None, None); + let url = format!( + "http://{}:{}/health", + server_builder.state.connection.host, server_builder.state.connection.port + ); + server_builder.build_schema(); + + let resp = reqwest::get(&url).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let url_with_param = format!("{}?max_checkpoint_ts_lag=10", url); + let resp = reqwest::get(&url_with_param).await.unwrap(); + assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); + + let url_with_param = format!("{}?max_checkpoint_ts_lag=50000", url); + let resp = reqwest::get(&url_with_param).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let now: u64 = Utc::now().timestamp_millis().try_into().unwrap(); + let url_with_param = format!("{}?max_checkpoint_ts_lag={}", url, now); + let resp = reqwest::get(&url_with_param).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } } diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index db072a3c5abd0..eae0827fd4006 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -14,7 +14,8 @@ use tokio::sync::{watch, RwLock}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; -/// Watermark task that periodically updates the current checkpoint and epoch values. +/// Watermark task that periodically updates the current checkpoint, checkpoint timestamp, and +/// epoch values. pub(crate) struct WatermarkTask { /// Thread-safe watermark that avoids writer starvation watermark: WatermarkLock, @@ -34,6 +35,8 @@ pub(crate) type WatermarkLock = Arc>; pub(crate) struct Watermark { /// The checkpoint upper-bound for the query. pub checkpoint: u64, + /// The checkpoint upper-bound timestamp for the query. + pub checkpoint_timestamp_ms: u64, /// The current epoch. pub epoch: u64, } @@ -67,7 +70,7 @@ impl WatermarkTask { return; }, _ = tokio::time::sleep(self.sleep) => { - let Watermark { checkpoint, epoch } = match Watermark::query(&self.db).await { + let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { @@ -81,6 +84,7 @@ impl WatermarkTask { let prev_epoch = { let mut w = self.watermark.write().await; w.checkpoint = checkpoint; + w.checkpoint_timestamp_ms = checkpoint_timestamp_ms; mem::replace(&mut w.epoch, epoch) }; @@ -107,17 +111,18 @@ impl Watermark { let w = lock.read().await; Self { checkpoint: w.checkpoint, + checkpoint_timestamp_ms: w.checkpoint_timestamp_ms, epoch: w.epoch, } } pub(crate) async fn query(db: &Db) -> Result, Error> { use checkpoints::dsl; - let Some((checkpoint, epoch)): Option<(i64, i64)> = db + let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db .execute(move |conn| { conn.first(move || { dsl::checkpoints - .select((dsl::sequence_number, dsl::epoch)) + .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) .order_by(dsl::sequence_number.desc()) }) .optional() @@ -127,9 +132,9 @@ impl Watermark { else { return Ok(None); }; - Ok(Some(Watermark { checkpoint: checkpoint as u64, + checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64, epoch: epoch as u64, })) } diff --git a/crates/sui-graphql-rpc/tests/e2e_tests.rs b/crates/sui-graphql-rpc/tests/e2e_tests.rs index bc06a5aeddcd9..b0c43867b68b2 100644 --- a/crates/sui-graphql-rpc/tests/e2e_tests.rs +++ b/crates/sui-graphql-rpc/tests/e2e_tests.rs @@ -880,4 +880,21 @@ mod tests { async fn test_query_complexity_metrics() { test_query_complexity_metrics_impl().await; } + + #[tokio::test] + #[serial] + async fn test_health_check() { + let _guard = telemetry_subscribers::TelemetryConfig::new() + .with_env() + .init(); + let connection_config = ConnectionConfig::ci_integration_test_cfg(); + let cluster = + sui_graphql_rpc::test_infra::cluster::start_cluster(connection_config, None).await; + + println!("Cluster started"); + cluster + .wait_for_checkpoint_catchup(0, Duration::from_secs(10)) + .await; + test_health_check_impl().await; + } } From 9e5b9a08ecd1b5d060c226ac9d55fafe9ff4e504 Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Sun, 16 Jun 2024 19:51:16 -0700 Subject: [PATCH 2/2] Switch to using Duration instead of u64 --- crates/sui-graphql-rpc/src/server/builder.rs | 57 ++++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index a15f86d165c74..6bbca11157e33 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -43,6 +43,7 @@ use axum::http::{HeaderMap, StatusCode}; use axum::middleware::{self}; use axum::response::IntoResponse; use axum::routing::{get, post, MethodRouter, Route}; +use axum::Extension; use axum::{headers::Header, Router}; use chrono::Utc; use http::{HeaderValue, Method, Request}; @@ -54,6 +55,7 @@ use mysten_network::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandl use std::convert::Infallible; use std::net::TcpStream; use std::sync::Arc; +use std::time::Duration; use std::{any::Any, net::SocketAddr, time::Instant}; use sui_graphql_rpc_headers::{LIMITS_HEADER, VERSION_HEADER}; use sui_package_resolver::{PackageStoreWithLruCache, Resolver}; @@ -66,8 +68,8 @@ use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{info, warn}; use uuid::Uuid; -/// The default maximum lag between the current timestamp and the checkpoint timestamp. -const DEFAULT_MAX_CHECKPOINT_TS_LAG: u64 = 300_000; +/// The default allowed maximum lag between the current timestamp and the checkpoint timestamp. +const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300); pub(crate) struct Server { pub server: HyperServer>, @@ -508,8 +510,8 @@ pub fn export_schema() -> String { /// if set in the request headers, and the watermark as set by the background task. async fn graphql_handler( ConnectInfo(addr): ConnectInfo, - schema: axum::Extension, - axum::Extension(watermark_lock): axum::Extension, + schema: Extension, + Extension(watermark_lock): Extension, headers: HeaderMap, req: GraphQLRequest, ) -> (axum::http::Extensions, GraphQLResponse) { @@ -609,35 +611,41 @@ async fn db_health_check(State(connection): State) -> StatusCo #[derive(serde::Deserialize)] struct HealthParam { - max_checkpoint_ts_lag: Option, + max_checkpoint_lag_ms: Option, } /// Endpoint for querying the health of the service. /// It returns 500 for any internal error, including not connecting to the DB, -/// and 503 if the checkpoint timestamp is too far behind the current timestamp as per the +/// and 504 if the checkpoint timestamp is too far behind the current timestamp as per the /// max checkpoint timestamp lag query parameter, or the default value if not provided. async fn health_check( State(connection): State, - axum::Extension(watermark_lock): axum::Extension, - query_params: AxumQuery, + Extension(watermark_lock): Extension, + AxumQuery(query_params): AxumQuery, ) -> StatusCode { let db_health_check = db_health_check(axum::extract::State(connection)).await; if db_health_check != StatusCode::OK { return db_health_check; } - let checkpoint_timestamp = watermark_lock.read().await.checkpoint_timestamp_ms; - let now: u64 = if let Ok(now) = Utc::now().timestamp_millis().try_into() { - now - } else { - return StatusCode::INTERNAL_SERVER_ERROR; + let max_checkpoint_lag_ms = query_params + .max_checkpoint_lag_ms + .map(Duration::from_millis) + .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG); + + let checkpoint_timestamp = + Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms); + + let now_millis = Utc::now().timestamp_millis(); + + // Check for negative timestamp or conversion failure + let now: Duration = match u64::try_from(now_millis) { + Ok(val) => Duration::from_millis(val), + Err(_) => return StatusCode::INTERNAL_SERVER_ERROR, }; - let max_checkpoint_ts_lag = query_params - .max_checkpoint_ts_lag - .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_TS_LAG); - if (now - checkpoint_timestamp) > max_checkpoint_ts_lag { - return StatusCode::SERVICE_UNAVAILABLE; + if (now - checkpoint_timestamp) > max_checkpoint_lag_ms { + return StatusCode::GATEWAY_TIMEOUT; } db_health_check @@ -1074,17 +1082,8 @@ pub mod tests { let resp = reqwest::get(&url).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); - let url_with_param = format!("{}?max_checkpoint_ts_lag=10", url); + let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url); let resp = reqwest::get(&url_with_param).await.unwrap(); - assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); - - let url_with_param = format!("{}?max_checkpoint_ts_lag=50000", url); - let resp = reqwest::get(&url_with_param).await.unwrap(); - assert_eq!(resp.status(), StatusCode::OK); - - let now: u64 = Utc::now().timestamp_millis().try_into().unwrap(); - let url_with_param = format!("{}?max_checkpoint_ts_lag={}", url, now); - let resp = reqwest::get(&url_with_param).await.unwrap(); - assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(resp.status(), StatusCode::GATEWAY_TIMEOUT); } }