Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GraphQL] Add health endpoint #18277

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ use async_graphql::EmptySubscription;
use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::extract::FromRef;
use axum::extract::{connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, State};
use axum::extract::{
connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, Query as AxumQuery, State,
};
use axum::http::{HeaderMap, StatusCode};
use axum::middleware::{self};
use axum::response::IntoResponse;
use axum::routing::{post, MethodRouter, Route};
use axum::routing::{get, post, MethodRouter, Route};
use axum::Extension;
use axum::{headers::Header, Router};
use chrono::Utc;
use http::{HeaderValue, Method, Request};
use hyper::server::conn::AddrIncoming as HyperAddrIncoming;
use hyper::Body;
Expand All @@ -51,6 +55,7 @@ use mysten_network::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandl
use std::convert::Infallible;
use std::net::TcpStream;
use std::sync::Arc;
use std::time::Duration;
use std::{any::Any, net::SocketAddr, time::Instant};
use sui_graphql_rpc_headers::{LIMITS_HEADER, VERSION_HEADER};
use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
Expand All @@ -63,6 +68,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, warn};
use uuid::Uuid;

/// The default allowed maximum lag between the current timestamp and the checkpoint timestamp
/// (300 seconds). The health endpoint falls back to this value when the request does not
/// supply a `max_checkpoint_lag_ms` query parameter.
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);

pub(crate) struct Server {
pub server: HyperServer<HyperAddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>,
watermark_task: WatermarkTask,
Expand Down Expand Up @@ -248,7 +256,7 @@ impl ServerBuilder {
.route("/:version", post(graphql_handler))
.route("/graphql", post(graphql_handler))
.route("/graphql/:version", post(graphql_handler))
.route("/health", axum::routing::get(health_checks))
.route("/health", get(health_check))
.with_state(self.state.clone())
.route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
metrics: self.state.metrics.clone(),
Expand Down Expand Up @@ -502,8 +510,8 @@ pub fn export_schema() -> String {
/// if set in the request headers, and the watermark as set by the background task.
async fn graphql_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
schema: axum::Extension<SuiGraphQLSchema>,
axum::Extension(watermark_lock): axum::Extension<WatermarkLock>,
schema: Extension<SuiGraphQLSchema>,
Extension(watermark_lock): Extension<WatermarkLock>,
headers: HeaderMap,
req: GraphQLRequest,
) -> (axum::http::Extensions, GraphQLResponse) {
Expand Down Expand Up @@ -579,7 +587,7 @@ impl Drop for MetricsCallbackHandler {
struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);

/// Connect via a TCPStream to the DB to check if it is alive
async fn health_checks(State(connection): State<ConnectionConfig>) -> StatusCode {
async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
return StatusCode::INTERNAL_SERVER_ERROR;
};
Expand All @@ -601,6 +609,48 @@ async fn health_checks(State(connection): State<ConnectionConfig>) -> StatusCode
}
}

/// Query-string parameters accepted by the `/health` endpoint.
#[derive(serde::Deserialize)]
struct HealthParam {
/// Optional override, in milliseconds, for the maximum allowed lag between the current
/// time and the latest checkpoint timestamp. When absent, the health check uses
/// `DEFAULT_MAX_CHECKPOINT_LAG`.
max_checkpoint_lag_ms: Option<u64>,
}

/// Endpoint for querying the health of the service.
/// It returns 500 for any internal error, including not connecting to the DB,
/// and 504 if the checkpoint timestamp is too far behind the current timestamp as per the
/// max checkpoint timestamp lag query parameter, or the default value if not provided.
async fn health_check(
State(connection): State<ConnectionConfig>,
Extension(watermark_lock): Extension<WatermarkLock>,
AxumQuery(query_params): AxumQuery<HealthParam>,
) -> StatusCode {
let db_health_check = db_health_check(axum::extract::State(connection)).await;
if db_health_check != StatusCode::OK {
return db_health_check;
}

let max_checkpoint_lag_ms = query_params
.max_checkpoint_lag_ms
.map(Duration::from_millis)
.unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);

let checkpoint_timestamp =
Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);

let now_millis = Utc::now().timestamp_millis();

// Check for negative timestamp or conversion failure
let now: Duration = match u64::try_from(now_millis) {
Ok(val) => Duration::from_millis(val),
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
};

if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
return StatusCode::GATEWAY_TIMEOUT;
}

db_health_check
}

// One server per proc, so this is okay
async fn get_or_init_server_start_time() -> &'static Instant {
static ONCE: OnceCell<Instant> = OnceCell::const_new();
Expand Down Expand Up @@ -651,6 +701,7 @@ pub mod tests {
let cancellation_token = CancellationToken::new();
let watermark = Watermark {
checkpoint: 1,
checkpoint_timestamp_ms: 1,
epoch: 0,
};
let state = AppState::new(
Expand Down Expand Up @@ -1019,4 +1070,20 @@ pub mod tests {
assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
}

/// Exercises the `/health` endpoint over HTTP.
///
/// First requests the endpoint without parameters and expects `200 OK`, then repeats the
/// request with `max_checkpoint_lag_ms=1` and expects `504 Gateway Timeout`.
pub async fn test_health_check_impl() {
    let builder = prep_schema(None, None);
    let connection = &builder.state.connection;
    let url = format!("http://{}:{}/health", connection.host, connection.port);
    builder.build_schema();

    // No query parameter: the default lag threshold applies.
    let default_lag_status = reqwest::get(&url).await.unwrap().status();
    assert_eq!(default_lag_status, StatusCode::OK);

    // A 1 ms threshold is expected to be exceeded, so the endpoint should report lag.
    let url_with_param = format!("{}?max_checkpoint_lag_ms=1", url);
    let tight_lag_status = reqwest::get(&url_with_param).await.unwrap().status();
    assert_eq!(tight_lag_status, StatusCode::GATEWAY_TIMEOUT);
}
}
15 changes: 10 additions & 5 deletions crates/sui-graphql-rpc/src/server/watermark_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio::sync::{watch, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

/// Watermark task that periodically updates the current checkpoint and epoch values.
/// Watermark task that periodically updates the current checkpoint, checkpoint timestamp, and
/// epoch values.
pub(crate) struct WatermarkTask {
/// Thread-safe watermark that avoids writer starvation
watermark: WatermarkLock,
Expand All @@ -34,6 +35,8 @@ pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
pub(crate) struct Watermark {
/// The checkpoint upper-bound for the query: the highest checkpoint sequence number
/// returned by the most recent watermark query.
pub checkpoint: u64,
/// The timestamp, in milliseconds, recorded for the checkpoint above. The health endpoint
/// compares it against the current wall-clock time to measure checkpoint lag.
pub checkpoint_timestamp_ms: u64,
/// The current epoch, read from the same checkpoint row.
pub epoch: u64,
}
Expand Down Expand Up @@ -67,7 +70,7 @@ impl WatermarkTask {
return;
},
_ = tokio::time::sleep(self.sleep) => {
let Watermark { checkpoint, epoch } = match Watermark::query(&self.db).await {
let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
Ok(Some(watermark)) => watermark,
Ok(None) => continue,
Err(e) => {
Expand All @@ -81,6 +84,7 @@ impl WatermarkTask {
let prev_epoch = {
let mut w = self.watermark.write().await;
w.checkpoint = checkpoint;
w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
mem::replace(&mut w.epoch, epoch)
};

Expand All @@ -107,17 +111,18 @@ impl Watermark {
let w = lock.read().await;
Self {
checkpoint: w.checkpoint,
checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
epoch: w.epoch,
}
}

pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
use checkpoints::dsl;
let Some((checkpoint, epoch)): Option<(i64, i64)> = db
let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
.execute(move |conn| {
conn.first(move || {
dsl::checkpoints
.select((dsl::sequence_number, dsl::epoch))
.select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
.order_by(dsl::sequence_number.desc())
})
.optional()
Expand All @@ -127,9 +132,9 @@ impl Watermark {
else {
return Ok(None);
};

Ok(Some(Watermark {
checkpoint: checkpoint as u64,
checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
epoch: epoch as u64,
}))
}
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-graphql-rpc/tests/e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,4 +880,21 @@ mod tests {
async fn test_query_complexity_metrics() {
test_query_complexity_metrics_impl().await;
}

/// End-to-end test for the `/health` endpoint: starts a test cluster, waits for checkpoint
/// catch-up, then runs the shared health-check assertions.
#[tokio::test]
#[serial]
async fn test_health_check() {
    // Bound to a named variable so the value returned by `init()` lives until the test ends.
    let _guard = telemetry_subscribers::TelemetryConfig::new()
        .with_env()
        .init();

    let cluster = sui_graphql_rpc::test_infra::cluster::start_cluster(
        ConnectionConfig::ci_integration_test_cfg(),
        None,
    )
    .await;
    println!("Cluster started");

    let catchup_timeout = Duration::from_secs(10);
    cluster
        .wait_for_checkpoint_catchup(0, catchup_timeout)
        .await;

    test_health_check_impl().await;
}
}
Loading