Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize subgraph ptr update #5317

Merged
merged 2 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,10 @@ where
.store
.unfail_non_deterministic_error(&block_ptr)?;

// Stop trying to unfail.
self.state.should_try_unfail_non_deterministic = false;

if let UnfailOutcome::Unfailed = outcome {
// Stop trying to unfail.
self.state.should_try_unfail_non_deterministic = false;
self.metrics.stream.deployment_failed.set(0.0);
self.state.backoff.reset();
}
Expand Down
60 changes: 31 additions & 29 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,47 +418,49 @@ pub fn transact_block(
firehose_cursor: &FirehoseCursor,
count: i32,
) -> Result<BlockNumber, StoreError> {
use crate::diesel::BoolExpressionMethods;
use subgraph_deployment as d;

// Work around a Diesel issue with serializing BigDecimals to numeric
let number = format!("{}::numeric", ptr.number);

let count_sql = entity_count_sql(count);

let rows = update(
d::table.filter(d::id.eq(site.id)).filter(
// Asserts that the processing direction is forward.
d::latest_ethereum_block_number
.lt(sql(&number))
.or(d::latest_ethereum_block_number.is_null()),
),
)
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
d::firehose_cursor.eq(firehose_cursor.as_ref()),
d::entity_count.eq(sql(&count_sql)),
d::current_reorg_depth.eq(0),
))
.returning(d::earliest_block_number)
.get_results::<BlockNumber>(conn)
.map_err(StoreError::from)?;
// Sanity check: The processing direction is forward.
//
// Performance note: This costs us an extra DB query on every update. We used to put this in the
// `where` clause of the `update` statement, but that caused Postgres to use bitmap scans instead
// of a simple primary key lookup. So a separate query it is.
let block_ptr = block_ptr(conn, &site.deployment)?;
if let Some(block_ptr_from) = block_ptr {
if block_ptr_from.number >= ptr.number {
return Err(StoreError::DuplicateBlockProcessing(
site.deployment.clone(),
ptr.number,
));
}
}

let rows = update(d::table.filter(d::id.eq(site.id)))
.set((
d::latest_ethereum_block_number.eq(sql(&number)),
d::latest_ethereum_block_hash.eq(ptr.hash_slice()),
d::firehose_cursor.eq(firehose_cursor.as_ref()),
d::entity_count.eq(sql(&count_sql)),
d::current_reorg_depth.eq(0),
))
.returning(d::earliest_block_number)
.get_results::<BlockNumber>(conn)
.map_err(StoreError::from)?;

match rows.len() {
// Common case: A single row was updated.
1 => Ok(rows[0]),

// No matching rows were found. This is an error. By the filter conditions, this can only be
// due to a missing deployment (which `block_ptr` catches) or duplicate block processing.
0 => match block_ptr(conn, &site.deployment)? {
Some(block_ptr_from) if block_ptr_from.number >= ptr.number => Err(
StoreError::DuplicateBlockProcessing(site.deployment.clone(), ptr.number),
),
None | Some(_) => Err(StoreError::Unknown(anyhow!(
"unknown error forwarding block ptr"
))),
},
// No matching rows were found. This is logically impossible, as the `block_ptr` would have
// caught a non-existing deployment.
0 => Err(StoreError::Unknown(anyhow!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor note, but for internal errors of the "can't happen" type, I usually use StoreError::ConstraintViolation. Looks good otherwise though.

"unknown error forwarding block ptr"
))),

// More than one matching row was found.
_ => Err(StoreError::ConstraintViolation(
Expand Down
Loading