Add exponential backoff retry for non-deterministic errors (#2988)
* deployment: Have just one is_failed function for SubgraphHealth

* writable_store: Add method to get health of deployment

* instance_manager: Use subgraph health to decide unfailure

* instance_manager: Add exponential backoff retry for non-deterministic errors

* store: Make health function async
evaporei authored Dec 7, 2021
1 parent 1744e46 commit 5ddc3e1
Showing 8 changed files with 161 additions and 55 deletions.
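
In outline, the change works like the sketch below — a condensed, self-contained model of the new control flow, using hypothetical stand-ins (`Health`, `Backoff`, `StreamOutcome`, `try_unfail`) rather than graph-node's real types:

```rust
use std::time::Duration;

#[derive(Clone, Copy, PartialEq)]
enum Health { Healthy, Failed }

enum StreamOutcome { Progressed, DeterministicError, NonDeterministicError }

struct Backoff { base: Duration, ceiling: Duration, attempt: u32 }

impl Backoff {
    fn new(base: Duration, ceiling: Duration) -> Self {
        Backoff { base, ceiling, attempt: 0 }
    }
    // Delay doubles with each attempt and is capped at the ceiling.
    fn next_delay(&mut self) -> Duration {
        let delay = (self.base * 2u32.saturating_pow(self.attempt)).min(self.ceiling);
        self.attempt += 1;
        delay
    }
    fn reset(&mut self) {
        self.attempt = 0;
    }
}

// Stand-in for `unfail_non_deterministic_error`: assume the deployment head
// has passed the error block, so the failure clears.
fn try_unfail(_current: Health) -> Health {
    Health::Healthy
}

fn run(mut events: impl Iterator<Item = StreamOutcome>) {
    let mut health = Health::Healthy;
    let mut backoff = Backoff::new(Duration::from_secs(2 * 60), Duration::from_secs(30 * 60));
    let mut should_try_unfail = true;

    // Each iteration of the outer loop (re)starts the block stream.
    'restart: loop {
        while let Some(event) = events.next() {
            match event {
                StreamOutcome::Progressed => {
                    // Head advanced: keep trying to clear a previous
                    // non-deterministic failure until health is not Failed.
                    if should_try_unfail {
                        health = try_unfail(health);
                        if health != Health::Failed {
                            should_try_unfail = false;
                            backoff.reset();
                        }
                    }
                }
                StreamOutcome::DeterministicError => {
                    // fail_subgraph: persist the error; deterministic failures are final.
                    return;
                }
                StreamOutcome::NonDeterministicError => {
                    // Record the error only if not already failed, so the same
                    // error isn't added to the database on every retry.
                    if health != Health::Failed {
                        health = Health::Failed; // fail_subgraph
                    }
                    println!("retrying in {:?}", backoff.next_delay());
                    should_try_unfail = true;
                    continue 'restart; // restart the stream after the delay
                }
            }
        }
        return; // stream ended
    }
}

fn main() {
    use StreamOutcome::*;
    run(vec![NonDeterministicError, NonDeterministicError, Progressed, DeterministicError].into_iter());
}
```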
105 changes: 90 additions & 15 deletions core/src/subgraph/instance_manager.rs
@@ -8,13 +8,13 @@ use graph::data::store::EntityVersion;
use graph::data::subgraph::{UnifiedMappingApiVersion, MAX_SPEC_VERSION};
use graph::prelude::TryStreamExt;
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::util::lfu_cache::LfuCache;
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
use graph::{blockchain::block_stream::BlockStreamMetrics, components::store::WritableStore};
use graph::{blockchain::block_stream::BlockWithTriggers, data::subgraph::SubgraphFeature};
use graph::{
blockchain::NodeCapabilities,
blockchain::TriggersAdapter,
data::subgraph::schema::{SubgraphError, POI_OBJECT},
data::subgraph::schema::{SubgraphError, SubgraphHealth, POI_OBJECT},
};
use graph::{
blockchain::{block_stream::BlockStreamEvent, Blockchain, TriggerFilter as _},
@@ -27,9 +27,11 @@ use graph::{
use lazy_static::lazy_static;
use std::collections::{BTreeSet, HashMap};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::task;

const MINUTE: Duration = Duration::from_secs(60);

lazy_static! {
/// Size limit of the entity LFU cache, in bytes.
// Multiplied by 1000 because the env var is in KB.
@@ -43,6 +45,14 @@ lazy_static! {
// Used for testing Graph Node itself.
pub static ref DISABLE_FAIL_FAST: bool =
std::env::var("GRAPH_DISABLE_FAIL_FAST").is_ok();

/// Ceiling for the backoff retry of non-deterministic errors, in seconds.
pub static ref SUBGRAPH_ERROR_RETRY_CEIL_SECS: Duration =
std::env::var("GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS")
.unwrap_or((MINUTE * 30).as_secs().to_string())
.parse::<u64>()
.map(Duration::from_secs)
.expect("invalid GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS");
}

type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>;
@@ -462,6 +472,10 @@ where
let mut should_try_unfail_deterministic = true;
let mut should_try_unfail_non_deterministic = true;

// Exponential backoff that starts with two minutes and keeps
// increasing its timeout exponentially until it reaches the ceiling.
let mut backoff = ExponentialBackoff::new(MINUTE * 2, *SUBGRAPH_ERROR_RETRY_CEIL_SECS);

loop {
debug!(logger, "Starting or restarting subgraph");

@@ -634,18 +648,29 @@ where

match res {
Ok(needs_restart) => {
// Runs only once
// Keep trying to unfail the subgraph every time it advances block(s), until its
// health is not Failed anymore.
if should_try_unfail_non_deterministic {
should_try_unfail_non_deterministic = false;

// If the deployment head advanced, we can unfail
// the non-deterministic error (if there's any).
ctx.inputs
.store
.unfail_non_deterministic_error(&block_ptr)?;
}

deployment_failed.set(0.0);
match ctx.inputs.store.health(&ctx.inputs.deployment.hash).await? {
SubgraphHealth::Failed => {
// If the unfail call didn't change the subgraph health, we keep
// `should_try_unfail_non_deterministic` as `true` until it's
// actually unfailed.
}
SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => {
// Stop trying to unfail.
should_try_unfail_non_deterministic = false;
deployment_failed.set(0.0);
backoff.reset();
}
};
}

// Notify the BlockStream implementation that a block was successfully consumed
// and that its internal cursoring mechanism can be saved to memory.
@@ -674,24 +699,74 @@ where

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
deployment_failed.set(1.0);

let message = format!("{:#}", e).replace("\n", "\t");
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
let deterministic = e.is_deterministic();

let error = SubgraphError {
subgraph_id: id_for_err.clone(),
message,
block_ptr: Some(block_ptr),
handler: None,
deterministic: e.is_deterministic(),
deterministic,
};
deployment_failed.set(1.0);

store_for_err
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;
match deterministic {
true => {
// Fail subgraph:
// - Change status/health.
// - Save the error to the database.
store_for_err
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;

return Err(err);
}
false => {
// Shouldn't fail subgraph if it's already failed for non-deterministic
// reasons.
//
// If we don't do this check we would keep adding the same error to the
// database.
let should_fail_subgraph =
ctx.inputs.store.health(&ctx.inputs.deployment.hash).await?
!= SubgraphHealth::Failed;

if should_fail_subgraph {
// Fail subgraph:
// - Change status/health.
// - Save the error to the database.
store_for_err
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;
}

// Retry logic below:

// Cancel the stream for real.
ctx.state
.instances
.write()
.unwrap()
.remove(&ctx.inputs.deployment.id);

return Err(err);
error!(logger, "Subgraph failed for non-deterministic error: {}", e;
"attempt" => backoff.attempt,
"retry_delay_s" => backoff.delay().as_secs());

// Sleep before restarting.
backoff.sleep_async().await;

should_try_unfail_non_deterministic = true;

// And restart the subgraph.
break;
}
}
}
}
}
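
With the defaults introduced above — a two-minute base and a thirty-minute ceiling — the retry delays for a streak of non-deterministic failures work out to 2, 4, 8, 16, 30, 30, … minutes. A quick check (assuming the delay is base × 2^attempt capped at the ceiling, which is what the `attempt` counter in `backoff.rs` suggests; the crate may compute it slightly differently):

```rust
use std::time::Duration;

fn main() {
    let base = Duration::from_secs(2 * 60); // MINUTE * 2
    let ceiling = Duration::from_secs(30 * 60); // default GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS
    for attempt in 0u32..7 {
        let delay = (base * 2u32.saturating_pow(attempt)).min(ceiling);
        println!("attempt {}: retry in {} min", attempt + 1, delay.as_secs() / 60);
    }
    // Prints 2, 4, 8, 16, then 30 from the fifth attempt onward.
}
```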
6 changes: 6 additions & 0 deletions graph/src/components/store.rs
@@ -1074,6 +1074,8 @@ pub trait WritableStore: Send + Sync + 'static {
/// Report the name of the shard in which the subgraph is stored. This
/// should only be used for reporting and monitoring
fn shard(&self) -> &str;

async fn health(&self, id: &DeploymentHash) -> Result<SubgraphHealth, StoreError>;
}

#[async_trait]
@@ -1264,6 +1266,10 @@ impl WritableStore for MockStore {
fn shard(&self) -> &str {
unimplemented!()
}

async fn health(&self, _: &DeploymentHash) -> Result<SubgraphHealth, StoreError> {
unimplemented!()
}
}

pub trait BlockStore: Send + Sync + 'static {
3 changes: 1 addition & 2 deletions graph/src/data/subgraph/schema.rs
@@ -44,9 +44,8 @@ impl SubgraphHealth {

pub fn is_failed(&self) -> bool {
match self {
SubgraphHealth::Healthy => false,
SubgraphHealth::Unhealthy => false,
SubgraphHealth::Failed => true,
SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => false,
}
}
}
4 changes: 4 additions & 0 deletions graph/src/util/backoff.rs
@@ -45,4 +45,8 @@ impl ExponentialBackoff {
self.attempt += 1;
delay
}

pub fn reset(&mut self) {
self.attempt = 0;
}
}
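
The new `reset` complements the retry loop in `instance_manager.rs`: once the subgraph advances and is unfailed, the attempt counter drops back to zero, so a later streak of non-deterministic errors starts again at the two-minute base delay rather than at whatever delay the previous streak had reached.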
20 changes: 16 additions & 4 deletions store/postgres/src/deployment.rs
@@ -36,10 +36,9 @@ pub enum SubgraphHealth {

impl SubgraphHealth {
fn is_failed(&self) -> bool {
match self {
Self::Failed => true,
Self::Healthy | Self::Unhealthy => false,
}
use graph::data::subgraph::schema::SubgraphHealth as H;

H::from(*self).is_failed()
}
}

@@ -627,6 +626,19 @@ fn check_health(
.map_err(|e| e.into())
}

pub(crate) fn health(
conn: &PgConnection,
id: &DeploymentHash,
) -> Result<SubgraphHealth, StoreError> {
use subgraph_deployment as d;

d::table
.filter(d::deployment.eq(id.as_str()))
.select(d::health)
.get_result(conn)
.map_err(|e| e.into())
}

/// Reverts the errors and updates the subgraph health if necessary.
pub(crate) fn revert_subgraph_errors(
conn: &PgConnection,
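
For reference, the new `health` query above compiles to roughly `SELECT health FROM subgraphs.subgraph_deployment WHERE deployment = $1` (assuming the usual diesel table mapping), returning the stored health enum for that deployment.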
60 changes: 32 additions & 28 deletions store/postgres/src/deployment_store.rs
@@ -1321,48 +1321,43 @@ impl DeploymentStore {
// Deployment head (current_ptr) advanced more than the error.
// That means it's healthy, and the non-deterministic error got
// solved (didn't happen on another try).
//
// This should be the scenario where the unfail happens, however
// for now we unfail in all cases that non-deterministic errors
// were found and the deployment head advanced.
(Bound::Included(error_block_number), _)
if current_ptr.number >= error_block_number =>
{
info!(
self.logger,
"Unfailing the deployment status";
"subgraph_id" => deployment_id,
);

// Unfail the deployment.
deployment::update_deployment_status(
conn,
deployment_id,
deployment::SubgraphHealth::Healthy,
None,
)?;

// Delete the fatal error.
deployment::delete_error(conn, &subgraph_error.id)?;

Ok(())
}
// The deployment head is still before where non-deterministic error happened.
//
// Technically we shouldn't unfail the subgraph and delete the error
// until its head actually passed the error block range. But for
// now we'll only log this and keep the old behavior.
// NOOP, the deployment head is still before where non-deterministic error happened.
block_range => {
info!(
self.logger,
"Subgraph error is still ahead of deployment head";
"Subgraph error is still ahead of deployment head, nothing to unfail";
"subgraph_id" => deployment_id,
"block_number" => format!("{}", current_ptr.number),
"block_hash" => format!("{}", current_ptr.hash),
"error_block_range" => format!("{:?}", block_range),
"error_block_hash" => subgraph_error.block_hash.as_ref().map(|hash| format!("0x{}", hex::encode(hash))),
);
}
};

info!(
self.logger,
"Unfailing the deployment status";
"subgraph_id" => deployment_id,
);

// Unfail the deployment.
deployment::update_deployment_status(
conn,
deployment_id,
deployment::SubgraphHealth::Healthy,
None,
)?;

// Delete the fatal error.
deployment::delete_error(conn, &subgraph_error.id)
Ok(())
}
}
})
}

@@ -1379,4 +1374,13 @@ impl DeploymentStore {
"shard" => self.pool.shard.as_str())
});
}

pub(crate) async fn health(
&self,
id: &DeploymentHash,
) -> Result<deployment::SubgraphHealth, StoreError> {
let id = id.clone();
self.with_conn(move |conn, _| deployment::health(&conn, &id).map_err(Into::into))
.await
}
}
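
After this change the no-op branch really is a no-op: when the head hasn't passed the error block, the deployment stays failed and no error is deleted, so a repeated non-deterministic failure shows up as a new error row. The updated `fail_unfail_non_deterministic_error_noop` test in `store/postgres/tests/subgraph.rs` further below asserts exactly that (`count()` goes to 2 and `vi.failed` stays `true`).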
9 changes: 8 additions & 1 deletion store/postgres/src/subgraph_store.rs
@@ -16,7 +16,7 @@ use graph::{
},
constraint_violation,
data::query::QueryTarget,
data::subgraph::schema::SubgraphError,
data::subgraph::schema::{self, SubgraphError},
data::{
store::{EntityVersion, Vid},
subgraph::status,
@@ -1325,6 +1325,13 @@ impl WritableStoreTrait for WritableStore {
fn shard(&self) -> &str {
self.site.shard.as_str()
}

async fn health(&self, id: &DeploymentHash) -> Result<schema::SubgraphHealth, StoreError> {
self.retry_async("health", || async {
self.writable.health(id).await.map(Into::into)
})
.await
}
}

fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
9 changes: 4 additions & 5 deletions store/postgres/tests/subgraph.rs
@@ -966,15 +966,14 @@ fn fail_unfail_non_deterministic_error_noop() {
// Fail the subgraph with a non-deterministic error, but with an advanced block.
writable.fail_subgraph(error).await.unwrap();

// Since the block range of the block won't match the deployment head, this would be NOOP,
// but we're skipping the confidence check for now.
// Since the block range of the block won't match the deployment head, this will be NOOP.
writable.unfail_non_deterministic_error(&BLOCKS[1]).unwrap();

// Unfail occurs as expected.
assert_eq!(count(), 1);
// State continues the same besides a new error added to the database.
assert_eq!(count(), 2);
let vi = get_version_info(&store, NAME);
assert_eq!(&*NAME, vi.deployment_id.as_str());
assert_eq!(false, vi.failed);
assert_eq!(true, vi.failed);
assert_eq!(Some(1), vi.latest_ethereum_block_number);

test_store::remove_subgraphs();
