Skip to content

Commit

Permalink
core, store : Allow redeployment of grafted subgraph even when graft_…
Browse files Browse the repository at this point in the history
…base is not available
  • Loading branch information
incrypto32 committed Jun 14, 2023
1 parent 39094b1 commit fd791dd
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
6 changes: 3 additions & 3 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,17 +566,17 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
let raw_string = serde_yaml::to_string(&raw).unwrap();
let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
deployment,
deployment.clone(),
raw,
resolver,
logger,
ENV_VARS.max_spec_version.clone(),
)
.map_err(SubgraphRegistrarError::ResolveError)
.await?;

let exists = store.is_deployed(&deployment)?;
let manifest = unvalidated
.validate(store.cheap_clone(), true)
.validate(store.cheap_clone(), !exists)
.await
.map_err(SubgraphRegistrarError::ManifestValidationError)?;

Expand Down
5 changes: 5 additions & 0 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,11 @@ impl DeploymentStore {
.await
}

pub(crate) fn exists(&self, id: Arc<Site>) -> Result<bool, StoreError> {
let conn = self.get_conn()?;
deployment::exists(&conn, &id)
}

pub(crate) fn graft_pending(
&self,
id: &DeploymentHash,
Expand Down
70 changes: 57 additions & 13 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,21 @@ impl SubgraphStoreInner {
store.find_layout(site)
}

pub(crate) fn deployment_exists(&self, id: &DeploymentHash) -> Result<bool, StoreError> {
let (store, site) = match self.store(id) {
Ok(pair) => pair,
Err(err) => {
if let StoreError::DeploymentNotFound(_) = err {
return Ok(false);
} else {
return Err(err);
}
}
};

store.exists(site)
}

fn place_on_node(
&self,
mut nodes: Vec<NodeId>,
Expand Down Expand Up @@ -516,11 +531,8 @@ impl SubgraphStoreInner {

self.evict(schema.id())?;

let graft_base = deployment
.graft_base
.as_ref()
.map(|base| self.layout(base))
.transpose()?;
let (schema_version, exists) =
self.schema_version_for_deployment(schema.id(), deployment.graft_base.as_ref())?;

let (site, node_id) = {
// We need to deal with two situations:
Expand All @@ -534,21 +546,31 @@ impl SubgraphStoreInner {
// assignment that we used last time to avoid creating
// the same deployment in another shard
let (shard, node_id) = self.place(&name, &network_name, node_id)?;
let schema_version = match &graft_base {
None => DeploymentSchemaVersion::LATEST,
Some(src_layout) => src_layout.site.schema_version,
};
let conn = self.primary_conn()?;
let site = conn.allocate_site(shard, schema.id(), network_name, schema_version)?;
let node_id = conn.assigned_node(&site)?.unwrap_or(node_id);
(site, node_id)
};
let site = Arc::new(site);

if let Some(graft_base) = &graft_base {
self.primary_conn()?
.record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
}
// if the deployment already exists, we don't need to perform any copying
// so we can set graft_base to None
// if it doesn't exist, we need to copy the graft base to the new deployment
let graft_base = if !exists {
let graft_base = deployment
.graft_base
.as_ref()
.map(|base| self.layout(base))
.transpose()?;

if let Some(graft_base) = &graft_base {
self.primary_conn()?
.record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
}
graft_base
} else {
None
};

// Create the actual databases schema and metadata entries
let deployment_store = self
Expand Down Expand Up @@ -586,6 +608,28 @@ impl SubgraphStoreInner {
Ok(site.as_ref().into())
}

/// Returns (DeploymentSchemaVersion, bool ) where the bool indicate if the
/// deployment already exist. If the deployment
/// does not exist, returns the schema version of the graft base if
/// present, or the DeploymentSchemaVersion::LATEST otherwise.
fn schema_version_for_deployment(
&self,
deployment: &DeploymentHash,
graft_base: Option<&DeploymentHash>,
) -> Result<(DeploymentSchemaVersion, bool), StoreError> {
let exists = self.deployment_exists(deployment)?;

if exists {
let layout = self.layout(deployment)?;
Ok((layout.site.schema_version, true))
} else if let Some(graft_base) = graft_base {
let layout = self.layout(graft_base)?;
Ok((layout.site.schema_version, false))
} else {
Ok((DeploymentSchemaVersion::LATEST, false))
}
}

pub fn copy_deployment(
&self,
src: &DeploymentLocator,
Expand Down

0 comments on commit fd791dd

Please sign in to comment.