From 2b3991a841ce655bd790abfcbbdc5d2f27e14448 Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Thu, 5 Sep 2024 16:28:12 -0700
Subject: [PATCH] [indexer] Compatibility check using migration records (#19156)

## Description

Previously we checked DB compatibility by making sure that we could run a SELECT query on every column of every table in the DB, based on the locally known schema. This doesn't cover all cases; for instance, there could be tables in the DB that do not exist in the local schema. This PR changes how we do the compatibility check by fully leveraging the migration records: it checks that the migration records in the DB fully match the locally known ones. It also moves the code to the sui-indexer crate, so that we can run this check on startup of both the indexer and the GraphQL server. This does require that from now on we fully respect the migration scripts and don't make ad-hoc modifications to existing migrations. (A minimal standalone sketch of the prefix rule appears after the diff.)

## Test plan

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:

---
 crates/sui-graphql-rpc/src/data/pg.rs         |   9 +-
 crates/sui-graphql-rpc/src/server/builder.rs  |  14 +-
 .../src/server/compatibility_check.rs         |  66 -------
 crates/sui-graphql-rpc/src/server/mod.rs      |   1 -
 crates/sui-indexer/src/config.rs              |   2 +
 crates/sui-indexer/src/db.rs                  | 180 +++++++++++++++---
 crates/sui-indexer/src/errors.rs              |   3 +
 crates/sui-indexer/src/main.rs                |  19 +-
 crates/sui-indexer/src/schema/pg.rs           |  75 ++++----
 crates/sui-indexer/src/tempdb.rs              |   5 +-
 crates/sui-indexer/src/test_utils.rs          |   6 +-
 11 files changed, 218 insertions(+), 162 deletions(-)
 delete mode 100644 crates/sui-graphql-rpc/src/server/compatibility_check.rs

diff --git a/crates/sui-graphql-rpc/src/data/pg.rs b/crates/sui-graphql-rpc/src/data/pg.rs
index 19d65c7cd1ec9..9d5aadcba5158 100644
--- a/crates/sui-graphql-rpc/src/data/pg.rs
+++ b/crates/sui-graphql-rpc/src/data/pg.rs
@@ -200,7 +200,6 @@ mod tests {
     use diesel::QueryDsl;
     use sui_framework::BuiltInFramework;
     use sui_indexer::{
-        database::Connection,
         db::{get_pool_connection, new_connection_pool, reset_database, ConnectionPoolConfig},
         models::objects::StoredObject,
         schema::objects,
@@ -215,13 +214,7 @@
         pool_config.set_pool_size(5);
         let pool = new_connection_pool(database.database().url().as_str(), &pool_config).unwrap();
         let mut conn = get_pool_connection(&pool).unwrap();
-        reset_database(
-            Connection::dedicated(database.database().url())
-                .await
-                .unwrap(),
-        )
-        .await
-        .unwrap();
+        reset_database(&mut conn).await.unwrap();

         let objects: Vec<StoredObject> = BuiltInFramework::iter_system_packages()
             .map(|pkg| IndexedObject::from_object(1, pkg.genesis_object(), None).into())
diff --git a/crates/sui-graphql-rpc/src/server/builder.rs b/crates/sui-graphql-rpc/src/server/builder.rs
index 8e9053f7f4123..6b77fddf14b89 100644
--- a/crates/sui-graphql-rpc/src/server/builder.rs
+++ b/crates/sui-graphql-rpc/src/server/builder.rs
@@ -1,7 +1,6 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use super::compatibility_check::check_all_tables;
 use super::exchange_rates_task::TriggerExchangeRatesTask;
 use super::system_package_task::SystemPackageTask;
 use super::watermark_task::{ChainIdentifierLock, Watermark, WatermarkLock, WatermarkTask};
@@ -58,6 +57,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 use std::{any::Any, net::SocketAddr, time::Instant};
 use sui_graphql_rpc_headers::LIMITS_HEADER;
+use sui_indexer::db::{check_db_migration_consistency, get_pool_connection};
 use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
 use sui_sdk::SuiClientBuilder;
 use tokio::join;
@@ -88,10 +88,8 @@ impl Server {
     pub async fn run(mut self) -> Result<(), Error> {
         get_or_init_server_start_time().await;

-        // Compatibility check
-        info!("Starting compatibility check");
-        check_all_tables(&self.db_reader).await?;
-        info!("Compatibility check passed");
+        let mut connection = get_pool_connection(&self.db_reader.inner.get_pool())?;
+        check_db_migration_consistency(&mut connection)?;

         // A handle that spawns a background task to periodically update the `Watermark`, which
         // consists of the checkpoint upper bound and current epoch.
@@ -545,7 +543,7 @@ async fn graphql_handler(

     let result = schema.execute(req).await;

-    // If there are errors, insert them as an extention so that the Metrics callback handler can
+    // If there are errors, insert them as an extension so that the Metrics callback handler can
     // pull it out later.
     let mut extensions = axum::http::Extensions::new();
     if result.is_err() {
@@ -776,7 +774,7 @@ pub mod tests {
         cluster
             .wait_for_checkpoint_catchup(1, Duration::from_secs(10))
             .await;
-        // timeout test includes mutation timeout, which requies a [SuiClient] to be able to run
+        // timeout test includes mutation timeout, which requires a [SuiClient] to be able to run
         // the test, and a transaction. [WalletContext] gives access to everything that's needed.
         let wallet = &cluster.network.validator_fullnode_handle.wallet;
         let db_url = cluster.network.graphql_connection_config.db_url.clone();
@@ -1685,7 +1683,7 @@ pub mod tests {

         // Test that when variables are re-used as execution params, the size of the variable is
         // only counted once.
-        // First, check that `eror_passed_tx_checks` is working, by submitting a request that will
+        // First, check that `error_passed_tx_checks` is working, by submitting a request that will
         // fail the initial payload check.
         assert!(!passed_tx_checks(
             &execute_for_error(
diff --git a/crates/sui-graphql-rpc/src/server/compatibility_check.rs b/crates/sui-graphql-rpc/src/server/compatibility_check.rs
deleted file mode 100644
index 7ccdb4d5d313d..0000000000000
--- a/crates/sui-graphql-rpc/src/server/compatibility_check.rs
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::data::{Db, DbConnection, DieselBackend, DieselConn, QueryExecutor};
-use crate::error::Error;
-use diesel::query_builder::{AstPass, Query, QueryFragment, QueryId};
-use diesel::sql_types::Bool;
-use diesel::{QueryDsl, QueryResult, RunQueryDsl};
-
-/// Generates a function: `check_all_tables` that runs a query against every table this GraphQL
-/// service is aware of, to test for schema compatibility. Each query is of the form:
-///
-///     SELECT TRUE FROM (...) q WHERE FALSE
-///
-/// where `...` is a query selecting all of the fields from the given table. The query is expected
-/// to return no results, but will complain if it relies on a column that doesn't exist.
-macro_rules! generate_compatibility_check {
-    ($($table:ident),*) => {
-        pub(crate) async fn check_all_tables(db: &Db) -> Result<(), Error> {
-            use futures::future::join_all;
-            use sui_indexer::schema::*;
-
-            let futures = vec![
-                $(
-                    db.execute(|conn| Ok::<_, diesel::result::Error>(
-                        conn.results::<_, bool>(move || Check {
-                            query: $table::table.select($table::all_columns)
-                        })
-                        .is_ok()
-                    ))
-                ),*
-            ];
-
-            let results = join_all(futures).await;
-            if results.into_iter().all(|res| res.unwrap_or(false)) {
-                Ok(())
-            } else {
-                Err(Error::Internal(
-                    "One or more tables are missing expected columns".into(),
-                ))
-            }
-        }
-    };
-}
-
-sui_indexer::for_all_tables!(generate_compatibility_check);
-
-#[derive(Debug, Clone, Copy, QueryId)]
-struct Check<Q> {
-    query: Q,
-}
-
-impl<Q> Query for Check<Q> {
-    type SqlType = Bool;
-}
-
-impl<Q> RunQueryDsl<DieselConn> for Check<Q> {}
-
-impl<Q: QueryFragment<DieselBackend>> QueryFragment<DieselBackend> for Check<Q> {
-    fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, DieselBackend>) -> QueryResult<()> {
-        out.push_sql("SELECT TRUE FROM (");
-        self.query.walk_ast(out.reborrow())?;
-        out.push_sql(") q WHERE FALSE");
-        Ok(())
-    }
-}
diff --git a/crates/sui-graphql-rpc/src/server/mod.rs b/crates/sui-graphql-rpc/src/server/mod.rs
index fe30cdd930ce2..bc0124a6097d3 100644
--- a/crates/sui-graphql-rpc/src/server/mod.rs
+++ b/crates/sui-graphql-rpc/src/server/mod.rs
@@ -4,7 +4,6 @@
 pub mod graphiql_server;

 pub mod builder;
-pub(crate) mod compatibility_check;
 pub(crate) mod exchange_rates_task;
 pub(crate) mod system_package_task;
 pub mod version;
diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs
index 1f8372ca87531..a67853badbfc2 100644
--- a/crates/sui-indexer/src/config.rs
+++ b/crates/sui-indexer/src/config.rs
@@ -158,6 +158,8 @@ pub enum Command {
         #[clap(long)]
         force: bool,
     },
+    /// Run through the migration scripts.
+    RunMigrations,
 }

 #[derive(Args, Default, Debug, Clone)]
diff --git a/crates/sui-indexer/src/db.rs b/crates/sui-indexer/src/db.rs
index f938d96c424d6..5fa20040ad138 100644
--- a/crates/sui-indexer/src/db.rs
+++ b/crates/sui-indexer/src/db.rs
@@ -1,14 +1,27 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0

-use std::time::Duration;
-
 use crate::errors::IndexerError;
 use clap::Args;
+use diesel::migration::{Migration, MigrationSource, MigrationVersion};
+use diesel::pg::Pg;
+use diesel::prelude::*;
 use diesel::query_dsl::RunQueryDsl;
 use diesel::r2d2::ConnectionManager;
 use diesel::r2d2::{Pool, PooledConnection};
 use diesel::PgConnection;
+use diesel_migrations::{embed_migrations, EmbeddedMigrations};
+use std::time::Duration;
+use tracing::info;
+
+table! {
+    __diesel_schema_migrations (version) {
+        version -> VarChar,
+        run_on -> Timestamp,
+    }
+}
+
+const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");

 pub type ConnectionPool = Pool<ConnectionManager<PgConnection>>;
 pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
@@ -117,18 +130,65 @@ pub fn get_pool_connection(pool: &ConnectionPool) -> Result<PoolConnection, Ind
+pub fn check_db_migration_consistency(conn: &mut PoolConnection) -> Result<(), IndexerError> {
+    info!("Starting compatibility check");
+    let migrations: Vec<Box<dyn Migration<Pg>>> = MIGRATIONS.migrations().map_err(|err| {
+        IndexerError::DbMigrationError(format!(
+            "Failed to fetch local migrations from schema: {err}"
+        ))
+    })?;
+    let local_migrations: Vec<_> = migrations.iter().map(|m| m.name().version()).collect();
+    check_db_migration_consistency_impl(conn, local_migrations)?;
+    info!("Compatibility check passed");
+    Ok(())
+}
+
+fn check_db_migration_consistency_impl(
+    conn: &mut PoolConnection,
+    local_migrations: Vec<MigrationVersion>,
+) -> Result<(), IndexerError> {
+    // Unfortunately we cannot call applied_migrations() directly on the connection,
+    // since it implicitly creates the __diesel_schema_migrations table if it doesn't exist,
+    // which is a write operation that we don't want to do in this function.
+    let applied_migrations: Vec<MigrationVersion> = __diesel_schema_migrations::table
+        .select(__diesel_schema_migrations::version)
+        .order(__diesel_schema_migrations::version.asc())
+        .load(conn)?;
+
+    // We check that the local migrations are a prefix of the applied migrations.
+    if local_migrations.len() > applied_migrations.len() {
+        return Err(IndexerError::DbMigrationError(format!(
+            "The number of local migrations is greater than the number of applied migrations. Local migrations: {:?}, Applied migrations: {:?}",
+            local_migrations, applied_migrations
+        )));
+    }
+    for (local_migration, applied_migration) in local_migrations.iter().zip(&applied_migrations) {
+        if local_migration != applied_migration {
+            return Err(IndexerError::DbMigrationError(format!(
+                "The next applied migration `{:?}` diverges from the local migration `{:?}`",
+                applied_migration, local_migration
+            )));
+        }
+    }
+    Ok(())
+}
+
+pub use setup_postgres::{reset_database, run_migrations};

 pub mod setup_postgres {
-    use crate::database::Connection;
+    use crate::db::{PoolConnection, MIGRATIONS};
     use anyhow::anyhow;
-    use diesel_async::RunQueryDsl;
-    use diesel_migrations::{embed_migrations, EmbeddedMigrations};
+    use diesel::migration::MigrationConnection;
+    use diesel::RunQueryDsl;
+    use diesel_migrations::MigrationHarness;
     use tracing::info;

-    const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");
-
-    pub async fn reset_database(mut conn: Connection<'static>) -> Result<(), anyhow::Error> {
+    pub async fn reset_database(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
         info!("Resetting PG database ...");

         let drop_all_tables = "
@@ -140,9 +200,7 @@
                 EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
             END LOOP;
         END $$;";
-        diesel::sql_query(drop_all_tables)
-            .execute(&mut conn)
-            .await?;
+        diesel::sql_query(drop_all_tables).execute(conn)?;
         info!("Dropped all tables.");

         let drop_all_procedures = "
@@ -156,9 +214,7 @@
                 EXECUTE 'DROP PROCEDURE IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
             END LOOP;
         END $$;";
-        diesel::sql_query(drop_all_procedures)
-            .execute(&mut conn)
-            .await?;
+        diesel::sql_query(drop_all_procedures).execute(conn)?;
         info!("Dropped all procedures.");

         let drop_all_functions = "
@@ -172,26 +228,90 @@
                 EXECUTE 'DROP FUNCTION IF EXISTS '
 || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
             END LOOP;
         END $$;";
-        diesel::sql_query(drop_all_functions)
-            .execute(&mut conn)
-            .await?;
+        diesel::sql_query(drop_all_functions).execute(conn)?;
         info!("Dropped all functions.");

-        diesel::sql_query(
-            "
-        CREATE TABLE IF NOT EXISTS __diesel_schema_migrations (
-            version VARCHAR(50) PRIMARY KEY,
-            run_on TIMESTAMP NOT NULL DEFAULT NOW()
-        )",
-        )
-        .execute(&mut conn)
-        .await?;
+        conn.setup()?;
         info!("Created __diesel_schema_migrations table.");

-        conn.run_migrations(MIGRATIONS)
-            .await
-            .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
+        run_migrations(conn).await?;
         info!("Reset database complete.");
         Ok(())
     }
+
+    pub async fn run_migrations(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
+        conn.run_pending_migrations(MIGRATIONS)
+            .map_err(|e| anyhow!("Failed to run migrations {e}"))?;
+        Ok(())
+    }
+}
+
+#[cfg(feature = "pg_integration")]
+#[cfg(test)]
+mod tests {
+    use crate::db::{
+        check_db_migration_consistency, check_db_migration_consistency_impl, get_pool_connection,
+        new_connection_pool, reset_database, ConnectionPoolConfig, MIGRATIONS,
+    };
+    use crate::tempdb::TempDb;
+    use diesel::migration::{Migration, MigrationSource};
+    use diesel::pg::Pg;
+    use diesel_migrations::MigrationHarness;
+
+    // Check that the migration records in the database created from the local schema
+    // pass the consistency check.
+    #[tokio::test]
+    async fn db_migration_consistency_smoke_test() {
+        let database = TempDb::new().unwrap();
+        let blocking_pool = new_connection_pool(
+            database.database().url().as_str(),
+            &ConnectionPoolConfig::default(),
+        )
+        .unwrap();
+        let mut conn = get_pool_connection(&blocking_pool).unwrap();
+        reset_database(&mut conn).await.unwrap();
+        check_db_migration_consistency(&mut conn).unwrap();
+    }
+
+    #[tokio::test]
+    async fn db_migration_consistency_non_prefix_test() {
+        let database = TempDb::new().unwrap();
+        let blocking_pool = new_connection_pool(
+            database.database().url().as_str(),
+            &ConnectionPoolConfig::default(),
+        )
+        .unwrap();
+        let mut conn = get_pool_connection(&blocking_pool).unwrap();
+
+        reset_database(&mut conn).await.unwrap();
+
+        conn.revert_migration(MIGRATIONS.migrations().unwrap().last().unwrap())
+            .unwrap();
+        // The local migrations now have one more record than the applied migrations.
+        // This will fail the consistency check since they are no longer a prefix.
+        assert!(check_db_migration_consistency(&mut conn).is_err());
+
+        conn.run_pending_migrations(MIGRATIONS).unwrap();
+        // After running the pending migrations they should be consistent again.
+        check_db_migration_consistency(&mut conn).unwrap();
+    }
+
+    #[tokio::test]
+    async fn db_migration_consistency_prefix_test() {
+        let database = TempDb::new().unwrap();
+        let blocking_pool = new_connection_pool(
+            database.database().url().as_str(),
+            &ConnectionPoolConfig::default(),
+        )
+        .unwrap();
+        let mut conn = get_pool_connection(&blocking_pool).unwrap();
+        reset_database(&mut conn).await.unwrap();
+
+        let migrations: Vec<Box<dyn Migration<Pg>>> = MIGRATIONS.migrations().unwrap();
+        let mut local_migrations: Vec<_> = migrations.iter().map(|m| m.name().version()).collect();
+        local_migrations.pop();
+        // The local migrations are one record short of the applied migrations.
+        // This should pass the consistency check since they are still a prefix.
+        check_db_migration_consistency_impl(&mut conn, local_migrations).unwrap();
+    }
+}
diff --git a/crates/sui-indexer/src/errors.rs b/crates/sui-indexer/src/errors.rs
index 8a1303dbafae3..64f53ea53d696 100644
--- a/crates/sui-indexer/src/errors.rs
+++ b/crates/sui-indexer/src/errors.rs
@@ -129,6 +129,9 @@ pub enum IndexerError {

     #[error(transparent)]
     NameServiceError(#[from] NameServiceError),
+
+    #[error("Inconsistent migration records: {0}")]
+    DbMigrationError(String),
 }

 pub trait Context {
diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs
index f642b808ff985..69556db4f23ae 100644
--- a/crates/sui-indexer/src/main.rs
+++ b/crates/sui-indexer/src/main.rs
@@ -3,8 +3,11 @@
 use clap::Parser;
 use sui_indexer::config::Command;
-use sui_indexer::database::{Connection, ConnectionPool};
-use sui_indexer::db::{new_connection_pool, reset_database};
+use sui_indexer::database::ConnectionPool;
+use sui_indexer::db::{
+    check_db_migration_consistency, get_pool_connection, new_connection_pool, reset_database,
+    run_migrations,
+};
 use sui_indexer::indexer::Indexer;
 use sui_indexer::store::PgIndexerStore;
 use tokio_util::sync::CancellationToken;
@@ -43,7 +46,11 @@ async fn main() -> anyhow::Result<()> {
             snapshot_config,
             pruning_options,
         } => {
+            // Make sure to run all migrations on startup; this also serves as a compatibility
+            // check.
+            run_migrations(&mut get_pool_connection(&connection_pool)?).await?;
+
             let store = PgIndexerStore::new(connection_pool, pool, indexer_metrics.clone());
+
             Indexer::start_writer_with_config(
                 &ingestion_config,
                 store,
@@ -55,6 +62,8 @@ async fn main() -> anyhow::Result<()> {
             .await?;
         }
         Command::JsonRpcService(json_rpc_config) => {
+            check_db_migration_consistency(&mut get_pool_connection(&connection_pool)?)?;
+
             Indexer::start_reader(&json_rpc_config, &registry, connection_pool, pool).await?;
         }
         Command::ResetDatabase { force } => {
@@ -64,8 +73,10 @@ async fn main() -> anyhow::Result<()> {
                 ));
             }

-            let connection = Connection::dedicated(&opts.database_url).await?;
-            reset_database(connection).await?;
+            reset_database(&mut get_pool_connection(&connection_pool)?).await?;
+        }
+        Command::RunMigrations => {
+            run_migrations(&mut get_pool_connection(&connection_pool)?).await?;
         }
     }

diff --git a/crates/sui-indexer/src/schema/pg.rs b/crates/sui-indexer/src/schema/pg.rs
index c1720c94b37a1..e8f975e581848 100644
--- a/crates/sui-indexer/src/schema/pg.rs
+++ b/crates/sui-indexer/src/schema/pg.rs
@@ -397,43 +397,38 @@ diesel::table! {
     }
 }

-#[macro_export]
-macro_rules! for_all_tables {
-    ($action:path) => {
-        $action!(
-            chain_identifier,
-            checkpoints,
-            display,
-            epochs,
-            event_emit_module,
-            event_emit_package,
-            event_senders,
-            event_struct_instantiation,
-            event_struct_module,
-            event_struct_name,
-            event_struct_package,
-            events,
-            feature_flags,
-            objects_history,
-            objects_snapshot,
-            objects_version,
-            packages,
-            protocol_configs,
-            pruner_cp_watermark,
-            transactions,
-            tx_calls_fun,
-            tx_calls_mod,
-            tx_calls_pkg,
-            tx_changed_objects,
-            tx_digests,
-            tx_input_objects,
-            tx_kinds,
-            tx_recipients,
-            tx_senders
-        );
-    };
-}
-
-pub use for_all_tables;
-
-for_all_tables!(diesel::allow_tables_to_appear_in_same_query);
+diesel::allow_tables_to_appear_in_same_query!(
+    chain_identifier,
+    checkpoints,
+    display,
+    epochs,
+    event_emit_module,
+    event_emit_package,
+    event_senders,
+    event_struct_instantiation,
+    event_struct_module,
+    event_struct_name,
+    event_struct_package,
+    events,
+    events_partition_0,
+    feature_flags,
+    objects,
+    objects_history,
+    objects_history_partition_0,
+    objects_snapshot,
+    objects_version,
+    packages,
+    protocol_configs,
+    pruner_cp_watermark,
+    transactions,
+    transactions_partition_0,
+    tx_calls_fun,
+    tx_calls_mod,
+    tx_calls_pkg,
+    tx_changed_objects,
+    tx_digests,
+    tx_input_objects,
+    tx_kinds,
+    tx_recipients,
+    tx_senders,
+);
diff --git a/crates/sui-indexer/src/tempdb.rs b/crates/sui-indexer/src/tempdb.rs
index c7c025cef1b82..d63f34a02a3de 100644
--- a/crates/sui-indexer/src/tempdb.rs
+++ b/crates/sui-indexer/src/tempdb.rs
@@ -280,7 +280,10 @@ fn initdb(dir: &Path) -> Result<()> {
     if output.status.success() {
         Ok(())
     } else {
-        Err(anyhow!("unable to initialize database"))
+        Err(anyhow!(
+            "unable to initialize database: {:?}",
+            String::from_utf8(output.stderr)
+        ))
     }
 }

diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs
index 8e9b8800038ff..c7c90f26c3cef 100644
--- a/crates/sui-indexer/src/test_utils.rs
+++ b/crates/sui-indexer/src/test_utils.rs
@@ -12,9 +12,8 @@ use sui_json_rpc_types::SuiTransactionBlockResponse;
 use crate::config::IngestionConfig;
 use crate::config::PruningOptions;
 use crate::config::SnapshotLagConfig;
-use crate::database::Connection;
 use crate::database::ConnectionPool;
-use crate::db::{new_connection_pool, ConnectionPoolConfig};
+use crate::db::{get_pool_connection, new_connection_pool, ConnectionPoolConfig};
 use crate::errors::IndexerError;
 use crate::indexer::Indexer;
 use crate::store::PgIndexerStore;
@@ -121,10 +120,9 @@ pub async fn start_test_indexer_impl(
             snapshot_config,
             pruning_options,
         } => {
-            let connection = Connection::dedicated(&db_url.parse().unwrap())
+            crate::db::reset_database(&mut get_pool_connection(&blocking_pool).unwrap())
                 .await
                 .unwrap();
-            crate::db::reset_database(connection).await.unwrap();

             let store_clone = store.clone();
            let mut ingestion_config = IngestionConfig::default();
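---

To make the new invariant easy to see outside of diesel's types, here is a minimal, self-contained sketch of the prefix rule that `check_db_migration_consistency_impl` enforces (referenced from the description above; the function name and version strings below are illustrative only, not part of the sui-indexer API):

```rust
// Sketch of the migration-record consistency rule, using hypothetical
// version strings: the migrations embedded in the local binary must be a
// prefix of the migrations already applied to the database.
fn is_consistent(local: &[&str], applied: &[&str]) -> Result<(), String> {
    // More local than applied migrations means some migrations have not
    // been applied yet; the check fails.
    if local.len() > applied.len() {
        return Err(format!(
            "{} local migration(s) have not been applied",
            local.len() - applied.len()
        ));
    }
    // Any pairwise mismatch means the DB history diverged from the local
    // scripts (e.g. an ad-hoc edit to an existing migration).
    for (l, a) in local.iter().zip(applied) {
        if l != a {
            return Err(format!("applied `{a}` diverges from local `{l}`"));
        }
    }
    Ok(())
}

fn main() {
    // A DB that is ahead of the binary is fine: local is still a prefix.
    assert!(is_consistent(&["0001", "0002"], &["0001", "0002", "0003"]).is_ok());
    // A binary that is ahead of the DB is not.
    assert!(is_consistent(&["0001", "0002"], &["0001"]).is_err());
    // A diverging record always fails.
    assert!(is_consistent(&["0001", "0002b"], &["0001", "0002"]).is_err());
}
```

With this split, the indexer writer catches the database up on startup via `run_migrations`, while the readers (JSON-RPC and GraphQL) only verify the invariant and refuse to start on a mismatch.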