Skip to content

Commit

Permalink
instance_manager: Add exponential backoff retry for non-deterministic…
Browse files Browse the repository at this point in the history
… errors
  • Loading branch information
evaporei committed Nov 22, 2021
1 parent c37db13 commit 30420ed
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use graph::data::store::scalar::Bytes;
use graph::data::subgraph::{UnifiedMappingApiVersion, MAX_SPEC_VERSION};
use graph::prelude::TryStreamExt;
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
use graph::util::lfu_cache::LfuCache;
use graph::util::{backoff::ExponentialBackoff, lfu_cache::LfuCache};
use graph::{blockchain::block_stream::BlockStreamMetrics, components::store::WritableStore};
use graph::{blockchain::block_stream::BlockWithTriggers, data::subgraph::SubgraphFeature};
use graph::{
Expand All @@ -26,9 +26,11 @@ use graph::{
use lazy_static::lazy_static;
use std::collections::{BTreeSet, HashMap};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::task;

const MINUTE: Duration = Duration::from_secs(60);

lazy_static! {
/// Size limit of the entity LFU cache, in bytes.
// Multiplied by 1000 because the env var is in KB.
Expand All @@ -42,6 +44,14 @@ lazy_static! {
// Used for testing Graph Node itself.
pub static ref DISABLE_FAIL_FAST: bool =
std::env::var("GRAPH_DISABLE_FAIL_FAST").is_ok();

/// Ceiling for the backoff retry of non-deterministic errors, in seconds.
pub static ref SUBGRAPH_ERROR_RETRY_CEIL_SECS: Duration =
std::env::var("GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS")
.unwrap_or((MINUTE * 30).as_secs().to_string())
.parse::<u64>()
.map(Duration::from_secs)
.expect("invalid GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS");
}

type SharedInstanceKeepAliveMap = Arc<RwLock<HashMap<DeploymentId, CancelGuard>>>;
Expand Down Expand Up @@ -487,6 +497,10 @@ where
.unwrap()
.insert(ctx.inputs.deployment.id, block_stream_canceler);

// Exponential backoff that starts with two minutes and keeps
// increasing its timeout exponentially until it reaches the ceiling.
let mut backoff = ExponentialBackoff::new(MINUTE * 2, *SUBGRAPH_ERROR_RETRY_CEIL_SECS);

debug!(logger, "Starting block stream");

// Process events from the stream as long as no restart is needed
Expand Down Expand Up @@ -656,6 +670,7 @@ where

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
let deterministic = e.is_deterministic();
let message = format!("{:#}", e).replace("\n", "\t");
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);

Expand All @@ -664,7 +679,7 @@ where
message,
block_ptr: Some(block_ptr),
handler: None,
deterministic: e.is_deterministic(),
deterministic,
};
deployment_failed.set(1.0);

Expand All @@ -673,6 +688,24 @@ where
.await
.context("Failed to set subgraph status to `failed`")?;

if !deterministic {
// Cancel the stream for real
ctx.state
.instances
.write()
.unwrap()
.remove(&ctx.inputs.deployment.id);

// Sleep before restarting
error!(logger, "Subgraph failed for non-deterministic error: {}", e;
"attempt" => backoff.attempt,
"retry_delay_s" => backoff.delay().as_secs());
backoff.sleep_async().await;

// And restart the subgraph
break;
}

return Err(err);
}
}
Expand Down

0 comments on commit 30420ed

Please sign in to comment.