Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure earliest_block on copies is set correctly #4502

Merged
merged 3 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- No schema changes, migration is only there to trigger remapping of
-- foreign metadata
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- No schema changes, migration is only there to trigger remapping of
-- foreign metadata
select 1;
6 changes: 1 addition & 5 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,7 @@ pub fn set_account_like(
}

pub fn copy_account_like(conn: &PgConnection, src: &Site, dst: &Site) -> Result<usize, StoreError> {
let src_nsp = if src.shard == dst.shard {
"subgraphs".to_string()
} else {
ForeignServer::metadata_schema(&src.shard)
};
let src_nsp = ForeignServer::metadata_schema_in(&src.shard, &dst.shard);
let query = format!(
"insert into subgraphs.table_stats(deployment, table_name, is_account_like, last_pruned_block)
select $2 as deployment, ts.table_name, ts.is_account_like, ts.last_pruned_block
Expand Down
18 changes: 16 additions & 2 deletions store/postgres/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,24 @@ impl ForeignServer {
format!("shard_{}", shard.as_str())
}

/// The name of the schema under which the `subgraphs` schema for `shard`
/// is accessible in shards that are not `shard`
/// The name of the schema under which the `subgraphs` schema for
/// `shard` is accessible in shards that are not `shard`. In most cases
/// you actually want to use `metadata_schema_in`
pub fn metadata_schema(shard: &Shard) -> String {
format!("{}_subgraphs", Self::name(shard))
}

/// The name of the schema under which the `subgraphs` schema for
/// `shard` is accessible in the shard `current`. It is permissible for
/// `shard` and `current` to be the same.
pub fn metadata_schema_in(shard: &Shard, current: &Shard) -> String {
if shard == current {
"subgraphs".to_string()
} else {
Self::metadata_schema(&shard)
}
}

pub fn new_from_raw(shard: String, postgres_url: &str) -> Result<Self, anyhow::Error> {
Self::new(Shard::new(shard)?, postgres_url)
}
Expand Down Expand Up @@ -202,6 +214,8 @@ impl ForeignServer {
"subgraph_deployment_assignment",
"subgraph",
"subgraph_version",
"subgraph_deployment",
"subgraph_manifest",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mapping these could be useful for some analytics queries as well, nice.

] {
let create_stmt =
catalog::create_foreign_table(conn, "subgraphs", table_name, &nsp, &self.name)?;
Expand Down
26 changes: 21 additions & 5 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -944,11 +944,7 @@ pub(crate) fn copy_errors(
) -> Result<usize, StoreError> {
use subgraph_error as e;

let src_nsp = if src.shard == dst.shard {
"subgraphs".to_string()
} else {
ForeignServer::metadata_schema(&src.shard)
};
let src_nsp = ForeignServer::metadata_schema_in(&src.shard, &dst.shard);

// Check whether there are any errors for dst which indicates we already
// did copy
Expand Down Expand Up @@ -1167,6 +1163,26 @@ pub fn set_earliest_block(
Ok(())
}

/// Copy the `earliest_block` attribute from `src` to `dst`. The copy might
/// go across shards and use the metadata tables mapped into the shard for
/// `conn` which must be the shard for `dst`
pub fn copy_earliest_block(conn: &PgConnection, src: &Site, dst: &Site) -> Result<(), StoreError> {
use subgraph_deployment as d;

let src_nsp = ForeignServer::metadata_schema_in(&src.shard, &dst.shard);

let query = format!(
"(select earliest_block_number from {src_nsp}.subgraph_deployment where id = {})",
src.id
);

update(d::table.filter(d::id.eq(dst.id)))
.set(d::earliest_block_number.eq(sql(&query)))
.execute(conn)?;

Ok(())
}

pub fn on_sync(conn: &PgConnection, id: impl Into<DeploymentId>) -> Result<OnSync, StoreError> {
use subgraph_manifest as m;

Expand Down
15 changes: 9 additions & 6 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1577,12 +1577,6 @@ impl DeploymentStore {
info!(logger, "Counted the entities";
"time_ms" => start.elapsed().as_millis());

deployment::set_earliest_block(
&conn,
&dst.site,
src_deployment.earliest_block_number,
)?;

deployment::set_history_blocks(
&conn,
&dst.site,
Expand All @@ -1594,6 +1588,15 @@ impl DeploymentStore {
self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?;
}

// The `earliest_block` for `src` might have changed while
// we did the copy if `src` was pruned while we copied;
// adjusting it very late in the copy process ensures that
// we truly do have all the data starting at
// `earliest_block` and do not inadvertently expose data
// that might be incomplete because a prune on the source
// removed data just before we copied it
deployment::copy_earliest_block(&conn, &src.site, &dst.site)?;

// Set the block ptr to the graft point to signal that we successfully
// performed the graft
crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?;
Expand Down
6 changes: 1 addition & 5 deletions store/postgres/src/dynds/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,7 @@ pub(crate) fn copy(
return Ok(0);
}

let src_nsp = if src.shard == dst.shard {
"subgraphs".to_string()
} else {
ForeignServer::metadata_schema(&src.shard)
};
let src_nsp = ForeignServer::metadata_schema_in(&src.shard, &dst.shard);

// Check whether there are any dynamic data sources for dst which
// indicates we already did copy
Expand Down