Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow redeployment of grafted subgraph even when graft_base is not available #4695

Merged
merged 4 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
deployment,
deployment.clone(),
raw,
resolver,
logger,
Expand All @@ -575,8 +575,17 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
.map_err(SubgraphRegistrarError::ResolveError)
.await?;

// Determine if the graft_base should be validated.
// Validate the graft_base if there is a pending graft, ensuring its presence.
// If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated.
// If the subgraph already exists and there is no pending graft, graft_base validation is not required.
let should_validate = match store.graft_pending(&deployment) {
Ok(graft_pending) => graft_pending,
Err(StoreError::DeploymentNotFound(_)) => true,
Err(e) => return Err(SubgraphRegistrarError::StoreError(e)),
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to have comments here.

let manifest = unvalidated
.validate(store.cheap_clone(), true)
.validate(store.cheap_clone(), should_validate)
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;

Expand Down
3 changes: 3 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ pub trait SubgraphStore: Send + Sync + 'static {
/// Return the GraphQL schema supplied by the user
fn input_schema(&self, subgraph_id: &DeploymentHash) -> Result<Arc<InputSchema>, StoreError>;

/// Return a bool represeting whether there is a pending graft for the subgraph
fn graft_pending(&self, id: &DeploymentHash) -> Result<bool, StoreError>;

/// Return the GraphQL schema that was derived from the user's schema by
/// adding a root query type etc. to it
fn api_schema(
Expand Down
21 changes: 18 additions & 3 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,18 +1247,33 @@ impl<'a> Connection<'a> {
}

/// Create a site for a brand new deployment.
/// If it already exists, return the existing site
/// and a boolean indicating whether a new site was created.
/// `false` means the site already existed.
pub fn allocate_site(
&self,
shard: Shard,
subgraph: &DeploymentHash,
network: String,
schema_version: DeploymentSchemaVersion,
) -> Result<Site, StoreError> {
graft_base: Option<&DeploymentHash>,
) -> Result<(Site, bool), StoreError> {
if let Some(site) = queries::find_active_site(self.conn.as_ref(), subgraph)? {
return Ok(site);
return Ok((site, false));
}

let site_was_created = true;
let schema_version = match graft_base {
Some(graft_base) => {
let site = queries::find_active_site(self.conn.as_ref(), graft_base)?;
site.map(|site| site.schema_version).ok_or_else(|| {
StoreError::DeploymentNotFound("graft_base not found".to_string())
})
}
None => Ok(DeploymentSchemaVersion::LATEST),
}?;

self.create_site(shard, subgraph.clone(), network, schema_version, true)
.map(|site| (site, site_was_created))
}

pub fn assigned_node(&self, site: &Site) -> Result<Option<NodeId>, StoreError> {
Expand Down
50 changes: 31 additions & 19 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use graph::{
components::{
server::index_node::VersionInfo,
store::{
self, BlockStore, DeploymentLocator, DeploymentSchemaVersion,
EnsLookup as EnsLookupTrait, PruneReporter, PruneRequest, SubgraphFork,
self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, PruneReporter,
PruneRequest, SubgraphFork,
},
},
constraint_violation,
Expand Down Expand Up @@ -515,14 +515,9 @@ impl SubgraphStoreInner {
assert!(!replace);

self.evict(schema.id())?;
let graft_base = deployment.graft_base.as_ref();

let graft_base = deployment
.graft_base
.as_ref()
.map(|base| self.layout(base))
.transpose()?;

let (site, node_id) = {
let (site, exists, node_id) = {
// We need to deal with two situations:
// (1) We are really creating a new subgraph; it therefore needs
// to go in the shard and onto the node that the placement
Expand All @@ -534,21 +529,32 @@ impl SubgraphStoreInner {
// assignment that we used last time to avoid creating
// the same deployment in another shard
let (shard, node_id) = self.place(&name, &network_name, node_id)?;
let schema_version = match &graft_base {
None => DeploymentSchemaVersion::LATEST,
Some(src_layout) => src_layout.site.schema_version,
};
let conn = self.primary_conn()?;
let site = conn.allocate_site(shard, schema.id(), network_name, schema_version)?;
let (site, site_was_created) =
conn.allocate_site(shard, schema.id(), network_name, graft_base)?;
let node_id = conn.assigned_node(&site)?.unwrap_or(node_id);
(site, node_id)
(site, !site_was_created, node_id)
};
let site = Arc::new(site);

if let Some(graft_base) = &graft_base {
self.primary_conn()?
.record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
}
// if the deployment already exists, we don't need to perform any copying
// so we can set graft_base to None
// if it doesn't exist, we need to copy the graft base to the new deployment
let graft_base = if !exists {
let graft_base = deployment
.graft_base
.as_ref()
.map(|base| self.layout(base))
.transpose()?;

if let Some(graft_base) = &graft_base {
self.primary_conn()?
.record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
}
graft_base
} else {
None
};

// Create the actual databases schema and metadata entries
let deployment_store = self
Expand Down Expand Up @@ -1441,6 +1447,12 @@ impl SubgraphStoreTrait for SubgraphStore {
}
}

fn graft_pending(&self, id: &DeploymentHash) -> Result<bool, StoreError> {
let (store, _) = self.store(id)?;
let graft_detail = store.graft_pending(id)?;
Ok(graft_detail.is_some())
}

async fn least_block_ptr(&self, id: &DeploymentHash) -> Result<Option<BlockPtr>, StoreError> {
let (store, site) = self.store(id)?;
store.block_ptr(site.cheap_clone()).await
Expand Down