From 2cfa1d618f6a8afde967ac9d892cdcf23421c595 Mon Sep 17 00:00:00 2001
From: Nisheeth Barthwal
Date: Fri, 23 Jun 2023 12:57:49 +0200
Subject: [PATCH] add sql indexer behavior tests, remove restriction for normal
 sync (#1077)

* add tests, remove restriction for normal sync

* refactor test name
---
 client/mapping-sync/src/sql/mod.rs | 688 ++++++++++++++++++++++++++++-
 1 file changed, 675 insertions(+), 13 deletions(-)

diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs
index 213d7a0f61..343229f9a6 100644
--- a/client/mapping-sync/src/sql/mod.rs
+++ b/client/mapping-sync/src/sql/mod.rs
@@ -86,7 +86,6 @@ where
 		tokio::task::spawn(async move {
 			while let Some(cmd) = rx.recv().await {
 				log::debug!(target: "frontier-sql", "💬 Recv Worker Command {cmd:?}");
-				println!("💬 Recv Worker Command {cmd:?}");
 				match cmd {
 					WorkerCommand::ResumeSync => {
 						// Attempt to resume from last indexed block. If there is no data in the db, sync genesis.
@@ -181,17 +180,12 @@ where
 		indexer_backend: Arc<fc_db::sql::Backend<Block>>,
 		import_notifications: sc_client_api::ImportNotifications<Block>,
 		worker_config: SyncWorkerConfig,
-		sync_strategy: SyncStrategy,
+		_sync_strategy: SyncStrategy,
 		sync_oracle: Arc<SO>,
 		pubsub_notification_sinks: Arc<
 			EthereumBlockNotificationSinks<EthereumBlockNotification<Block>>,
 		>,
 	) {
-		// work in progress for `SyncStrategy::Normal` to also index non-best blocks.
-		if sync_strategy == SyncStrategy::Normal {
-			panic!("'SyncStrategy::Normal' is not supported")
-		}
-
 		let tx = Self::spawn_worker(
 			client.clone(),
 			substrate_backend.clone(),
@@ -202,7 +196,6 @@ where
 
 		// Resume sync from the last indexed block until we reach an already indexed parent
 		tx.send(WorkerCommand::ResumeSync).await.ok();
-
 		// check missing blocks every interval
 		let tx2 = tx.clone();
 		tokio::task::spawn(async move {
@@ -478,7 +471,11 @@ async fn index_genesis_block(
 mod test {
 	use super::*;
 
-	use std::{collections::BTreeMap, path::Path, sync::Arc};
+	use std::{
+		collections::BTreeMap,
+		path::Path,
+		sync::{Arc, Mutex},
+	};
 
 	use futures::executor;
 	use scale_codec::Encode;
@@ -1170,7 +1167,7 @@
 			.hash(sp_runtime::traits::Zero::zero())
 			.unwrap()
 			.expect("genesis hash");
-		let mut block_hashes: Vec<H256> = vec![];
+		let mut best_block_hashes: Vec<H256> = vec![];
 		for _block_number in 1..=5 {
 			let builder = client
 				.new_block_at(parent_hash, ethereum_digest(), false)
@@ -1178,12 +1175,12 @@
 			let block = builder.build().unwrap().block;
 			let block_hash = block.header.hash();
 			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
-			block_hashes.insert(0, block_hash.clone());
+			best_block_hashes.insert(0, block_hash.clone());
 			parent_hash = block_hash;
 		}
 
 		// Mark the block as canon and indexed
-		let block_resume_at = block_hashes[0];
+		let block_resume_at = best_block_hashes[0];
 		sqlx::query("INSERT INTO blocks(substrate_block_hash, ethereum_block_hash, ethereum_storage_schema, block_number, is_canon) VALUES (?, ?, ?, 5, 1)")
 			.bind(block_resume_at.as_bytes())
 			.bind(H256::zero().as_bytes())
@@ -1235,7 +1232,672 @@
 			.iter()
 			.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
 			.collect::<Vec<H256>>();
-		let expected_imported_blocks = block_hashes.clone();
+		let expected_imported_blocks = best_block_hashes.clone();
 		assert_eq!(expected_imported_blocks, actual_imported_blocks);
 	}
+
+	struct TestSyncOracle {
+		sync_status: Arc<Mutex<bool>>,
+	}
+	impl sp_consensus::SyncOracle for TestSyncOracle {
+		fn is_major_syncing(&self) -> bool {
+			*self.sync_status.lock().expect("failed getting lock")
+		}
+		fn is_offline(&self) -> bool {
+			false
+		}
+	}
+
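+	// The oracle above only reads the shared flag; the wrapper below keeps a clone
+	// of the same Arc<Mutex<bool>> so a test can flip what `is_major_syncing()`
+	// reports while the worker task is already running, e.g.
+	// `sync_oracle_wrapper.set_sync_status(true)`.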
+	struct TestSyncOracleWrapper {
+		oracle: Arc<TestSyncOracle>,
+		sync_status: Arc<Mutex<bool>>,
+	}
+	impl TestSyncOracleWrapper {
+		fn new() -> Self {
+			let sync_status = Arc::new(Mutex::new(false));
+			TestSyncOracleWrapper {
+				oracle: Arc::new(TestSyncOracle {
+					sync_status: sync_status.clone(),
+				}),
+				sync_status,
+			}
+		}
+		fn set_sync_status(&mut self, value: bool) {
+			*self.sync_status.lock().expect("failed getting lock") = value;
+		}
+	}
+
+	#[tokio::test]
+	async fn sync_strategy_normal_indexes_best_blocks_if_not_major_sync() {
+		let tmp = tempdir().expect("create a temporary directory");
+		let builder = TestClientBuilder::new().add_extra_storage(
+			PALLET_ETHEREUM_SCHEMA.to_vec(),
+			Encode::encode(&EthereumStorageSchema::V3),
+		);
+		let backend = builder.backend();
+		let (client, _) =
+			builder.build_with_native_executor::(None);
+		let mut client = Arc::new(client);
+		let mut overrides_map = BTreeMap::new();
+		overrides_map.insert(
+			EthereumStorageSchema::V3,
+			Box::new(SchemaV3Override::new(client.clone())) as Box>,
+		);
+		let overrides = Arc::new(OverrideHandle {
+			schemas: overrides_map,
+			fallback: Box::new(SchemaV3Override::new(client.clone())),
+		});
+		let indexer_backend = fc_db::sql::Backend::new(
+			fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig {
+				path: Path::new("sqlite:///")
+					.join(tmp.path())
+					.join("test.db3")
+					.to_str()
+					.unwrap(),
+				create_if_missing: true,
+				cache_size: 204800,
+				thread_count: 4,
+			}),
+			100,
+			None,
+			overrides.clone(),
+		)
+		.await
+		.expect("indexer pool to be created");
+
+		// Pool
+		let pool = indexer_backend.pool().clone();
+
+		// Spawn indexer task
+		let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks<
+			crate::EthereumBlockNotification,
+		> = Default::default();
+		let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
+		let mut sync_oracle_wrapper = TestSyncOracleWrapper::new();
+		let sync_oracle = sync_oracle_wrapper.oracle.clone();
+		let client_inner = client.clone();
+		tokio::task::spawn(async move {
+			crate::sql::SyncWorker::run(
+				client_inner.clone(),
+				backend.clone(),
+				Arc::new(indexer_backend),
+				client_inner.import_notification_stream(),
+				SyncWorkerConfig {
+					read_notification_timeout: Duration::from_secs(10),
+					check_indexed_blocks_interval: Duration::from_secs(60),
+				},
+				SyncStrategy::Normal,
+				Arc::new(sync_oracle),
+				pubsub_notification_sinks.clone(),
+			)
+			.await
+		});
+		// Enough time for startup
+		futures_timer::Delay::new(std::time::Duration::from_millis(200)).await;
+
+		// Import 3 blocks as part of normal operation, storing them oldest first.
+		sync_oracle_wrapper.set_sync_status(false);
+		let mut parent_hash = client
+			.hash(sp_runtime::traits::Zero::zero())
+			.unwrap()
+			.expect("genesis hash");
+		let mut best_block_hashes: Vec<H256> = vec![];
+		for _block_number in 1..=3 {
+			let builder = client
+				.new_block_at(parent_hash, ethereum_digest(), false)
+				.unwrap();
+			let block = builder.build().unwrap().block;
+			let block_hash = block.header.hash();
+
+			executor::block_on(client.import(BlockOrigin::Own, block)).unwrap();
+			best_block_hashes.push(block_hash.clone());
+			parent_hash = block_hash;
+		}
+
+		// Enough time for indexing
+		futures_timer::Delay::new(std::time::Duration::from_millis(3000)).await;
+
+		// Test the chain is correctly indexed.
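+		// Expect exactly the three best blocks imported above to appear in `blocks`,
+		// in insertion order (oldest first), since the worker ran with
+		// `SyncStrategy::Normal` and the oracle reported no major sync in progress.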
+ let actual_imported_blocks = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .collect::>(); + let expected_imported_blocks = best_block_hashes.clone(); + assert_eq!(expected_imported_blocks, actual_imported_blocks); + } + + #[tokio::test] + async fn sync_strategy_normal_ignores_non_best_block_if_not_major_sync() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let backend = builder.backend(); + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + let indexer_backend = fc_db::sql::Backend::new( + fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig { + path: Path::new("sqlite:///") + .join(tmp.path()) + .join("test.db3") + .to_str() + .unwrap(), + create_if_missing: true, + cache_size: 204800, + thread_count: 4, + }), + 100, + None, + overrides.clone(), + ) + .await + .expect("indexer pool to be created"); + + // Pool + let pool = indexer_backend.pool().clone(); + + // Spawn indexer task + let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks< + crate::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + let mut sync_oracle_wrapper = TestSyncOracleWrapper::new(); + let sync_oracle = sync_oracle_wrapper.oracle.clone(); + let client_inner = client.clone(); + tokio::task::spawn(async move { + crate::sql::SyncWorker::run( + client_inner.clone(), + backend.clone(), + Arc::new(indexer_backend), + client_inner.import_notification_stream(), + SyncWorkerConfig { + read_notification_timeout: Duration::from_secs(10), + check_indexed_blocks_interval: Duration::from_secs(60), + }, + SyncStrategy::Normal, + Arc::new(sync_oracle), + pubsub_notification_sinks.clone(), + ) + .await + }); + // Enough time for startup + futures_timer::Delay::new(std::time::Duration::from_millis(200)).await; + + // Import 3 blocks as part of normal operation, storing them oldest first. + sync_oracle_wrapper.set_sync_status(false); + let mut parent_hash = client + .hash(sp_runtime::traits::Zero::zero()) + .unwrap() + .expect("genesis hash"); + let mut best_block_hashes: Vec = vec![]; + for _block_number in 1..=3 { + let builder = client + .new_block_at(parent_hash, ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + + executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); + best_block_hashes.push(block_hash.clone()); + parent_hash = block_hash; + } + + // create non-best block + let builder = client + .new_block_at(best_block_hashes[0], ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + + executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); + + // Enough time for indexing + futures_timer::Delay::new(std::time::Duration::from_millis(3000)).await; + + // Test the chain is correctly indexed. 
+ let actual_imported_blocks = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .collect::>(); + let expected_imported_blocks = best_block_hashes.clone(); + assert_eq!(expected_imported_blocks, actual_imported_blocks); + } + + #[tokio::test] + async fn sync_strategy_parachain_indexes_best_blocks_if_not_major_sync() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let backend = builder.backend(); + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + let indexer_backend = fc_db::sql::Backend::new( + fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig { + path: Path::new("sqlite:///") + .join(tmp.path()) + .join("test.db3") + .to_str() + .unwrap(), + create_if_missing: true, + cache_size: 204800, + thread_count: 4, + }), + 100, + None, + overrides.clone(), + ) + .await + .expect("indexer pool to be created"); + + // Pool + let pool = indexer_backend.pool().clone(); + + // Spawn indexer task + let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks< + crate::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + let mut sync_oracle_wrapper = TestSyncOracleWrapper::new(); + let sync_oracle = sync_oracle_wrapper.oracle.clone(); + let client_inner = client.clone(); + tokio::task::spawn(async move { + crate::sql::SyncWorker::run( + client_inner.clone(), + backend.clone(), + Arc::new(indexer_backend), + client_inner.import_notification_stream(), + SyncWorkerConfig { + read_notification_timeout: Duration::from_secs(10), + check_indexed_blocks_interval: Duration::from_secs(60), + }, + SyncStrategy::Parachain, + Arc::new(sync_oracle), + pubsub_notification_sinks.clone(), + ) + .await + }); + // Enough time for startup + futures_timer::Delay::new(std::time::Duration::from_millis(200)).await; + + // Import 3 blocks as part of normal operation, storing them oldest first. + sync_oracle_wrapper.set_sync_status(false); + let mut parent_hash = client + .hash(sp_runtime::traits::Zero::zero()) + .unwrap() + .expect("genesis hash"); + let mut best_block_hashes: Vec = vec![]; + for _block_number in 1..=3 { + let builder = client + .new_block_at(parent_hash, ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + + executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); + best_block_hashes.push(block_hash.clone()); + parent_hash = block_hash; + } + + // Enough time for indexing + futures_timer::Delay::new(std::time::Duration::from_millis(3000)).await; + + // Test the chain is correctly indexed. 
+ let actual_imported_blocks = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .collect::>(); + let expected_imported_blocks = best_block_hashes.clone(); + assert_eq!(expected_imported_blocks, actual_imported_blocks); + } + + #[tokio::test] + async fn sync_strategy_parachain_ignores_non_best_blocks_if_not_major_sync() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let backend = builder.backend(); + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + let indexer_backend = fc_db::sql::Backend::new( + fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig { + path: Path::new("sqlite:///") + .join(tmp.path()) + .join("test.db3") + .to_str() + .unwrap(), + create_if_missing: true, + cache_size: 204800, + thread_count: 4, + }), + 100, + None, + overrides.clone(), + ) + .await + .expect("indexer pool to be created"); + + // Pool + let pool = indexer_backend.pool().clone(); + + // Spawn indexer task + let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks< + crate::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + let mut sync_oracle_wrapper = TestSyncOracleWrapper::new(); + let sync_oracle = sync_oracle_wrapper.oracle.clone(); + let client_inner = client.clone(); + tokio::task::spawn(async move { + crate::sql::SyncWorker::run( + client_inner.clone(), + backend.clone(), + Arc::new(indexer_backend), + client_inner.import_notification_stream(), + SyncWorkerConfig { + read_notification_timeout: Duration::from_secs(10), + check_indexed_blocks_interval: Duration::from_secs(60), + }, + SyncStrategy::Parachain, + Arc::new(sync_oracle), + pubsub_notification_sinks.clone(), + ) + .await + }); + // Enough time for startup + futures_timer::Delay::new(std::time::Duration::from_millis(200)).await; + + // Import 3 blocks as part of normal operation, storing them oldest first. + sync_oracle_wrapper.set_sync_status(false); + let mut parent_hash = client + .hash(sp_runtime::traits::Zero::zero()) + .unwrap() + .expect("genesis hash"); + let mut best_block_hashes: Vec = vec![]; + for _block_number in 1..=3 { + let builder = client + .new_block_at(parent_hash, ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + + executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); + best_block_hashes.push(block_hash.clone()); + parent_hash = block_hash; + } + + // create non-best block + let builder = client + .new_block_at(best_block_hashes[0], ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + + executor::block_on(client.import(BlockOrigin::Own, block)).unwrap(); + + // Enough time for indexing + futures_timer::Delay::new(std::time::Duration::from_millis(3000)).await; + + // Test the chain is correctly indexed. 
+ let actual_imported_blocks = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .collect::>(); + let expected_imported_blocks = best_block_hashes.clone(); + assert_eq!(expected_imported_blocks, actual_imported_blocks); + } + + #[tokio::test] + async fn sync_strategy_normal_ignores_best_blocks_if_major_sync() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let backend = builder.backend(); + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + let indexer_backend = fc_db::sql::Backend::new( + fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig { + path: Path::new("sqlite:///") + .join(tmp.path()) + .join("test.db3") + .to_str() + .unwrap(), + create_if_missing: true, + cache_size: 204800, + thread_count: 4, + }), + 100, + None, + overrides.clone(), + ) + .await + .expect("indexer pool to be created"); + + // Pool + let pool = indexer_backend.pool().clone(); + + // Spawn indexer task + let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks< + crate::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + let mut sync_oracle_wrapper = TestSyncOracleWrapper::new(); + let sync_oracle = sync_oracle_wrapper.oracle.clone(); + let client_inner = client.clone(); + tokio::task::spawn(async move { + crate::sql::SyncWorker::run( + client_inner.clone(), + backend.clone(), + Arc::new(indexer_backend), + client_inner.import_notification_stream(), + SyncWorkerConfig { + read_notification_timeout: Duration::from_secs(10), + check_indexed_blocks_interval: Duration::from_secs(60), + }, + SyncStrategy::Normal, + Arc::new(sync_oracle), + pubsub_notification_sinks.clone(), + ) + .await + }); + // Enough time for startup + futures_timer::Delay::new(std::time::Duration::from_millis(200)).await; + + // Import 3 blocks as part of initial network sync, storing them oldest first. + sync_oracle_wrapper.set_sync_status(true); + let mut parent_hash = client + .hash(sp_runtime::traits::Zero::zero()) + .unwrap() + .expect("genesis hash"); + let mut best_block_hashes: Vec = vec![]; + for _block_number in 1..=3 { + let builder = client + .new_block_at(parent_hash, ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + + executor::block_on(client.import(BlockOrigin::NetworkInitialSync, block)).unwrap(); + best_block_hashes.push(block_hash.clone()); + parent_hash = block_hash; + } + + // Enough time for indexing + futures_timer::Delay::new(std::time::Duration::from_millis(3000)).await; + + // Test the chain is correctly indexed. 
+ let actual_imported_blocks = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .collect::>(); + let expected_imported_blocks = Vec::::new(); + assert_eq!(expected_imported_blocks, actual_imported_blocks); + } + + #[tokio::test] + async fn sync_strategy_parachain_ignores_best_blocks_if_major_sync() { + let tmp = tempdir().expect("create a temporary directory"); + let builder = TestClientBuilder::new().add_extra_storage( + PALLET_ETHEREUM_SCHEMA.to_vec(), + Encode::encode(&EthereumStorageSchema::V3), + ); + let backend = builder.backend(); + let (client, _) = + builder.build_with_native_executor::(None); + let mut client = Arc::new(client); + let mut overrides_map = BTreeMap::new(); + overrides_map.insert( + EthereumStorageSchema::V3, + Box::new(SchemaV3Override::new(client.clone())) as Box>, + ); + let overrides = Arc::new(OverrideHandle { + schemas: overrides_map, + fallback: Box::new(SchemaV3Override::new(client.clone())), + }); + let indexer_backend = fc_db::sql::Backend::new( + fc_db::sql::BackendConfig::Sqlite(fc_db::sql::SqliteBackendConfig { + path: Path::new("sqlite:///") + .join(tmp.path()) + .join("test.db3") + .to_str() + .unwrap(), + create_if_missing: true, + cache_size: 204800, + thread_count: 4, + }), + 100, + None, + overrides.clone(), + ) + .await + .expect("indexer pool to be created"); + + // Pool + let pool = indexer_backend.pool().clone(); + + // Spawn indexer task + let pubsub_notification_sinks: crate::EthereumBlockNotificationSinks< + crate::EthereumBlockNotification, + > = Default::default(); + let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks); + let mut sync_oracle_wrapper = TestSyncOracleWrapper::new(); + let sync_oracle = sync_oracle_wrapper.oracle.clone(); + let client_inner = client.clone(); + tokio::task::spawn(async move { + crate::sql::SyncWorker::run( + client_inner.clone(), + backend.clone(), + Arc::new(indexer_backend), + client_inner.import_notification_stream(), + SyncWorkerConfig { + read_notification_timeout: Duration::from_secs(10), + check_indexed_blocks_interval: Duration::from_secs(60), + }, + SyncStrategy::Parachain, + Arc::new(sync_oracle), + pubsub_notification_sinks.clone(), + ) + .await + }); + // Enough time for startup + futures_timer::Delay::new(std::time::Duration::from_millis(200)).await; + + // Import 3 blocks as part of initial network sync, storing them oldest first. + sync_oracle_wrapper.set_sync_status(true); + let mut parent_hash = client + .hash(sp_runtime::traits::Zero::zero()) + .unwrap() + .expect("genesis hash"); + let mut best_block_hashes: Vec = vec![]; + for _block_number in 1..=3 { + let builder = client + .new_block_at(parent_hash, ethereum_digest(), false) + .unwrap(); + let block = builder.build().unwrap().block; + let block_hash = block.header.hash(); + + executor::block_on(client.import(BlockOrigin::NetworkInitialSync, block)).unwrap(); + best_block_hashes.push(block_hash.clone()); + parent_hash = block_hash; + } + + // Enough time for indexing + futures_timer::Delay::new(std::time::Duration::from_millis(3000)).await; + + // Test the chain is correctly indexed. 
+ let actual_imported_blocks = + sqlx::query("SELECT substrate_block_hash, is_canon, block_number FROM blocks") + .fetch_all(&pool) + .await + .expect("test query result") + .iter() + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .collect::>(); + let expected_imported_blocks = Vec::::new(); assert_eq!(expected_imported_blocks, actual_imported_blocks); } }