Add exponential backoff retry for non-deterministic errors #2988

Merged 5 commits on Dec 7, 2021
105 changes: 90 additions & 15 deletions core/src/subgraph/instance_manager.rs
@@ -8,13 +8,13 @@ use graph::data::store::EntityVersion;
use graph::data::subgraph::{UnifiedMappingApiVersion, MAX_SPEC_VERSION};
use graph::prelude::TryStreamExt;
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::util::lfu_cache::LfuCache;
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
use graph::{blockchain::block_stream::BlockStreamMetrics, components::store::WritableStore};
use graph::{blockchain::block_stream::BlockWithTriggers, data::subgraph::SubgraphFeature};
use graph::{
blockchain::NodeCapabilities,
blockchain::TriggersAdapter,
data::subgraph::schema::{SubgraphError, POI_OBJECT},
data::subgraph::schema::{SubgraphError, SubgraphHealth, POI_OBJECT},
};
use graph::{
blockchain::{block_stream::BlockStreamEvent, Blockchain, TriggerFilter as _},
@@ -27,9 +27,11 @@ use graph::{
use lazy_static::lazy_static;
use std::collections::{BTreeSet, HashMap};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::task;

const MINUTE: Duration = Duration::from_secs(60);

lazy_static! {
/// Size limit of the entity LFU cache, in bytes.
// Multiplied by 1000 because the env var is in KB.
@@ -43,6 +45,14 @@ lazy_static! {
// Used for testing Graph Node itself.
pub static ref DISABLE_FAIL_FAST: bool =
std::env::var("GRAPH_DISABLE_FAIL_FAST").is_ok();

/// Ceiling for the backoff retry of non-deterministic errors, in seconds.
pub static ref SUBGRAPH_ERROR_RETRY_CEIL_SECS: Duration =
std::env::var("GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS")
.unwrap_or((MINUTE * 30).as_secs().to_string())
.parse::<u64>()
.map(Duration::from_secs)
.expect("invalid GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS");
}

type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>;
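
As a quick reference for operators, the ceiling above is read once at startup from GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS and defaults to 30 minutes. A minimal standalone sketch of the same parsing chain (an illustration, not graph-node code):

use std::time::Duration;

fn main() {
    const MINUTE: Duration = Duration::from_secs(60);
    // Unset => default of 1800 seconds; e.g. GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS=600
    // would cap the retry delay at 10 minutes.
    let ceiling: Duration = std::env::var("GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS")
        .unwrap_or((MINUTE * 30).as_secs().to_string())
        .parse::<u64>()
        .map(Duration::from_secs)
        .expect("invalid GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS");
    println!("non-deterministic error retry ceiling: {:?}", ceiling);
}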
@@ -462,6 +472,10 @@ where
let mut should_try_unfail_deterministic = true;
let mut should_try_unfail_non_deterministic = true;

// Exponential backoff that starts with two minutes and keeps
// increasing its timeout exponentially until it reaches the ceiling.
let mut backoff = ExponentialBackoff::new(MINUTE * 2, *SUBGRAPH_ERROR_RETRY_CEIL_SECS);

loop {
debug!(logger, "Starting or restarting subgraph");

@@ -634,18 +648,29 @@ where

match res {
Ok(needs_restart) => {
// Runs only once
// Keep trying to unfail the subgraph every time it advances block(s), until its
// health is no longer Failed.
if should_try_unfail_non_deterministic {
should_try_unfail_non_deterministic = false;

// If the deployment head advanced, we can unfail
// the non-deterministic error (if there's any).
ctx.inputs
.store
.unfail_non_deterministic_error(&block_ptr)?;
}

deployment_failed.set(0.0);
match ctx.inputs.store.health(&ctx.inputs.deployment.hash).await? {
SubgraphHealth::Failed => {
// If the unfail call didn't change the subgraph health, we keep
// `should_try_unfail_non_deterministic` as `true` until it's
// actually unfailed.
}
SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => {
// Stop trying to unfail.
should_try_unfail_non_deterministic = false;
deployment_failed.set(0.0);
backoff.reset();
}
};
}

// Notify the BlockStream implementation that a block was successfully consumed
// and that its internal cursoring mechanism can be saved to memory.
@@ -674,24 +699,74 @@ where

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
deployment_failed.set(1.0);

let message = format!("{:#}", e).replace("\n", "\t");
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
let deterministic = e.is_deterministic();

let error = SubgraphError {
subgraph_id: id_for_err.clone(),
message,
block_ptr: Some(block_ptr),
handler: None,
deterministic: e.is_deterministic(),
deterministic,
};
deployment_failed.set(1.0);

store_for_err
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;
match deterministic {
true => {
// Fail subgraph:
// - Change status/health.
// - Save the error to the database.
store_for_err
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;

return Err(err);
}
false => {
// Shouldn't fail subgraph if it's already failed for non-deterministic
// reasons.
//
// If we don't do this check we would keep adding the same error to the
// database.
let should_fail_subgraph =
ctx.inputs.store.health(&ctx.inputs.deployment.hash).await?
!= SubgraphHealth::Failed;

if should_fail_subgraph {
// Fail subgraph:
// - Change status/health.
// - Save the error to the database.
store_for_err
.fail_subgraph(error)
.await
.context("Failed to set subgraph status to `failed`")?;
}

// Retry logic below:

// Cancel the stream for real.
ctx.state
.instances
.write()
.unwrap()
.remove(&ctx.inputs.deployment.id);

return Err(err);
error!(logger, "Subgraph failed for non-deterministic error: {}", e;
"attempt" => backoff.attempt,
"retry_delay_s" => backoff.delay().as_secs());

// Sleep before restarting.
backoff.sleep_async().await;

should_try_unfail_non_deterministic = true;

// And restart the subgraph.
break;
}
}
}
}
}
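
Condensed, the error handling added above makes one of three choices per stream result. A simplified, self-contained sketch of that decision (stand-in types, not the actual graph-node API; the authoritative logic is the diff above):

use std::time::Duration;

#[derive(Clone, Copy, PartialEq)]
enum Health { Healthy, Failed }

enum Outcome { KeepGoing, FailPermanently, RetryAfter(Duration) }

// Deterministic errors fail the subgraph for good; non-deterministic errors are
// recorded (only if the subgraph is not already failed, to avoid duplicate error
// rows) and retried after an exponentially growing delay.
fn decide(result: Result<(), bool /* deterministic? */>, health: Health, next_delay: Duration) -> Outcome {
    match result {
        Ok(()) => Outcome::KeepGoing, // also where a previously failed subgraph gets unfailed
        Err(true) => Outcome::FailPermanently,
        Err(false) => {
            let record_error = health != Health::Failed;
            if record_error { /* fail_subgraph(error) in the real code */ }
            Outcome::RetryAfter(next_delay)
        }
    }
}

fn main() {
    // First non-deterministic failure: record it, then restart after two minutes.
    match decide(Err(false), Health::Healthy, Duration::from_secs(120)) {
        Outcome::RetryAfter(d) => println!("restart subgraph in {:?}", d),
        _ => unreachable!(),
    }
}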
6 changes: 6 additions & 0 deletions graph/src/components/store.rs
@@ -1074,6 +1074,8 @@ pub trait WritableStore: Send + Sync + 'static {
/// Report the name of the shard in which the subgraph is stored. This
/// should only be used for reporting and monitoring
fn shard(&self) -> &str;

async fn health(&self, id: &DeploymentHash) -> Result<SubgraphHealth, StoreError>;
}

#[async_trait]
@@ -1264,6 +1266,10 @@ impl WritableStore for MockStore {
fn shard(&self) -> &str {
unimplemented!()
}

async fn health(&self, _: &DeploymentHash) -> Result<SubgraphHealth, StoreError> {
unimplemented!()
}
}

pub trait BlockStore: Send + Sync + 'static {
3 changes: 1 addition & 2 deletions graph/src/data/subgraph/schema.rs
@@ -44,9 +44,8 @@ impl SubgraphHealth {

pub fn is_failed(&self) -> bool {
match self {
SubgraphHealth::Healthy => false,
SubgraphHealth::Unhealthy => false,
SubgraphHealth::Failed => true,
SubgraphHealth::Healthy | SubgraphHealth::Unhealthy => false,
}
}
}
4 changes: 4 additions & 0 deletions graph/src/util/backoff.rs
@@ -45,4 +45,8 @@ impl ExponentialBackoff {
self.attempt += 1;
delay
}

pub fn reset(&mut self) {
self.attempt = 0;
}
}
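
For context on how attempt, delay(), and the new reset() interact, here is a rough standalone model using the defaults wired up in instance_manager.rs (the doubling schedule is an assumption for illustration; the authoritative implementation is graph/src/util/backoff.rs):

use std::time::Duration;

struct ExponentialBackoff { base: Duration, ceiling: Duration, attempt: u32 }

impl ExponentialBackoff {
    fn new(base: Duration, ceiling: Duration) -> Self { Self { base, ceiling, attempt: 0 } }
    fn delay(&self) -> Duration {
        // Grow the delay with the attempt count, never exceeding the ceiling.
        (self.base * 2u32.pow(self.attempt.min(16))).min(self.ceiling)
    }
    fn next(&mut self) -> Duration {
        let d = self.delay();
        self.attempt += 1;
        d
    }
    fn reset(&mut self) { self.attempt = 0; }
}

fn main() {
    // 2 minute base, 30 minute ceiling (the defaults used for subgraph retries).
    let mut b = ExponentialBackoff::new(Duration::from_secs(120), Duration::from_secs(1800));
    for _ in 0..6 { print!("{}s ", b.next().as_secs()); } // 120s 240s 480s 960s 1800s 1800s
    b.reset(); // a successfully processed block resets the schedule
    assert_eq!(b.delay(), Duration::from_secs(120));
}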
20 changes: 16 additions & 4 deletions store/postgres/src/deployment.rs
@@ -36,10 +36,9 @@ pub enum SubgraphHealth {

impl SubgraphHealth {
fn is_failed(&self) -> bool {
match self {
Self::Failed => true,
Self::Healthy | Self::Unhealthy => false,
}
use graph::data::subgraph::schema::SubgraphHealth as H;

H::from(*self).is_failed()
}
}

@@ -627,6 +626,19 @@ fn check_health(
.map_err(|e| e.into())
}

pub(crate) fn health(
conn: &PgConnection,
id: &DeploymentHash,
) -> Result<SubgraphHealth, StoreError> {
use subgraph_deployment as d;

d::table
.filter(d::deployment.eq(id.as_str()))
.select(d::health)
.get_result(conn)
.map_err(|e| e.into())
}

/// Reverts the errors and updates the subgraph health if necessary.
pub(crate) fn revert_subgraph_errors(
conn: &PgConnection,
60 changes: 32 additions & 28 deletions store/postgres/src/deployment_store.rs
@@ -1321,48 +1321,43 @@ impl DeploymentStore {
// Deployment head (current_ptr) advanced more than the error.
// That means it's healthy, and the non-deterministic error got
// solved (didn't happen on another try).
//
// This should be the scenario where the unfail happens, however
// for now we unfail in all cases that non-deterministic errors
// were found and the deployment head advanced.
(Bound::Included(error_block_number), _)
if current_ptr.number >= error_block_number =>
{
info!(
self.logger,
"Unfailing the deployment status";
"subgraph_id" => deployment_id,
);

// Unfail the deployment.
deployment::update_deployment_status(
conn,
deployment_id,
deployment::SubgraphHealth::Healthy,
None,
)?;

// Delete the fatal error.
deployment::delete_error(conn, &subgraph_error.id)?;

Ok(())
}
// The deployment head is still before where non-deterministic error happened.
//
// Technically we shouldn't unfail the subgraph and delete the error
// until it's head actually passed the error block range. But for
// now we'll only log this and keep the old behavior.
// NOOP, the deployment head is still before where non-deterministic error happened.
block_range => {
info!(
self.logger,
"Subgraph error is still ahead of deployment head";
"Subgraph error is still ahead of deployment head, nothing to unfail";
"subgraph_id" => deployment_id,
"block_number" => format!("{}", current_ptr.number),
"block_hash" => format!("{}", current_ptr.hash),
"error_block_range" => format!("{:?}", block_range),
"error_block_hash" => subgraph_error.block_hash.as_ref().map(|hash| format!("0x{}", hex::encode(hash))),
);
}
};

info!(
self.logger,
"Unfailing the deployment status";
"subgraph_id" => deployment_id,
);

// Unfail the deployment.
deployment::update_deployment_status(
conn,
deployment_id,
deployment::SubgraphHealth::Healthy,
None,
)?;

// Delete the fatal error.
deployment::delete_error(conn, &subgraph_error.id)
Ok(())
}
}
})
}
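
The guard that picks between the two arms above reduces to a block-number comparison; a small illustrative sketch with simplified types (std::ops::Bound as in the real code, plain i32 for block numbers):

use std::ops::Bound;

// Unfail a non-deterministic fatal error only once the deployment head has
// reached or passed the block where the error was recorded; otherwise do nothing.
fn should_unfail(error_lower_bound: Bound<i32>, head_block: i32) -> bool {
    match error_lower_bound {
        Bound::Included(error_block) => head_block >= error_block,
        _ => false, // any other bound: treat as not yet passed (the real code logs and no-ops)
    }
}

fn main() {
    assert!(should_unfail(Bound::Included(5), 7));  // head advanced: delete error, set Healthy
    assert!(!should_unfail(Bound::Included(9), 7)); // error still ahead: keep failed status
}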

@@ -1379,4 +1374,13 @@ impl DeploymentStore {
"shard" => self.pool.shard.as_str())
});
}

pub(crate) async fn health(
&self,
id: &DeploymentHash,
) -> Result<deployment::SubgraphHealth, StoreError> {
let id = id.clone();
self.with_conn(move |conn, _| deployment::health(&conn, &id).map_err(Into::into))
.await
}
}
9 changes: 8 additions & 1 deletion store/postgres/src/subgraph_store.rs
@@ -16,7 +16,7 @@ use graph::{
},
constraint_violation,
data::query::QueryTarget,
data::subgraph::schema::SubgraphError,
data::subgraph::schema::{self, SubgraphError},
data::{
store::{EntityVersion, Vid},
subgraph::status,
@@ -1325,6 +1325,13 @@ impl WritableStoreTrait for WritableStore {
fn shard(&self) -> &str {
self.site.shard.as_str()
}

async fn health(&self, id: &DeploymentHash) -> Result<schema::SubgraphHealth, StoreError> {
self.retry_async("health", || async {
self.writable.health(id).await.map(Into::into)
})
.await
}
}

fn same_subgraph(mods: &Vec<EntityModification>, id: &DeploymentHash) -> bool {
9 changes: 4 additions & 5 deletions store/postgres/tests/subgraph.rs
@@ -966,15 +966,14 @@ fn fail_unfail_non_deterministic_error_noop() {
// Fail the subgraph with a non-deterministic error, but with an advanced block.
writable.fail_subgraph(error).await.unwrap();

// Since the block range of the block won't match the deployment head, this would be NOOP,
// but we're skipping the confidence check for now.
// Since the block range of the block won't match the deployment head, this will be NOOP.
writable.unfail_non_deterministic_error(&BLOCKS[1]).unwrap();

// Unfail ocurrs as expected.
assert_eq!(count(), 1);
// State continues the same besides a new error added to the database.
assert_eq!(count(), 2);
let vi = get_version_info(&store, NAME);
assert_eq!(&*NAME, vi.deployment_id.as_str());
assert_eq!(false, vi.failed);
assert_eq!(true, vi.failed);
assert_eq!(Some(1), vi.latest_ethereum_block_number);

test_store::remove_subgraphs();