Skip to content

Commit

Permalink
Add checkpoint_timestamp_ms to the watermark and use it in a new heal…
Browse files Browse the repository at this point in the history
…th check endpoint function
  • Loading branch information
stefan-mysten committed Jun 15, 2024
1 parent 55f370f commit 2ce5c61
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 9 deletions.
76 changes: 72 additions & 4 deletions crates/sui-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ use async_graphql::EmptySubscription;
use async_graphql::{extensions::ExtensionFactory, Schema, SchemaBuilder};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use axum::extract::FromRef;
use axum::extract::{connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, State};
use axum::extract::{
connect_info::IntoMakeServiceWithConnectInfo, ConnectInfo, Query as AxumQuery, State,
};
use axum::http::{HeaderMap, StatusCode};
use axum::middleware::{self};
use axum::response::IntoResponse;
use axum::routing::{post, MethodRouter, Route};
use axum::routing::{get, post, MethodRouter, Route};
use axum::{headers::Header, Router};
use chrono::Utc;
use http::{HeaderValue, Method, Request};
use hyper::server::conn::AddrIncoming as HyperAddrIncoming;
use hyper::Body;
Expand All @@ -63,6 +66,9 @@ use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, warn};
use uuid::Uuid;

/// The default maximum lag between the current timestamp and the checkpoint timestamp.
const DEFAULT_MAX_CHECKPOINT_TS_LAG: u64 = 300_000;

pub(crate) struct Server {
pub server: HyperServer<HyperAddrIncoming, IntoMakeServiceWithConnectInfo<Router, SocketAddr>>,
watermark_task: WatermarkTask,
Expand Down Expand Up @@ -253,7 +259,7 @@ impl ServerBuilder {
.route("/:version", post(graphql_handler))
.route("/graphql", post(graphql_handler))
.route("/graphql/:version", post(graphql_handler))
.route("/health", axum::routing::get(health_checks))
.route("/health", get(health_check))
.with_state(self.state.clone())
.route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
metrics: self.state.metrics.clone(),
Expand Down Expand Up @@ -580,7 +586,7 @@ impl Drop for MetricsCallbackHandler {
struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);

/// Connect via a TCPStream to the DB to check if it is alive
async fn health_checks(State(connection): State<ConnectionConfig>) -> StatusCode {
async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
return StatusCode::INTERNAL_SERVER_ERROR;
};
Expand All @@ -602,6 +608,42 @@ async fn health_checks(State(connection): State<ConnectionConfig>) -> StatusCode
}
}

#[derive(serde::Deserialize)]
struct HealthParam {
max_checkpoint_ts_lag: Option<u64>,
}

/// Endpoint for querying the health of the service.
/// It returns 500 for any internal error, including not connecting to the DB,
/// and 503 if the checkpoint timestamp is too far behind the current timestamp as per the
/// max checkpoint timestamp lag query parameter, or the default value if not provided.
async fn health_check(
State(connection): State<ConnectionConfig>,
axum::Extension(watermark_lock): axum::Extension<WatermarkLock>,
query_params: AxumQuery<HealthParam>,
) -> StatusCode {
let db_health_check = db_health_check(axum::extract::State(connection)).await;
if db_health_check != StatusCode::OK {
return db_health_check;
}

let checkpoint_timestamp = watermark_lock.read().await.checkpoint_timestamp_ms;
let now: u64 = if let Ok(now) = Utc::now().timestamp_millis().try_into() {
now
} else {
return StatusCode::INTERNAL_SERVER_ERROR;
};
let max_checkpoint_ts_lag = query_params
.max_checkpoint_ts_lag
.unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_TS_LAG);

if (now - checkpoint_timestamp) > max_checkpoint_ts_lag {
return StatusCode::SERVICE_UNAVAILABLE;
}

db_health_check
}

// One server per proc, so this is okay
async fn get_or_init_server_start_time() -> &'static Instant {
static ONCE: OnceCell<Instant> = OnceCell::const_new();
Expand Down Expand Up @@ -644,6 +686,7 @@ pub mod tests {
let cancellation_token = CancellationToken::new();
let watermark = Watermark {
checkpoint: 1,
checkpoint_timestamp_ms: 1,
epoch: 0,
};
let state = AppState::new(
Expand Down Expand Up @@ -1012,4 +1055,29 @@ pub mod tests {
assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
}

pub async fn test_health_check_impl() {
let server_builder = prep_schema(None, None);
let url = format!(
"http://{}:{}/health",
server_builder.state.connection.host, server_builder.state.connection.port
);
server_builder.build_schema();

let resp = reqwest::get(&url).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);

let url_with_param = format!("{}?max_checkpoint_ts_lag=10", url);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);

let url_with_param = format!("{}?max_checkpoint_ts_lag=50000", url);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);

let now: u64 = Utc::now().timestamp_millis().try_into().unwrap();
let url_with_param = format!("{}?max_checkpoint_ts_lag={}", url, now);
let resp = reqwest::get(&url_with_param).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
}
15 changes: 10 additions & 5 deletions crates/sui-graphql-rpc/src/server/watermark_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio::sync::{watch, RwLock};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

/// Watermark task that periodically updates the current checkpoint and epoch values.
/// Watermark task that periodically updates the current checkpoint, checkpoint timestamp, and
/// epoch values.
pub(crate) struct WatermarkTask {
/// Thread-safe watermark that avoids writer starvation
watermark: WatermarkLock,
Expand All @@ -34,6 +35,8 @@ pub(crate) type WatermarkLock = Arc<RwLock<Watermark>>;
pub(crate) struct Watermark {
/// The checkpoint upper-bound for the query.
pub checkpoint: u64,
/// The checkpoint upper-bound timestamp for the query.
pub checkpoint_timestamp_ms: u64,
/// The current epoch.
pub epoch: u64,
}
Expand Down Expand Up @@ -67,7 +70,7 @@ impl WatermarkTask {
return;
},
_ = tokio::time::sleep(self.sleep) => {
let Watermark { checkpoint, epoch } = match Watermark::query(&self.db).await {
let Watermark {checkpoint, epoch, checkpoint_timestamp_ms } = match Watermark::query(&self.db).await {
Ok(Some(watermark)) => watermark,
Ok(None) => continue,
Err(e) => {
Expand All @@ -81,6 +84,7 @@ impl WatermarkTask {
let prev_epoch = {
let mut w = self.watermark.write().await;
w.checkpoint = checkpoint;
w.checkpoint_timestamp_ms = checkpoint_timestamp_ms;
mem::replace(&mut w.epoch, epoch)
};

Expand All @@ -107,17 +111,18 @@ impl Watermark {
let w = lock.read().await;
Self {
checkpoint: w.checkpoint,
checkpoint_timestamp_ms: w.checkpoint_timestamp_ms,
epoch: w.epoch,
}
}

pub(crate) async fn query(db: &Db) -> Result<Option<Watermark>, Error> {
use checkpoints::dsl;
let Some((checkpoint, epoch)): Option<(i64, i64)> = db
let Some((checkpoint, checkpoint_timestamp_ms, epoch)): Option<(i64, i64, i64)> = db
.execute(move |conn| {
conn.first(move || {
dsl::checkpoints
.select((dsl::sequence_number, dsl::epoch))
.select((dsl::sequence_number, dsl::timestamp_ms, dsl::epoch))
.order_by(dsl::sequence_number.desc())
})
.optional()
Expand All @@ -127,9 +132,9 @@ impl Watermark {
else {
return Ok(None);
};

Ok(Some(Watermark {
checkpoint: checkpoint as u64,
checkpoint_timestamp_ms: checkpoint_timestamp_ms as u64,
epoch: epoch as u64,
}))
}
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-graphql-rpc/tests/e2e_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,4 +896,21 @@ mod tests {
async fn test_query_complexity_metrics() {
test_query_complexity_metrics_impl().await;
}

#[tokio::test]
#[serial]
async fn test_health_check() {
let _guard = telemetry_subscribers::TelemetryConfig::new()
.with_env()
.init();
let connection_config = ConnectionConfig::ci_integration_test_cfg();
let cluster =
sui_graphql_rpc::test_infra::cluster::start_cluster(connection_config, None).await;

println!("Cluster started");
cluster
.wait_for_checkpoint_catchup(0, Duration::from_secs(10))
.await;
test_health_check_impl().await;
}
}

0 comments on commit 2ce5c61

Please sign in to comment.