[indexer] Compatibility check using migration records (#19156)
## Description 

Previously, we checked DB compatibility by making sure that we could run a select
query on every column in the DB, based on the locally known schema.
This doesn't cover all cases; for instance, there could be tables in the
DB that do not exist in the local schema.
This PR changes how we do the compatibility check by fully leveraging
the migration records: it checks that the migration records in the DB
fully match the locally known ones.
It also moves the code to the sui-indexer crate, so that we can run this
check on startup of both the indexer and the GraphQL server.
This does require that, from now on, we fully respect the migration
scripts and don't make ad hoc modifications to existing migrations.
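
In essence, the local embedded migration scripts must form a prefix of the `__diesel_schema_migrations` records in the DB (see `check_db_migration_consistency_impl` in the diff below). A condensed, self-contained sketch of that comparison, with made-up version strings:

```rust
/// Condensed sketch of the prefix check introduced in
/// crates/sui-indexer/src/db.rs (`check_db_migration_consistency_impl`).
fn is_consistent(local: &[&str], applied: &[&str]) -> bool {
    // Local migrations must be a prefix of the applied ones: same order,
    // and no local script that the DB has not applied yet.
    local.len() <= applied.len() && local.iter().zip(applied).all(|(l, a)| l == a)
}

fn main() {
    // Hypothetical migration versions, for illustration only.
    let applied = ["2024-01-01-000000", "2024-02-01-000000"];
    // An older reader that only knows the first migration still passes.
    assert!(is_consistent(&["2024-01-01-000000"], &applied));
    // A local script that the DB has not applied yet fails the check.
    assert!(!is_consistent(
        &["2024-01-01-000000", "2024-03-01-000000"],
        &applied
    ));
}
```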

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
lxfind committed Sep 5, 2024
1 parent 89737f7 commit 2b3991a
Showing 11 changed files with 218 additions and 162 deletions.
9 changes: 1 addition & 8 deletions crates/sui-graphql-rpc/src/data/pg.rs
@@ -200,7 +200,6 @@ mod tests {
use diesel::QueryDsl;
use sui_framework::BuiltInFramework;
use sui_indexer::{
database::Connection,
db::{get_pool_connection, new_connection_pool, reset_database, ConnectionPoolConfig},
models::objects::StoredObject,
schema::objects,
@@ -215,13 +214,7 @@
pool_config.set_pool_size(5);
let pool = new_connection_pool(database.database().url().as_str(), &pool_config).unwrap();
let mut conn = get_pool_connection(&pool).unwrap();
reset_database(
Connection::dedicated(database.database().url())
.await
.unwrap(),
)
.await
.unwrap();
reset_database(&mut conn).await.unwrap();

let objects: Vec<StoredObject> = BuiltInFramework::iter_system_packages()
.map(|pkg| IndexedObject::from_object(1, pkg.genesis_object(), None).into())
14 changes: 6 additions & 8 deletions crates/sui-graphql-rpc/src/server/builder.rs
@@ -1,7 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use super::compatibility_check::check_all_tables;
use super::exchange_rates_task::TriggerExchangeRatesTask;
use super::system_package_task::SystemPackageTask;
use super::watermark_task::{ChainIdentifierLock, Watermark, WatermarkLock, WatermarkTask};
@@ -58,6 +57,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{any::Any, net::SocketAddr, time::Instant};
use sui_graphql_rpc_headers::LIMITS_HEADER;
use sui_indexer::db::{check_db_migration_consistency, get_pool_connection};
use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
use sui_sdk::SuiClientBuilder;
use tokio::join;
@@ -88,10 +88,8 @@ impl Server {
pub async fn run(mut self) -> Result<(), Error> {
get_or_init_server_start_time().await;

// Compatibility check
info!("Starting compatibility check");
check_all_tables(&self.db_reader).await?;
info!("Compatibility check passed");
let mut connection = get_pool_connection(&self.db_reader.inner.get_pool())?;
check_db_migration_consistency(&mut connection)?;

// A handle that spawns a background task to periodically update the `Watermark`, which
// consists of the checkpoint upper bound and current epoch.
@@ -545,7 +543,7 @@ async fn graphql_handler(

let result = schema.execute(req).await;

// If there are errors, insert them as an extention so that the Metrics callback handler can
// If there are errors, insert them as an extension so that the Metrics callback handler can
// pull it out later.
let mut extensions = axum::http::Extensions::new();
if result.is_err() {
@@ -776,7 +774,7 @@ pub mod tests {
cluster
.wait_for_checkpoint_catchup(1, Duration::from_secs(10))
.await;
// timeout test includes mutation timeout, which requies a [SuiClient] to be able to run
// timeout test includes mutation timeout, which requires a [SuiClient] to be able to run
// the test, and a transaction. [WalletContext] gives access to everything that's needed.
let wallet = &cluster.network.validator_fullnode_handle.wallet;
let db_url = cluster.network.graphql_connection_config.db_url.clone();
@@ -1685,7 +1683,7 @@ pub mod tests {
// Test that when variables are re-used as execution params, the size of the variable is
// only counted once.

// First, check that `eror_passed_tx_checks` is working, by submitting a request that will
// First, check that `error_passed_tx_checks` is working, by submitting a request that will
// fail the initial payload check.
assert!(!passed_tx_checks(
&execute_for_error(
66 changes: 0 additions & 66 deletions crates/sui-graphql-rpc/src/server/compatibility_check.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/sui-graphql-rpc/src/server/mod.rs
@@ -4,7 +4,6 @@
pub mod graphiql_server;

pub mod builder;
pub(crate) mod compatibility_check;
pub(crate) mod exchange_rates_task;
pub(crate) mod system_package_task;
pub mod version;
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/config.rs
@@ -158,6 +158,8 @@ pub enum Command {
#[clap(long)]
force: bool,
},
/// Run through the migration scripts.
RunMigrations,
}

#[derive(Args, Default, Debug, Clone)]
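The new `RunMigrations` subcommand is what lets operators apply pending migrations ahead of a deploy, which the prefix invariant described above relies on. Its dispatch is not part of this excerpt; a minimal sketch of what the wiring might look like follows. Only `Command::RunMigrations`, `ConnectionPool`, `get_pool_connection`, and `run_migrations` come from this PR; the module paths and surrounding plumbing are assumptions.

```rust
// Hypothetical wiring for the new subcommand; only the items named in the
// lead-in come from this PR, everything else is illustrative.
use sui_indexer::config::Command;
use sui_indexer::db::{get_pool_connection, run_migrations, ConnectionPool};

async fn handle_command(cmd: Command, pool: &ConnectionPool) -> anyhow::Result<()> {
    match cmd {
        Command::RunMigrations => {
            // Grab a blocking pooled connection, as db.rs does elsewhere.
            let mut conn = get_pool_connection(pool)?;
            // Apply any migration scripts the DB has not yet recorded.
            run_migrations(&mut conn).await?;
        }
        // Other subcommands are out of scope for this sketch.
        _ => unimplemented!(),
    }
    Ok(())
}
```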
180 changes: 150 additions & 30 deletions crates/sui-indexer/src/db.rs
@@ -1,14 +1,27 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use crate::errors::IndexerError;
use clap::Args;
use diesel::migration::{Migration, MigrationSource, MigrationVersion};
use diesel::pg::Pg;
use diesel::prelude::*;
use diesel::query_dsl::RunQueryDsl;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::{Pool, PooledConnection};
use diesel::PgConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use std::time::Duration;
use tracing::info;

table! {
__diesel_schema_migrations (version) {
version -> VarChar,
run_on -> Timestamp,
}
}

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");

pub type ConnectionPool = Pool<ConnectionManager<PgConnection>>;
pub type PoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
@@ -117,18 +130,65 @@ pub fn get_pool_connection(pool: &ConnectionPool) -> Result<PoolConnection, Inde
})
}

pub use setup_postgres::reset_database;
/// Checks that the local migration scripts are a prefix of the migration records in the database.
/// This allows us to run migration scripts against a DB at any time, without worrying about
/// making existing readers fail over.
/// We do, however, need to make sure that whenever we deploy a new version of either the reader
/// or the writer, we run the migration scripts first, so that there are never more local scripts
/// than records in the DB.
pub fn check_db_migration_consistency(conn: &mut PoolConnection) -> Result<(), IndexerError> {
info!("Starting compatibility check");
let migrations: Vec<Box<dyn Migration<Pg>>> = MIGRATIONS.migrations().map_err(|err| {
IndexerError::DbMigrationError(format!(
"Failed to fetch local migrations from schema: {err}"
))
})?;
let local_migrations: Vec<_> = migrations.iter().map(|m| m.name().version()).collect();
check_db_migration_consistency_impl(conn, local_migrations)?;
info!("Compatibility check passed");
Ok(())
}

fn check_db_migration_consistency_impl(
conn: &mut PoolConnection,
local_migrations: Vec<MigrationVersion>,
) -> Result<(), IndexerError> {
// Unfortunately we cannot call applied_migrations() directly on the connection,
// since it implicitly creates the __diesel_schema_migrations table if it doesn't exist,
// which is a write operation that we don't want to do in this function.
let applied_migrations: Vec<MigrationVersion> = __diesel_schema_migrations::table
.select(__diesel_schema_migrations::version)
.order(__diesel_schema_migrations::version.asc())
.load(conn)?;

// We check that the local migrations are a prefix of the applied migrations.
if local_migrations.len() > applied_migrations.len() {
return Err(IndexerError::DbMigrationError(format!(
"The number of local migrations is greater than the number of applied migrations. Local migrations: {:?}, Applied migrations: {:?}",
local_migrations, applied_migrations
)));
}
for (local_migration, applied_migration) in local_migrations.iter().zip(&applied_migrations) {
if local_migration != applied_migration {
return Err(IndexerError::DbMigrationError(format!(
"The next applied migration `{:?}` diverges from the local migration `{:?}`",
applied_migration, local_migration
)));
}
}
Ok(())
}

pub use setup_postgres::{reset_database, run_migrations};

pub mod setup_postgres {
use crate::database::Connection;
use crate::db::{PoolConnection, MIGRATIONS};
use anyhow::anyhow;
use diesel_async::RunQueryDsl;
use diesel_migrations::{embed_migrations, EmbeddedMigrations};
use diesel::migration::MigrationConnection;
use diesel::RunQueryDsl;
use diesel_migrations::MigrationHarness;
use tracing::info;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");

pub async fn reset_database(mut conn: Connection<'static>) -> Result<(), anyhow::Error> {
pub async fn reset_database(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
info!("Resetting PG database ...");

let drop_all_tables = "
@@ -140,9 +200,7 @@ pub mod setup_postgres {
EXECUTE 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE';
END LOOP;
END $$;";
diesel::sql_query(drop_all_tables)
.execute(&mut conn)
.await?;
diesel::sql_query(drop_all_tables).execute(conn)?;
info!("Dropped all tables.");

let drop_all_procedures = "
@@ -156,9 +214,7 @@
EXECUTE 'DROP PROCEDURE IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
END LOOP;
END $$;";
diesel::sql_query(drop_all_procedures)
.execute(&mut conn)
.await?;
diesel::sql_query(drop_all_procedures).execute(conn)?;
info!("Dropped all procedures.");

let drop_all_functions = "
@@ -172,26 +228,90 @@
EXECUTE 'DROP FUNCTION IF EXISTS ' || quote_ident(r.proname) || '(' || r.argtypes || ') CASCADE';
END LOOP;
END $$;";
diesel::sql_query(drop_all_functions)
.execute(&mut conn)
.await?;
diesel::sql_query(drop_all_functions).execute(conn)?;
info!("Dropped all functions.");

diesel::sql_query(
"
CREATE TABLE IF NOT EXISTS __diesel_schema_migrations (
version VARCHAR(50) PRIMARY KEY,
run_on TIMESTAMP NOT NULL DEFAULT NOW()
)",
)
.execute(&mut conn)
.await?;
conn.setup()?;
info!("Created __diesel_schema_migrations table.");

conn.run_migrations(MIGRATIONS)
.await
.map_err(|e| anyhow!("Failed to run migrations {e}"))?;
run_migrations(conn).await?;
info!("Reset database complete.");
Ok(())
}

pub async fn run_migrations(conn: &mut PoolConnection) -> Result<(), anyhow::Error> {
conn.run_pending_migrations(MIGRATIONS)
.map_err(|e| anyhow!("Failed to run migrations {e}"))?;
Ok(())
}
}

#[cfg(feature = "pg_integration")]
#[cfg(test)]
mod tests {
use crate::db::{
check_db_migration_consistency, check_db_migration_consistency_impl, get_pool_connection,
new_connection_pool, reset_database, ConnectionPoolConfig, MIGRATIONS,
};
use crate::tempdb::TempDb;
use diesel::migration::{Migration, MigrationSource};
use diesel::pg::Pg;
use diesel_migrations::MigrationHarness;

// Check that the migration records in the database created from the local schema
// pass the consistency check.
#[tokio::test]
async fn db_migration_consistency_smoke_test() {
let database = TempDb::new().unwrap();
let blocking_pool = new_connection_pool(
database.database().url().as_str(),
&ConnectionPoolConfig::default(),
)
.unwrap();
let mut conn = get_pool_connection(&blocking_pool).unwrap();
reset_database(&mut conn).await.unwrap();
check_db_migration_consistency(&mut conn).unwrap();
}

#[tokio::test]
async fn db_migration_consistency_non_prefix_test() {
let database = TempDb::new().unwrap();
let blocking_pool = new_connection_pool(
database.database().url().as_str(),
&ConnectionPoolConfig::default(),
)
.unwrap();
let mut conn = get_pool_connection(&blocking_pool).unwrap();

reset_database(&mut conn).await.unwrap();

conn.revert_migration(MIGRATIONS.migrations().unwrap().last().unwrap())
.unwrap();
// Local migrations now have one more record than the applied migrations.
// This will fail the consistency check since it's not a prefix.
assert!(check_db_migration_consistency(&mut conn).is_err());

conn.run_pending_migrations(MIGRATIONS).unwrap();
// After running pending migrations they should be consistent.
check_db_migration_consistency(&mut conn).unwrap();
}

#[tokio::test]
async fn db_migration_consistency_prefix_test() {
let database = TempDb::new().unwrap();
let blocking_pool = new_connection_pool(
database.database().url().as_str(),
&ConnectionPoolConfig::default(),
)
.unwrap();
let mut conn = get_pool_connection(&blocking_pool).unwrap();
reset_database(&mut conn).await.unwrap();

let migrations: Vec<Box<dyn Migration<Pg>>> = MIGRATIONS.migrations().unwrap();
let mut local_migrations: Vec<_> = migrations.iter().map(|m| m.name().version()).collect();
local_migrations.pop();
// Local migrations have one record fewer than the applied migrations.
// This should pass the consistency check since it's still a prefix.
check_db_migration_consistency_impl(&mut conn, local_migrations).unwrap();
}
}