Update nonFatalErrors in subgraphs.subgraph_deployment table #4615

Merged 2 commits on Jun 2, 2023
1 change: 1 addition & 0 deletions core/src/subgraph/runner.rs
@@ -478,6 +478,7 @@ where
persisted_data_sources,
deterministic_errors,
processed_data_sources,
is_non_fatal_errors_active,
)
.await
.context("Failed to transact block operations")?;
1 change: 1 addition & 0 deletions graph/src/components/store/traits.rs
@@ -297,6 +297,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
is_non_fatal_errors_active: bool,
) -> Result<(), StoreError>;

/// The deployment `id` finished syncing, mark it as synced in the database
3 changes: 3 additions & 0 deletions graph/src/components/store/write.rs
@@ -605,6 +605,7 @@ pub struct Batch {
pub deterministic_errors: Vec<SubgraphError>,
pub offchain_to_remove: DataSources,
pub error: Option<StoreError>,
pub is_non_fatal_errors_active: bool,
}

impl Batch {
@@ -615,6 +616,7 @@ impl Batch {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
is_non_fatal_errors_active: bool,
) -> Result<Self, StoreError> {
let block = block_ptr.number;

@@ -647,6 +649,7 @@
deterministic_errors,
offchain_to_remove,
error: None,
is_non_fatal_errors_active,
})
}

26 changes: 25 additions & 1 deletion store/postgres/src/deployment.rs
@@ -775,7 +775,29 @@ pub fn fail(
) -> Result<(), StoreError> {
let error_id = insert_subgraph_error(conn, error)?;

update_deployment_status(conn, id, SubgraphHealth::Failed, Some(error_id))?;
update_deployment_status(conn, id, SubgraphHealth::Failed, Some(error_id), None)?;

Ok(())
}

pub fn update_non_fatal_errors(
conn: &PgConnection,
deployment_id: &DeploymentHash,
health: SubgraphHealth,
non_fatal_errors: Option<&[SubgraphError]>,
) -> Result<(), StoreError> {
let error_ids = non_fatal_errors.map(|errors| {
errors
.iter()
.map(|error| {
hex::encode(stable_hash_legacy::utils::stable_hash::<SetHasher, _>(
error,
))
})
.collect::<Vec<_>>()
});

update_deployment_status(conn, deployment_id, health, None, error_ids)?;

Ok(())
}
@@ -802,6 +824,7 @@ pub fn update_deployment_status(
deployment_id: &DeploymentHash,
health: SubgraphHealth,
fatal_error: Option<String>,
non_fatal_errors: Option<Vec<String>>,
) -> Result<(), StoreError> {
use subgraph_deployment as d;

d::failed.eq(health.is_failed()),
d::health.eq(health),
d::fatal_error.eq::<Option<String>>(fatal_error),
d::non_fatal_errors.eq::<Vec<String>>(non_fatal_errors.unwrap_or(vec![])),
))
.execute(conn)
.map(|_| ())
12 changes: 11 additions & 1 deletion store/postgres/src/deployment_store.rs
@@ -1116,6 +1116,15 @@ impl DeploymentStore {
&batch.deterministic_errors,
batch.block_ptr.number,
)?;

if batch.is_non_fatal_errors_active {
deployment::update_non_fatal_errors(
&conn,
&site.deployment,
deployment::SubgraphHealth::Unhealthy,
Some(&batch.deterministic_errors),
)?;
}
}

let earliest_block = deployment::transact_block(
@@ -1631,7 +1640,7 @@ impl DeploymentStore {
let _ = self.revert_block_operations(site.clone(), parent_ptr.clone(), &FirehoseCursor::None)?;

// Unfail the deployment.
deployment::update_deployment_status(conn, deployment_id, prev_health, None)?;
deployment::update_deployment_status(conn, deployment_id, prev_health, None, None)?;

Ok(UnfailOutcome::Unfailed)
}
@@ -1714,6 +1723,7 @@ impl DeploymentStore {
deployment_id,
deployment::SubgraphHealth::Healthy,
None,
None,
)?;

// Delete the fatal error.
2 changes: 2 additions & 0 deletions store/postgres/src/writable.rs
@@ -1495,6 +1495,7 @@ impl WritableStoreTrait for WritableStore {
data_sources: Vec<StoredDynamicDataSource>,
deterministic_errors: Vec<SubgraphError>,
processed_data_sources: Vec<StoredDynamicDataSource>,
is_non_fatal_errors_active: bool,
) -> Result<(), StoreError> {
let batch = Batch::new(
block_ptr_to.clone(),
@@ -1503,6 +1504,7 @@ impl WritableStoreTrait for WritableStore {
data_sources,
deterministic_errors,
processed_data_sources,
is_non_fatal_errors_active,
)?;
self.writer.write(batch, stopwatch).await?;

6 changes: 6 additions & 0 deletions store/test-store/src/store.rs
@@ -207,11 +207,14 @@ pub fn remove_subgraph(id: &DeploymentHash) {
}

/// Transact errors for this block and wait until changes have been written
/// Takes store, deployment, block ptr to, errors, and a bool indicating whether
/// nonFatalErrors are active
pub async fn transact_errors(
store: &Arc<Store>,
deployment: &DeploymentLocator,
block_ptr_to: BlockPtr,
errs: Vec<SubgraphError>,
is_non_fatal_errors_active: bool,
) -> Result<(), StoreError> {
let metrics_registry = Arc::new(MetricsRegistry::mock());
let stopwatch_metrics = StopwatchMetrics::new(
@@ -232,6 +235,7 @@ pub async fn transact_errors(
Vec::new(),
errs,
Vec::new(),
is_non_fatal_errors_active,
)
.await?;
flush(deployment).await
@@ -287,6 +291,7 @@ pub async fn transact_entities_and_dynamic_data_sources(
deployment.id,
Arc::new(manifest_idx_and_name),
))?;

let mut entity_cache = EntityCache::new(Arc::new(store.clone()));
entity_cache.append(ops);
let mods = entity_cache
data_sources,
Vec::new(),
Vec::new(),
false,
)
.await
}
2 changes: 2 additions & 0 deletions store/test-store/tests/chain/ethereum/manifest.rs
@@ -225,6 +225,7 @@ specVersion: 0.0.2
&deployment,
test_store::BLOCKS[1].clone(),
vec![error],
false,
)
.await
.unwrap();
@@ -336,6 +337,7 @@ specVersion: 0.0.2
&deployment,
test_store::BLOCKS[1].clone(),
vec![error],
false,
)
.await
.unwrap();
1 change: 1 addition & 0 deletions store/test-store/tests/graph/entity_cache.rs
@@ -130,6 +130,7 @@ impl WritableStore for MockStore {
_: Vec<StoredDynamicDataSource>,
_: Vec<SubgraphError>,
_: Vec<StoredDynamicDataSource>,
_: bool,
) -> Result<(), StoreError> {
unimplemented!()
}
4 changes: 2 additions & 2 deletions store/test-store/tests/graphql/query.rs
@@ -2439,7 +2439,7 @@ fn non_fatal_errors() {
deterministic: true,
};

transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err])
transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err], true)
.await
.unwrap();

@@ -2545,7 +2545,7 @@ fn deterministic_error() {
deterministic: true,
};

transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err])
transact_errors(&STORE, &deployment, BLOCK_TWO.block_ptr(), vec![err], false)
.await
.unwrap();

2 changes: 2 additions & 0 deletions store/test-store/tests/postgres/store.rs
@@ -1530,6 +1530,7 @@ fn handle_large_string_with_index() {
Vec::new(),
Vec::new(),
Vec::new(),
false,
)
.await
.expect("Failed to insert large text");
@@ -1623,6 +1624,7 @@ fn handle_large_bytea_with_index() {
Vec::new(),
Vec::new(),
Vec::new(),
false,
)
.await
.expect("Failed to insert large text");
66 changes: 62 additions & 4 deletions store/test-store/tests/postgres/subgraph.rs
@@ -506,7 +506,7 @@ fn subgraph_error() {

assert!(count() == 0);

transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error])
transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error], false)
.await
.unwrap();
assert!(count() == 1);
};

// Inserting the same error is allowed but ignored.
transact_errors(&store, &deployment, BLOCKS[2].clone(), vec![error])
transact_errors(&store, &deployment, BLOCKS[2].clone(), vec![error], false)
.await
.unwrap();
assert!(count() == 1);
deterministic: false,
};

transact_errors(&store, &deployment, BLOCKS[3].clone(), vec![error2])
transact_errors(&store, &deployment, BLOCKS[3].clone(), vec![error2], false)
.await
.unwrap();
assert!(count() == 2);
})
}

#[test]
fn subgraph_non_fatal_error() {
test_store::run_test_sequentially(|store| async move {
let subgraph_store = store.subgraph_store();
let subgraph_id = DeploymentHash::new("subgraph_non_fatal_error").unwrap();
let deployment =
test_store::create_test_subgraph(&subgraph_id, "type Foo { id: ID! }").await;

let count = || -> usize {
let store = store.subgraph_store();
let count = store.error_count(&subgraph_id).unwrap();
println!("count: {}", count);

Collaborator: Looks like a leftover from debugging.
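
As an illustration only (not a commit in this PR), dropping the debug print would shrink the counting helper to roughly the following fragment, reusing the `store` and `subgraph_id` bindings from the test above:

```rust
// Hypothetical cleanup suggested by the comment above: same behaviour,
// just without the debug println!. Fragment only; `store` and `subgraph_id`
// come from the surrounding test body.
let count = || -> usize {
    let store = store.subgraph_store();
    store.error_count(&subgraph_id).unwrap()
};
```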

count
};

let error = SubgraphError {
subgraph_id: subgraph_id.clone(),
message: "test".to_string(),
block_ptr: Some(BLOCKS[1].clone()),
handler: None,
deterministic: true,
};

assert!(count() == 0);

transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error], true)
.await
.unwrap();
assert!(count() == 1);

let info = subgraph_store.status_for_id(deployment.id);

assert!(info.non_fatal_errors.len() == 1);

Collaborator: It's generally better to use assert_eq!(1, info.non_fatal_errors.len()) for equality comparisons since that will print both the expected and actual values when the assertion fails. It's a small quality-of-life improvement when tests fail, not that big a deal, but would be good to change before merging.
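
To illustrate the reviewer's point outside the diff, here is a minimal self-contained sketch (the local vector stands in for `info.non_fatal_errors`) contrasting what each macro reports when it fails:

```rust
// Minimal sketch, not part of the PR: assert! vs assert_eq! failure output.
fn main() {
    let non_fatal_errors = vec!["test"];

    // On failure, assert! only echoes the stringified condition,
    // e.g. "assertion failed: non_fatal_errors.len() == 1".
    assert!(non_fatal_errors.len() == 1);

    // On failure, assert_eq! also prints the left and right values,
    // so the actual length shows up directly in the test output.
    assert_eq!(1, non_fatal_errors.len());
}
```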

assert!(info.health == SubgraphHealth::Unhealthy);

let error2 = SubgraphError {
subgraph_id: subgraph_id.clone(),
message: "test2".to_string(),
block_ptr: None,
handler: None,
deterministic: false,
};

// Inserting non-deterministic errors will increase the error count but not the count of non-fatal errors
transact_errors(&store, &deployment, BLOCKS[2].clone(), vec![error2], false)
.await
.unwrap();
assert!(count() == 2);

let info = subgraph_store.status_for_id(deployment.id);

assert!(info.non_fatal_errors.len() == 1);
assert!(info.health == SubgraphHealth::Unhealthy);

test_store::remove_subgraph(&subgraph_id);
})
}

#[test]
fn fatal_vs_non_fatal() {
async fn setup() -> DeploymentLocator {
@@ -592,7 +650,7 @@ fn fatal_vs_non_fatal() {
.await
.unwrap());

transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error()])
transact_errors(&store, &deployment, BLOCKS[1].clone(), vec![error()], false)
.await
.unwrap();
