Skip to content

Commit

Permalink
indexer fix: reset db via reverting migrations (MystenLabs#18993)
Browse files Browse the repository at this point in the history
## Description 

the issue was that, prev indexer db reset was done via dropping all
tables, which is problematic when we change a PG PROCEDURE parameter,
see this slack message.
https://mysten-labs.slack.com/archives/C03TCGDF45N/p1723507055114959

this caused issues on CI after merging
MystenLabs#18899 and it got reverted, this
pr changes it to reverting all migrations and cleans up the table
dropping codes

## Test plan 

locally
- reset DB before MystenLabs#18899 
- cherry-pick this pr
- cherry-pick MystenLabs#18899

run cmd below, which was the cmd on CI that ran into issue 
```
DB_POOL_SIZE=10 cargo run --bin sui-indexer -- --db-url "postgres://postgres:postgres@localhost/gegao" --reset-db
```


---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp committed Aug 16, 2024
1 parent 6a84e4d commit bafce36
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 96 deletions.
2 changes: 1 addition & 1 deletion crates/sui-graphql-rpc/src/data/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ mod tests {
)
.unwrap();
let mut conn = get_pool_connection(&pool).unwrap();
reset_database(&mut conn, /* drop_all */ true).unwrap();
reset_database(&mut conn).unwrap();

let objects: Vec<StoredObject> = BuiltInFramework::iter_system_packages()
.map(|pkg| IndexedObject::from_object(1, pkg.genesis_object(), None).into())
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
DROP PROCEDURE IF EXISTS advance_partition;
DROP PROCEDURE IF EXISTS drop_partition;
113 changes: 19 additions & 94 deletions crates/sui-indexer/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ pub fn get_pool_connection<T: R2D2Connection + Send + 'static>(

pub fn reset_database<T: R2D2Connection + Send + 'static>(
conn: &mut PoolConnection<T>,
drop_all: bool,
) -> Result<(), anyhow::Error> {
#[cfg(feature = "postgres-feature")]
{
Expand All @@ -169,7 +168,7 @@ pub fn reset_database<T: R2D2Connection + Send + 'static>(
.map_or_else(
|| Err(anyhow!("Failed to downcast connection to PgConnection")),
|pg_conn| {
setup_postgres::reset_database(pg_conn, drop_all)?;
setup_postgres::reset_database(pg_conn)?;
Ok(())
},
)?;
Expand All @@ -182,7 +181,7 @@ pub fn reset_database<T: R2D2Connection + Send + 'static>(
.map_or_else(
|| Err(anyhow!("Failed to downcast connection to PgConnection")),
|mysql_conn| {
setup_mysql::reset_database(mysql_conn, drop_all)?;
setup_mysql::reset_database(mysql_conn)?;
Ok(())
},
)?;
Expand All @@ -200,60 +199,24 @@ pub mod setup_postgres {
use crate::IndexerConfig;
use anyhow::anyhow;
use diesel::migration::MigrationSource;
use diesel::{PgConnection, RunQueryDsl};
use diesel::PgConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use prometheus::Registry;
use secrecy::ExposeSecret;
use tracing::{error, info};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/pg");

pub fn reset_database(
conn: &mut PoolConnection<PgConnection>,
drop_all: bool,
) -> Result<(), anyhow::Error> {
pub fn reset_database(conn: &mut PoolConnection<PgConnection>) -> Result<(), anyhow::Error> {
info!("Resetting database ...");
if drop_all {
drop_all_tables(conn)
.map_err(|e| anyhow!("Encountering error when dropping all tables {e}"))?;
} else {
conn.revert_all_migrations(MIGRATIONS)
.map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
}
conn.revert_all_migrations(MIGRATIONS)
.map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
conn.run_migrations(&MIGRATIONS.migrations().unwrap())
.map_err(|e| anyhow!("Failed to run migrations {e}"))?;
info!("Reset database complete.");
Ok(())
}

fn drop_all_tables(conn: &mut PgConnection) -> Result<(), diesel::result::Error> {
info!("Dropping all tables in the database");
let table_names: Vec<String> = diesel::dsl::sql::<diesel::sql_types::Text>(
"
SELECT tablename FROM pg_tables WHERE schemaname = 'public'
",
)
.load(conn)?;

for table_name in table_names {
let drop_table_query = format!("DROP TABLE IF EXISTS {} CASCADE", table_name);
diesel::sql_query(drop_table_query).execute(conn)?;
}

// Recreate the __diesel_schema_migrations table
diesel::sql_query(
"
CREATE TABLE __diesel_schema_migrations (
version VARCHAR(50) PRIMARY KEY,
run_on TIMESTAMP NOT NULL DEFAULT NOW()
)
",
)
.execute(conn)?;
info!("Dropped all tables in the database");
Ok(())
}

pub async fn setup(
indexer_config: IndexerConfig,
registry: Registry,
Expand Down Expand Up @@ -281,7 +244,7 @@ pub mod setup_postgres {
);
e
})?;
reset_database(&mut conn, /* drop_all */ true).map_err(|e| {
reset_database(&mut conn).map_err(|e| {
let db_err_msg = format!(
"Failed resetting database with url: {:?} and error: {:?}",
db_url, e
Expand Down Expand Up @@ -343,60 +306,24 @@ pub mod setup_mysql {
use crate::IndexerConfig;
use anyhow::anyhow;
use diesel::migration::MigrationSource;
use diesel::{MysqlConnection, RunQueryDsl};
use diesel::MysqlConnection;
use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
use prometheus::Registry;
use secrecy::ExposeSecret;
use tracing::{error, info};

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/mysql");

pub fn reset_database(
conn: &mut PoolConnection<MysqlConnection>,
drop_all: bool,
) -> Result<(), anyhow::Error> {
pub fn reset_database(conn: &mut PoolConnection<MysqlConnection>) -> Result<(), anyhow::Error> {
info!("Resetting database ...");
if drop_all {
crate::db::setup_mysql::drop_all_tables(conn)
.map_err(|e| anyhow!("Encountering error when dropping all tables {e}"))?;
} else {
conn.revert_all_migrations(MIGRATIONS)
.map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
}
conn.revert_all_migrations(MIGRATIONS)
.map_err(|e| anyhow!("Error reverting all migrations {e}"))?;
conn.run_migrations(&MIGRATIONS.migrations().unwrap())
.map_err(|e| anyhow!("Failed to run migrations {e}"))?;
info!("Reset database complete.");
Ok(())
}

fn drop_all_tables(conn: &mut MysqlConnection) -> Result<(), diesel::result::Error> {
info!("Dropping all tables in the database");
let table_names: Vec<String> = diesel::dsl::sql::<diesel::sql_types::Text>(
"
SELECT TABLE_NAME FROM information_schema.tables WHERE table_schema = DATABASE()
",
)
.load(conn)?;

for table_name in table_names {
let drop_table_query = format!("DROP TABLE IF EXISTS {}", table_name);
diesel::sql_query(drop_table_query).execute(conn)?;
}

// Recreate the __diesel_schema_migrations table
diesel::sql_query(
"
CREATE TABLE __diesel_schema_migrations (
version VARCHAR(50) PRIMARY KEY,
run_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP()
)
",
)
.execute(conn)?;
info!("Dropped all tables in the database");
Ok(())
}

pub async fn setup(
indexer_config: IndexerConfig,
registry: Registry,
Expand All @@ -421,16 +348,14 @@ pub mod setup_mysql {
);
e
})?;
crate::db::setup_mysql::reset_database(&mut conn, /* drop_all */ true).map_err(
|e| {
let db_err_msg = format!(
"Failed resetting database with url: {:?} and error: {:?}",
db_url, e
);
error!("{}", db_err_msg);
IndexerError::PostgresResetError(db_err_msg)
},
)?;
crate::db::setup_mysql::reset_database(&mut conn).map_err(|e| {
let db_err_msg = format!(
"Failed resetting database with url: {:?} and error: {:?}",
db_url, e
);
error!("{}", db_err_msg);
IndexerError::PostgresResetError(db_err_msg)
})?;
info!("Reset MySQL database complete.");
}
let indexer_metrics = IndexerMetrics::new(&registry);
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub async fn start_test_indexer_impl<T: R2D2Connection + 'static>(
}
ReaderWriterConfig::Writer { snapshot_config } => {
if config.reset_db {
crate::db::reset_database(&mut blocking_pool.get().unwrap(), true).unwrap();
crate::db::reset_database(&mut blocking_pool.get().unwrap()).unwrap();
}
let store_clone = store.clone();

Expand Down

0 comments on commit bafce36

Please sign in to comment.