From a54409d11485b75e90d1d6ce240ff31bbe40fd04 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Wed, 14 Jun 2023 14:20:01 +0800 Subject: [PATCH 1/4] core, store : Allow redeployment of grafted subgraph even when graft_base is not available --- core/src/subgraph/registrar.rs | 6 +-- store/postgres/src/deployment_store.rs | 5 +++ store/postgres/src/subgraph_store.rs | 54 +++++++++++++++++++------- 3 files changed, 49 insertions(+), 16 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 8cab8c59ac9..292219f5d12 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -566,7 +566,7 @@ async fn create_subgraph_version( ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); let unvalidated = UnvalidatedSubgraphManifest::::resolve( - deployment, + deployment.clone(), raw, resolver, logger, @@ -574,9 +574,9 @@ async fn create_subgraph_version( ) .map_err(SubgraphRegistrarError::ResolveError) .await?; - + let exists = store.is_deployed(&deployment)?; let manifest = unvalidated - .validate(store.cheap_clone(), true) + .validate(store.cheap_clone(), !exists) .await .map_err(SubgraphRegistrarError::ManifestValidationError)?; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 7eb66502cd8..016991edeaf 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1441,6 +1441,11 @@ impl DeploymentStore { .await } + pub(crate) fn exists(&self, id: Arc) -> Result { + let conn = self.get_conn()?; + deployment::exists(&conn, &id) + } + pub(crate) fn graft_pending( &self, id: &DeploymentHash, diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 90ee9f72978..903470a3217 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -424,6 +424,16 @@ impl SubgraphStoreInner { store.find_layout(site) } + pub(crate) fn deployment_exists(&self, id: &DeploymentHash) -> Result { + let (store, site) = match self.store(id) { + Ok(pair) => pair, + Err(StoreError::DeploymentNotFound(_)) => return Ok(false), + Err(err) => return Err(err), + }; + + store.exists(site) + } + fn place_on_node( &self, mut nodes: Vec, @@ -516,11 +526,19 @@ impl SubgraphStoreInner { self.evict(schema.id())?; - let graft_base = deployment - .graft_base - .as_ref() - .map(|base| self.layout(base)) - .transpose()?; + let deployment_hash = schema.id(); + let exists = self.deployment_exists(deployment_hash)?; + let graft_base = deployment.graft_base.as_ref(); + + let schema_version = if exists { + let layout = self.layout(deployment_hash)?; + layout.site.schema_version + } else if let Some(graft_base) = graft_base { + let layout = self.layout(graft_base)?; + layout.site.schema_version + } else { + DeploymentSchemaVersion::LATEST + }; let (site, node_id) = { // We need to deal with two situations: @@ -534,10 +552,6 @@ impl SubgraphStoreInner { // assignment that we used last time to avoid creating // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id)?; - let schema_version = match &graft_base { - None => DeploymentSchemaVersion::LATEST, - Some(src_layout) => src_layout.site.schema_version, - }; let conn = self.primary_conn()?; let site = conn.allocate_site(shard, schema.id(), network_name, schema_version)?; let node_id = conn.assigned_node(&site)?.unwrap_or(node_id); @@ -545,10 +559,24 @@ impl SubgraphStoreInner { }; let site = Arc::new(site); - if let Some(graft_base) = &graft_base { - self.primary_conn()? - .record_active_copy(graft_base.site.as_ref(), site.as_ref())?; - } + // if the deployment already exists, we don't need to perform any copying + // so we can set graft_base to None + // if it doesn't exist, we need to copy the graft base to the new deployment + let graft_base = if !exists { + let graft_base = deployment + .graft_base + .as_ref() + .map(|base| self.layout(base)) + .transpose()?; + + if let Some(graft_base) = &graft_base { + self.primary_conn()? + .record_active_copy(graft_base.site.as_ref(), site.as_ref())?; + } + graft_base + } else { + None + }; // Create the actual databases schema and metadata entries let deployment_store = self From 2d372c969656bd19a6c3a937a4fc0d8b193639bc Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Mon, 19 Jun 2023 20:32:04 +0530 Subject: [PATCH 2/4] store,core,graph : move schema_version calculation to allocate_site --- core/src/subgraph/registrar.rs | 3 ++- graph/src/components/store/traits.rs | 3 +++ store/postgres/src/primary.rs | 12 +++++++++++- store/postgres/src/subgraph_store.rs | 22 +++++++++------------- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 292219f5d12..fdaea07c98f 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -575,8 +575,9 @@ async fn create_subgraph_version( .map_err(SubgraphRegistrarError::ResolveError) .await?; let exists = store.is_deployed(&deployment)?; + let graft_pending = store.graft_pending(&deployment)?; let manifest = unvalidated - .validate(store.cheap_clone(), !exists) + .validate(store.cheap_clone(), !exists || graft_pending) .await .map_err(SubgraphRegistrarError::ManifestValidationError)?; diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index fa3c96143ee..07c6ad8cf21 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -127,6 +127,9 @@ pub trait SubgraphStore: Send + Sync + 'static { /// Return the GraphQL schema supplied by the user fn input_schema(&self, subgraph_id: &DeploymentHash) -> Result, StoreError>; + /// Return a bool represeting whether there is a pending graft for the subgraph + fn graft_pending(&self, id: &DeploymentHash) -> Result; + /// Return the GraphQL schema that was derived from the user's schema by /// adding a root query type etc. to it fn api_schema( diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index ef15d36d07d..82b226dad07 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1252,12 +1252,22 @@ impl<'a> Connection<'a> { shard: Shard, subgraph: &DeploymentHash, network: String, - schema_version: DeploymentSchemaVersion, + graft_base: Option<&DeploymentHash>, ) -> Result { if let Some(site) = queries::find_active_site(self.conn.as_ref(), subgraph)? { return Ok(site); } + let schema_version = match graft_base { + Some(graft_base) => { + let site = queries::find_active_site(self.conn.as_ref(), graft_base)?; + site.map(|site| site.schema_version).ok_or_else(|| { + StoreError::DeploymentNotFound("graft_base not found".to_string()) + }) + } + None => Ok(DeploymentSchemaVersion::LATEST), + }?; + self.create_site(shard, subgraph.clone(), network, schema_version, true) } diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 903470a3217..89b62e8d9bb 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -16,8 +16,8 @@ use graph::{ components::{ server::index_node::VersionInfo, store::{ - self, BlockStore, DeploymentLocator, DeploymentSchemaVersion, - EnsLookup as EnsLookupTrait, PruneReporter, PruneRequest, SubgraphFork, + self, BlockStore, DeploymentLocator, EnsLookup as EnsLookupTrait, PruneReporter, + PruneRequest, SubgraphFork, }, }, constraint_violation, @@ -530,16 +530,6 @@ impl SubgraphStoreInner { let exists = self.deployment_exists(deployment_hash)?; let graft_base = deployment.graft_base.as_ref(); - let schema_version = if exists { - let layout = self.layout(deployment_hash)?; - layout.site.schema_version - } else if let Some(graft_base) = graft_base { - let layout = self.layout(graft_base)?; - layout.site.schema_version - } else { - DeploymentSchemaVersion::LATEST - }; - let (site, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs @@ -553,7 +543,7 @@ impl SubgraphStoreInner { // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id)?; let conn = self.primary_conn()?; - let site = conn.allocate_site(shard, schema.id(), network_name, schema_version)?; + let site = conn.allocate_site(shard, schema.id(), network_name, graft_base)?; let node_id = conn.assigned_node(&site)?.unwrap_or(node_id); (site, node_id) }; @@ -1469,6 +1459,12 @@ impl SubgraphStoreTrait for SubgraphStore { } } + fn graft_pending(&self, id: &DeploymentHash) -> Result { + let (store, _) = self.store(id)?; + let graft_detail = store.graft_pending(id)?; + Ok(graft_detail.is_some()) + } + async fn least_block_ptr(&self, id: &DeploymentHash) -> Result, StoreError> { let (store, site) = self.store(id)?; store.block_ptr(site.cheap_clone()).await From 5062fe8a0b5f124c9864bb89f377f9e6ad26bef0 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 20 Jun 2023 00:02:35 +0530 Subject: [PATCH 3/4] store : move subgraph exist logic into allocate_site --- core/src/subgraph/registrar.rs | 4 ++-- store/postgres/src/deployment_store.rs | 5 ----- store/postgres/src/primary.rs | 9 +++++++-- store/postgres/src/subgraph_store.rs | 20 ++++---------------- 4 files changed, 13 insertions(+), 25 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index fdaea07c98f..6c7f69512a2 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -575,9 +575,9 @@ async fn create_subgraph_version( .map_err(SubgraphRegistrarError::ResolveError) .await?; let exists = store.is_deployed(&deployment)?; - let graft_pending = store.graft_pending(&deployment)?; + let should_validate = !exists || store.graft_pending(&deployment)?; let manifest = unvalidated - .validate(store.cheap_clone(), !exists || graft_pending) + .validate(store.cheap_clone(), should_validate) .await .map_err(SubgraphRegistrarError::ManifestValidationError)?; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 016991edeaf..7eb66502cd8 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1441,11 +1441,6 @@ impl DeploymentStore { .await } - pub(crate) fn exists(&self, id: Arc) -> Result { - let conn = self.get_conn()?; - deployment::exists(&conn, &id) - } - pub(crate) fn graft_pending( &self, id: &DeploymentHash, diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index 82b226dad07..d206117e9f4 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1247,17 +1247,21 @@ impl<'a> Connection<'a> { } /// Create a site for a brand new deployment. + /// If it already exists, return the existing site. + /// and a boolean indicating whether a new site was created. + /// false means the site already existed. pub fn allocate_site( &self, shard: Shard, subgraph: &DeploymentHash, network: String, graft_base: Option<&DeploymentHash>, - ) -> Result { + ) -> Result<(Site, bool), StoreError> { if let Some(site) = queries::find_active_site(self.conn.as_ref(), subgraph)? { - return Ok(site); + return Ok((site, false)); } + let site_was_created = true; let schema_version = match graft_base { Some(graft_base) => { let site = queries::find_active_site(self.conn.as_ref(), graft_base)?; @@ -1269,6 +1273,7 @@ impl<'a> Connection<'a> { }?; self.create_site(shard, subgraph.clone(), network, schema_version, true) + .map(|site| (site, site_was_created)) } pub fn assigned_node(&self, site: &Site) -> Result, StoreError> { diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 89b62e8d9bb..ee5330b4bcf 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -424,16 +424,6 @@ impl SubgraphStoreInner { store.find_layout(site) } - pub(crate) fn deployment_exists(&self, id: &DeploymentHash) -> Result { - let (store, site) = match self.store(id) { - Ok(pair) => pair, - Err(StoreError::DeploymentNotFound(_)) => return Ok(false), - Err(err) => return Err(err), - }; - - store.exists(site) - } - fn place_on_node( &self, mut nodes: Vec, @@ -525,12 +515,9 @@ impl SubgraphStoreInner { assert!(!replace); self.evict(schema.id())?; - - let deployment_hash = schema.id(); - let exists = self.deployment_exists(deployment_hash)?; let graft_base = deployment.graft_base.as_ref(); - let (site, node_id) = { + let (site, exists, node_id) = { // We need to deal with two situations: // (1) We are really creating a new subgraph; it therefore needs // to go in the shard and onto the node that the placement @@ -543,9 +530,10 @@ impl SubgraphStoreInner { // the same deployment in another shard let (shard, node_id) = self.place(&name, &network_name, node_id)?; let conn = self.primary_conn()?; - let site = conn.allocate_site(shard, schema.id(), network_name, graft_base)?; + let (site, site_was_created) = + conn.allocate_site(shard, schema.id(), network_name, graft_base)?; let node_id = conn.assigned_node(&site)?.unwrap_or(node_id); - (site, node_id) + (site, !site_was_created, node_id) }; let site = Arc::new(site); From f275665dd81e6c86bc2508b59173d3e98f0c4dfb Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 20 Jun 2023 20:33:23 +0530 Subject: [PATCH 4/4] core: use existence check in graft_pending --- core/src/subgraph/registrar.rs | 12 ++++++++++-- store/postgres/src/primary.rs | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6c7f69512a2..89498992586 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -574,8 +574,16 @@ async fn create_subgraph_version( ) .map_err(SubgraphRegistrarError::ResolveError) .await?; - let exists = store.is_deployed(&deployment)?; - let should_validate = !exists || store.graft_pending(&deployment)?; + + // Determine if the graft_base should be validated. + // Validate the graft_base if there is a pending graft, ensuring its presence. + // If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated. + // If the subgraph already exists and there is no pending graft, graft_base validation is not required. + let should_validate = match store.graft_pending(&deployment) { + Ok(graft_pending) => graft_pending, + Err(StoreError::DeploymentNotFound(_)) => true, + Err(e) => return Err(SubgraphRegistrarError::StoreError(e)), + }; let manifest = unvalidated .validate(store.cheap_clone(), should_validate) .await diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index d206117e9f4..8cb20ac555e 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -1247,9 +1247,9 @@ impl<'a> Connection<'a> { } /// Create a site for a brand new deployment. - /// If it already exists, return the existing site. + /// If it already exists, return the existing site /// and a boolean indicating whether a new site was created. - /// false means the site already existed. + /// `false` means the site already existed. pub fn allocate_site( &self, shard: Shard,