From 2ce5c6174608ed355b7907a49ff6f685f90a5a13 Mon Sep 17 00:00:00 2001 From: stefan-mysten <135084671+stefan-mysten@users.noreply.github.com> Date: Fri, 14 Jun 2024 16:14:34 -0700 Subject: [PATCH] Add checkpoint_timestamp_ms to the watermark and use it in a new health check endpoint function --- crates/sui-graphql-rpc/src/server/builder.rs | 76 ++++++++++++++++++- .../src/server/watermark_task.rs | 15 ++-- crates/sui-graphql-rpc/tests/e2e_tests.rs | 17 +++++ 3 files changed, 99 insertions(+), 9 deletions(-) diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs index e13fa605d789ce..9e864aebf17033 100644 --- a/crates/sui-graphql-rpc/src/server/builder.rs +++ b/crates/sui-graphql-rpc/src/server/builder.rs @@ -36,12 +36,15 @@ use async_graphql::EmptySubscription; use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder}; use async_graphql_axum::{GraphQLRequest, GraphQLResponse}; use axum::extract::FromRef; -use axum::extract::{connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, State}; +use axum::extract::{ + connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, Query as AxumQuery, State, +}; use axum::http::{HeaderMap, StatusCode}; use axum::middleware::{self}; use axum::response::IntoResponse; -use axum::routing::{post, MethodRouter, Route}; +use axum::routing::{get, post, MethodRouter, Route}; use axum::{headers::Header, Router}; +use chrono::Utc; use http::{HeaderValue, Method, Request}; use hyper::server::conn::AddrIncoming as HyperAddrIncoming; use hyper::Body; @@ -63,6 +66,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{info, warn}; use uuid::Uuid; +/// The default maximum lag between the current timestamp and the checkpoint timestamp. +const DEFAULT_MAX_CHECKPOINT_TS_LAG: u64 = 300_000; + pub(crate) struct Server { pub server: HyperServer>, watermark_task: WatermarkTask, @@ -253,7 +259,7 @@ impl ServerBuilder { .route("/:version", post(graphql_handler)) .route("/graphql", post(graphql_handler)) .route("/graphql/:version", post(graphql_handler)) - .route("/health", axum::routing::get(health_checks)) + .route("/health", get(health_check)) .with_state(self.state.clone()) .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler { metrics: self.state.metrics.clone(), @@ -580,7 +586,7 @@ impl Drop for MetricsCallbackHandler { struct GraphqlErrors(std::sync::Arc>); /// Connect via a TCPStream to the DB to check if it is alive -async fn health_checks(State(connection): State) -> StatusCode { +async fn db_health_check(State(connection): State) -> StatusCode { let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else { return StatusCode::INTERNAL_SERVER_ERROR; }; @@ -602,6 +608,42 @@ async fn health_checks(State(connection): State) -> StatusCode } } +#[derive(serde::Deserialize)] +struct HealthParam { + max_checkpoint_ts_lag: Option, +} + +/// Endpoint for querying the health of the service. +/// It returns 500 for any internal error, including not connecting to the DB, +/// and 503 if the checkpoint timestamp is too far behind the current timestamp as per the +/// max checkpoint timestamp lag query parameter, or the default value if not provided. +async fn health_check( + State(connection): State, + axum::Extension(watermark_lock): axum::Extension, + query_params: AxumQuery, +) -> StatusCode { + let db_health_check = db_health_check(axum::extract::State(connection)).await; + if db_health_check != StatusCode::OK { + return db_health_check; + } + + let checkpoint_timestamp = watermark_lock.read().await.checkpoint_timestamp_ms; + let now: u64 = if let Ok(now) = Utc::now().timestamp_millis().try_into() { + now + } else { + return StatusCode::INTERNAL_SERVER_ERROR; + }; + let max_checkpoint_ts_lag = query_params + .max_checkpoint_ts_lag + .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_TS_LAG); + + if (now - checkpoint_timestamp) > max_checkpoint_ts_lag { + return StatusCode::SERVICE_UNAVAILABLE; + } + + db_health_check +} + // One server per proc, so this is okay async fn get_or_init_server_start_time() -> &'static Instant { static ONCE: OnceCell = OnceCell::const_new(); @@ -644,6 +686,7 @@ pub mod tests { let cancellation_token = CancellationToken::new(); let watermark = Watermark { checkpoint: 1, + checkpoint_timestamp_ms: 1, epoch: 0, }; let state = AppState::new( @@ -1012,4 +1055,29 @@ pub mod tests { assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.); assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.); } + + pub async fn test_health_check_impl() { + let server_builder = prep_schema(None, None); + let url = format!( + "http://{}:{}/health", + server_builder.state.connection.host, server_builder.state.connection.port + ); + server_builder.build_schema(); + + let resp = reqwest::get(&url).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let url_with_param = format!("{}?max_checkpoint_ts_lag=10", url); + let resp = reqwest::get(&url_with_param).await.unwrap(); + assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE); + + let url_with_param = format!("{}?max_checkpoint_ts_lag=50000", url); + let resp = reqwest::get(&url_with_param).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + + let now: u64 = Utc::now().timestamp_millis().try_into().unwrap(); + let url_with_param = format!("{}?max_checkpoint_ts_lag={}", url, now); + let resp = reqwest::get(&url_with_param).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + } } diff --git a/crates/sui-graphql-rpc/src/server/watermark_task.rs b/crates/sui-graphql-rpc/src/server/watermark_task.rs index db072a3c5abd09..eae0827fd4006b 100644 --- a/crates/sui-graphql-rpc/src/server/watermark_task.rs +++ b/crates/sui-graphql-rpc/src/server/watermark_task.rs @@ -14,7 +14,8 @@ use tokio::sync::{watch, RwLock}; use tokio_util::sync::CancellationToken; use tracing::{error, info}; -/// Watermark task that periodically updates the current checkpoint and epoch values. +/// Watermark task that periodically updates the current checkpoint, checkpoint timestamp, and +/// epoch values. pub(crate) struct WatermarkTask { /// Thread-safe watermark that avoids writer starvation watermark: WatermarkLock, @@ -34,6 +35,8 @@ pub(crate) type WatermarkLock = Arc>; pub(crate) struct Watermark { /// The checkpoint upper-bound for the query. pub checkpoint: u64, + /// The checkpoint upper-bound timestamp for the query. + pub checkpoint_timestamp_ms: u64, /// The current epoch. pub epoch: u64, } @@ -67,7 +70,7 @@ impl WatermarkTask { return; }, _ = tokio::time::sleep(self.sleep) => { - let Watermark { checkpoint, epoch } = match Watermark::query(&self.db).await { + let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await { Ok(Some(watermark)) => watermark, Ok(None) => continue, Err(e) => { @@ -81,6 +84,7 @@ impl WatermarkTask { let prev_epoch = { let mut w = self.watermark.write().await; w.checkpoint = checkpoint; + w.checkpoint_timestamp_ms = checkpoint_timestamp_ms; mem::replace(&mut w.epoch, epoch) }; @@ -107,17 +111,18 @@ impl Watermark { let w = lock.read().await; Self { checkpoint: w.checkpoint, + checkpoint_timestamp_ms: w.checkpoint_timestamp_ms, epoch: w.epoch, } } pub(crate) async fn query(db: &Db) -> Result, Error> { use checkpoints::dsl; - let Some((checkpoint, epoch)): Option<(i64, i64)> = db + let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db .execute(move |conn| { conn.first(move || { dsl::checkpoints - .select((dsl::sequence_number, dsl::epoch)) + .select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch)) .order_by(dsl::sequence_number.desc()) }) .optional() @@ -127,9 +132,9 @@ impl Watermark { else { return Ok(None); }; - Ok(Some(Watermark { checkpoint: checkpoint as u64, + checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64, epoch: epoch as u64, })) } diff --git a/crates/sui-graphql-rpc/tests/e2e_tests.rs b/crates/sui-graphql-rpc/tests/e2e_tests.rs index 5cc840e8edbf0e..f6b85b84b911e9 100644 --- a/crates/sui-graphql-rpc/tests/e2e_tests.rs +++ b/crates/sui-graphql-rpc/tests/e2e_tests.rs @@ -896,4 +896,21 @@ mod tests { async fn test_query_complexity_metrics() { test_query_complexity_metrics_impl().await; } + + #[tokio::test] + #[serial] + async fn test_health_check() { + let _guard = telemetry_subscribers::TelemetryConfig::new() + .with_env() + .init(); + let connection_config = ConnectionConfig::ci_integration_test_cfg(); + let cluster = + sui_graphql_rpc::test_infra::cluster::start_cluster(connection_config, None).await; + + println!("Cluster started"); + cluster + .wait_for_checkpoint_catchup(0, Duration::from_secs(10)) + .await; + test_health_check_impl().await; + } }