From bdd79f20120e85e4a95b21f8ce3d416e2d83b439 Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Fri, 12 May 2023 17:46:14 +0530 Subject: [PATCH 1/2] store,graph,node : Add methods to update non_fatal_errors field in db, and wired them in runner --- core/src/subgraph/runner.rs | 1 + graph/src/components/store/traits.rs | 1 + graph/src/components/store/write.rs | 3 +++ store/postgres/src/deployment.rs | 26 +++++++++++++++++++- store/postgres/src/deployment_store.rs | 12 ++++++++- store/postgres/src/writable.rs | 2 ++ store/test-store/src/store.rs | 3 +++ store/test-store/tests/graph/entity_cache.rs | 1 + store/test-store/tests/postgres/store.rs | 2 ++ 9 files changed, 49 insertions(+), 2 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 627ed8219cf..50e449db1c4 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -478,6 +478,7 @@ where persisted_data_sources, deterministic_errors, processed_data_sources, + is_non_fatal_errors_active, ) .await .context("Failed to transact block operations")?; diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 01875b9bfa2..f82e8765e4e 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -297,6 +297,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker { data_sources: Vec, deterministic_errors: Vec, offchain_to_remove: Vec, + is_non_fatal_errors_active: bool, ) -> Result<(), StoreError>; /// The deployment `id` finished syncing, mark it as synced in the database diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index df3864079f8..8ba12ca8d46 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -605,6 +605,7 @@ pub struct Batch { pub deterministic_errors: Vec, pub offchain_to_remove: DataSources, pub error: Option, + pub is_non_fatal_errors_active: bool, } impl Batch { @@ -615,6 +616,7 @@ impl Batch { data_sources: Vec, deterministic_errors: Vec, offchain_to_remove: Vec, + is_non_fatal_errors_active: bool, ) -> Result { let block = block_ptr.number; @@ -647,6 +649,7 @@ impl Batch { deterministic_errors, offchain_to_remove, error: None, + is_non_fatal_errors_active, }) } diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs index 50204a37dbb..523060f3395 100644 --- a/store/postgres/src/deployment.rs +++ b/store/postgres/src/deployment.rs @@ -775,7 +775,29 @@ pub fn fail( ) -> Result<(), StoreError> { let error_id = insert_subgraph_error(conn, error)?; - update_deployment_status(conn, id, SubgraphHealth::Failed, Some(error_id))?; + update_deployment_status(conn, id, SubgraphHealth::Failed, Some(error_id), None)?; + + Ok(()) +} + +pub fn update_non_fatal_errors( + conn: &PgConnection, + deployment_id: &DeploymentHash, + health: SubgraphHealth, + non_fatal_errors: Option<&[SubgraphError]>, +) -> Result<(), StoreError> { + let error_ids = non_fatal_errors.map(|errors| { + errors + .iter() + .map(|error| { + hex::encode(stable_hash_legacy::utils::stable_hash::( + error, + )) + }) + .collect::>() + }); + + update_deployment_status(conn, deployment_id, health, None, error_ids)?; Ok(()) } @@ -802,6 +824,7 @@ pub fn update_deployment_status( deployment_id: &DeploymentHash, health: SubgraphHealth, fatal_error: Option, + non_fatal_errors: Option>, ) -> Result<(), StoreError> { use subgraph_deployment as d; @@ -810,6 +833,7 @@ pub fn update_deployment_status( d::failed.eq(health.is_failed()), d::health.eq(health), d::fatal_error.eq::>(fatal_error), + d::non_fatal_errors.eq::>(non_fatal_errors.unwrap_or(vec![])), )) .execute(conn) .map(|_| ()) diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 39b7442600b..7eb66502cd8 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1116,6 +1116,15 @@ impl DeploymentStore { &batch.deterministic_errors, batch.block_ptr.number, )?; + + if batch.is_non_fatal_errors_active { + deployment::update_non_fatal_errors( + &conn, + &site.deployment, + deployment::SubgraphHealth::Unhealthy, + Some(&batch.deterministic_errors), + )?; + } } let earliest_block = deployment::transact_block( @@ -1631,7 +1640,7 @@ impl DeploymentStore { let _ = self.revert_block_operations(site.clone(), parent_ptr.clone(), &FirehoseCursor::None)?; // Unfail the deployment. - deployment::update_deployment_status(conn, deployment_id, prev_health, None)?; + deployment::update_deployment_status(conn, deployment_id, prev_health, None,None)?; Ok(UnfailOutcome::Unfailed) } @@ -1714,6 +1723,7 @@ impl DeploymentStore { deployment_id, deployment::SubgraphHealth::Healthy, None, + None, )?; // Delete the fatal error. diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index b29173206db..845c4d38f5d 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -1495,6 +1495,7 @@ impl WritableStoreTrait for WritableStore { data_sources: Vec, deterministic_errors: Vec, processed_data_sources: Vec, + is_non_fatal_errors_active: bool, ) -> Result<(), StoreError> { let batch = Batch::new( block_ptr_to.clone(), @@ -1503,6 +1504,7 @@ impl WritableStoreTrait for WritableStore { data_sources, deterministic_errors, processed_data_sources, + is_non_fatal_errors_active, )?; self.writer.write(batch, stopwatch).await?; diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 1a1737f9eb1..a9c9cce30c5 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -232,6 +232,7 @@ pub async fn transact_errors( Vec::new(), errs, Vec::new(), + false, ) .await?; flush(deployment).await @@ -287,6 +288,7 @@ pub async fn transact_entities_and_dynamic_data_sources( deployment.id, Arc::new(manifest_idx_and_name), ))?; + let mut entity_cache = EntityCache::new(Arc::new(store.clone())); entity_cache.append(ops); let mods = entity_cache @@ -309,6 +311,7 @@ pub async fn transact_entities_and_dynamic_data_sources( data_sources, Vec::new(), Vec::new(), + false, ) .await } diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 12a05e1caba..ac667c6a396 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -130,6 +130,7 @@ impl WritableStore for MockStore { _: Vec, _: Vec, _: Vec, + _: bool, ) -> Result<(), StoreError> { unimplemented!() } diff --git a/store/test-store/tests/postgres/store.rs b/store/test-store/tests/postgres/store.rs index 68b44c7958f..93bfd67e656 100644 --- a/store/test-store/tests/postgres/store.rs +++ b/store/test-store/tests/postgres/store.rs @@ -1530,6 +1530,7 @@ fn handle_large_string_with_index() { Vec::new(), Vec::new(), Vec::new(), + false, ) .await .expect("Failed to insert large text"); @@ -1623,6 +1624,7 @@ fn handle_large_bytea_with_index() { Vec::new(), Vec::new(), Vec::new(), + false, ) .await .expect("Failed to insert large text"); From 92451cc1e6af9e30835a6ac31752c57184a3661f Mon Sep 17 00:00:00 2001 From: Krishnanand V P Date: Fri, 26 May 2023 20:14:43 +0530 Subject: [PATCH 2/2] store/test-store : Add tests for nonFatalErrors --- store/test-store/src/store.rs | 5 +- .../tests/chain/ethereum/manifest.rs | 2 + store/test-store/tests/graphql/query.rs | 4 +- store/test-store/tests/postgres/subgraph.rs | 66 +++++++++++++++++-- 4 files changed, 70 insertions(+), 7 deletions(-) diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index a9c9cce30c5..527e59bc44b 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -207,11 +207,14 @@ pub fn remove_subgraph(id: &DeploymentHash) { } /// Transact errors for this block and wait until changes have been written +/// Takes store, deployment, block ptr to, errors, and a bool indicating whether +/// nonFatalErrors are active pub async fn transact_errors( store: &Arc, deployment: &DeploymentLocator, block_ptr_to: BlockPtr, errs: Vec, + is_non_fatal_errors_active: bool, ) -> Result<(), StoreError> { let metrics_registry = Arc::new(MetricsRegistry::mock()); let stopwatch_metrics = StopwatchMetrics::new( @@ -232,7 +235,7 @@ pub async fn transact_errors( Vec::new(), errs, Vec::new(), - false, + is_non_fatal_errors_active, ) .await?; flush(deployment).await diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 274750133b7..a4548ffeecc 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -225,6 +225,7 @@ specVersion: 0.0.2 &deployment, test_store::BLOCKS[1].clone(), vec![error], + false, ) .await .unwrap(); @@ -336,6 +337,7 @@ specVersion: 0.0.2 &deployment, test_store::BLOCKS[1].clone(), vec![error], + false, ) .await .unwrap(); diff --git a/store/test-store/tests/graphql/query.rs b/store/test-store/tests/graphql/query.rs index 05cd63d6336..b9b08d6528a 100644 --- a/store/test-store/tests/graphql/query.rs +++ b/store/test-store/tests/graphql/query.rs @@ -2439,7 +2439,7 @@ fn non_fatal_errors() { deterministic: true, }; - transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err]) + transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err], true) .await .unwrap(); @@ -2545,7 +2545,7 @@ fn deterministic_error() { deterministic: true, }; - transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err]) + transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err], false) .await .unwrap(); diff --git a/store/test-store/tests/postgres/subgraph.rs b/store/test-store/tests/postgres/subgraph.rs index 30b5e032d21..69bc3babddd 100644 --- a/store/test-store/tests/postgres/subgraph.rs +++ b/store/test-store/tests/postgres/subgraph.rs @@ -506,7 +506,7 @@ fn subgraph_error() { assert!(count() == 0); - transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error]) + transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error], false) .await .unwrap(); assert!(count() == 1); @@ -520,7 +520,7 @@ fn subgraph_error() { }; // Inserting the same error is allowed but ignored. - transact_errors(&store, &deployment, BLOCKS[2].clone(), vec![error]) + transact_errors(&store, &deployment, BLOCKS[2].clone(), vec![error], false) .await .unwrap(); assert!(count() == 1); @@ -533,7 +533,7 @@ fn subgraph_error() { deterministic: false, }; - transact_errors(&store, &deployment, BLOCKS[3].clone(), vec![error2]) + transact_errors(&store, &deployment, BLOCKS[3].clone(), vec![error2], false) .await .unwrap(); assert!(count() == 2); @@ -542,6 +542,64 @@ fn subgraph_error() { }) } +#[test] +fn subgraph_non_fatal_error() { + test_store::run_test_sequentially(|store| async move { + let subgraph_store = store.subgraph_store(); + let subgraph_id = DeploymentHash::new("subgraph_non_fatal_error").unwrap(); + let deployment = + test_store::create_test_subgraph(&subgraph_id, "type Foo { id: ID! }").await; + + let count = || -> usize { + let store = store.subgraph_store(); + let count = store.error_count(&subgraph_id).unwrap(); + println!("count: {}", count); + count + }; + + let error = SubgraphError { + subgraph_id: subgraph_id.clone(), + message: "test".to_string(), + block_ptr: Some(BLOCKS[1].clone()), + handler: None, + deterministic: true, + }; + + assert!(count() == 0); + + transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error], true) + .await + .unwrap(); + assert!(count() == 1); + + let info = subgraph_store.status_for_id(deployment.id); + + assert!(info.non_fatal_errors.len() == 1); + assert!(info.health == SubgraphHealth::Unhealthy); + + let error2 = SubgraphError { + subgraph_id: subgraph_id.clone(), + message: "test2".to_string(), + block_ptr: None, + handler: None, + deterministic: false, + }; + + // Inserting non deterministic errors will increase error count but not count of non fatal errors + transact_errors(&store, &deployment, BLOCKS[2].clone(), vec![error2], false) + .await + .unwrap(); + assert!(count() == 2); + + let info = subgraph_store.status_for_id(deployment.id); + + assert!(info.non_fatal_errors.len() == 1); + assert!(info.health == SubgraphHealth::Unhealthy); + + test_store::remove_subgraph(&subgraph_id); + }) +} + #[test] fn fatal_vs_non_fatal() { async fn setup() -> DeploymentLocator { @@ -592,7 +650,7 @@ fn fatal_vs_non_fatal() { .await .unwrap()); - transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error()]) + transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error()], false) .await .unwrap();