Allow redeployment of grafted subgraph even when graft_base is not available #4695

Merged · 4 commits · Jun 21, 2023
Changes from 1 commit
6 changes: 3 additions & 3 deletions core/src/subgraph/registrar.rs
@@ -566,17 +566,17 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
) -> Result<DeploymentLocator, SubgraphRegistrarError> {
    let raw_string = serde_yaml::to_string(&raw).unwrap();
    let unvalidated = UnvalidatedSubgraphManifest::<C>::resolve(
-        deployment,
+        deployment.clone(),
        raw,
        resolver,
        logger,
        ENV_VARS.max_spec_version.clone(),
    )
    .map_err(SubgraphRegistrarError::ResolveError)
    .await?;

+    let exists = store.is_deployed(&deployment)?;
Collaborator: There is an fn graft_pending that we should perhaps check as well, since it reports whether the subgraph has actually finished the graft process. It would be nice if we could use that here.

    let manifest = unvalidated
-        .validate(store.cheap_clone(), true)
+        .validate(store.cheap_clone(), !exists)
        .await
        .map_err(SubgraphRegistrarError::ManifestValidationError)?;

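The reviewer's graft_pending suggestion above is not part of this commit; the merged change only skips graft-base validation when the deployment is already deployed. A minimal sketch of the suggested variant follows — assuming a graft_pending check were reachable from the registrar's store, and that validate's boolean flag means "require the graft base"; both are assumptions, not confirmed by this diff:

```rust
// Hypothetical sketch, not the code merged in this PR: only skip graft-base
// validation when the deployment exists AND has finished any graft copy.
let exists = store.is_deployed(&deployment)?;
// Assumed accessor: reports whether a graft copy is still in progress.
let graft_in_progress = exists && store.graft_pending(&deployment)?;

// Validate the graft base unless the deployment exists and is done grafting.
let validate_graft_base = !exists || graft_in_progress;

let manifest = unvalidated
    .validate(store.cheap_clone(), validate_graft_base)
    .await
    .map_err(SubgraphRegistrarError::ManifestValidationError)?;
```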
5 changes: 5 additions & 0 deletions store/postgres/src/deployment_store.rs
@@ -1441,6 +1441,11 @@ impl DeploymentStore {
        .await
    }

+    pub(crate) fn exists(&self, id: Arc<Site>) -> Result<bool, StoreError> {
+        let conn = self.get_conn()?;
Collaborator: Let's use with_conn here to make this async.

Member Author: @leoyvens I tried that initially; wouldn't that mean I'd have to make all the callers async as well so they can await it?

Member Author: I was unsure whether I should do that; let me know if it's OK.

Collaborator: Are there any non-async callers? If there are, then we'd need to keep this one and add an async version.

Member Author: fn create_deployment_internal is not async, but its callers can be made async (the callers all the way up are async). Should I make them all async in that case?

Collaborator: I see now that this is a bigger problem, since fn create_deployment_internal is doing many sync DB operations but is called from an async context. OK, we can leave this exists as sync for now and find a solution for the whole function later.

+        deployment::exists(&conn, &id)
+    }

    pub(crate) fn graft_pending(
        &self,
        id: &DeploymentHash,
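The thread above settles on keeping exists synchronous for now. For reference, a minimal sketch of the with_conn-based async variant the reviewer had in mind, modeled loosely on the other async methods of DeploymentStore; the exact with_conn closure signature and error conversions are assumptions, not taken from this diff:

```rust
// Hypothetical async variant, not what was merged: run the blocking query on a
// pooled connection obtained through with_conn so that callers can .await it
// instead of blocking the async executor on synchronous DB work.
pub(crate) async fn exists_async(&self, site: Arc<Site>) -> Result<bool, StoreError> {
    self.with_conn(move |conn, _cancel| {
        // Same sync helper used by the sync version of exists.
        deployment::exists(conn, &site).map_err(Into::into)
    })
    .await
}
```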
54 changes: 41 additions & 13 deletions store/postgres/src/subgraph_store.rs
@@ -424,6 +424,16 @@ impl SubgraphStoreInner {
        store.find_layout(site)
    }

+    pub(crate) fn deployment_exists(&self, id: &DeploymentHash) -> Result<bool, StoreError> {
+        let (store, site) = match self.store(id) {
+            Ok(pair) => pair,
+            Err(StoreError::DeploymentNotFound(_)) => return Ok(false),
+            Err(err) => return Err(err),
+        };
+
+        store.exists(site)
+    }

    fn place_on_node(
        &self,
        mut nodes: Vec<NodeId>,

@@ -516,11 +526,19 @@ impl SubgraphStoreInner {

        self.evict(schema.id())?;

-        let graft_base = deployment
-            .graft_base
-            .as_ref()
-            .map(|base| self.layout(base))
-            .transpose()?;
+        let deployment_hash = schema.id();
+        let exists = self.deployment_exists(deployment_hash)?;
+        let graft_base = deployment.graft_base.as_ref();
+
+        let schema_version = if exists {
+            let layout = self.layout(deployment_hash)?;
+            layout.site.schema_version
+        } else if let Some(graft_base) = graft_base {
+            let layout = self.layout(graft_base)?;
+            layout.site.schema_version
+        } else {
+            DeploymentSchemaVersion::LATEST
+        };
Collaborator: I think we could avoid the need for a schema_version variable here by having fn allocate_site take a graft_base parameter instead of schema_version, and do the logic there.

fn allocate_site already checks for existence, and if it needs to search for the graft base I believe it could use fn find_active_site.

Member Author: Got it, will make that change.


        let (site, node_id) = {
            // We need to deal with two situations:
@@ -534,21 +552,31 @@
            // assignment that we used last time to avoid creating
            // the same deployment in another shard
            let (shard, node_id) = self.place(&name, &network_name, node_id)?;
-            let schema_version = match &graft_base {
-                None => DeploymentSchemaVersion::LATEST,
-                Some(src_layout) => src_layout.site.schema_version,
-            };
            let conn = self.primary_conn()?;
            let site = conn.allocate_site(shard, schema.id(), network_name, schema_version)?;
            let node_id = conn.assigned_node(&site)?.unwrap_or(node_id);
            (site, node_id)
        };
        let site = Arc::new(site);

-        if let Some(graft_base) = &graft_base {
-            self.primary_conn()?
-                .record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
-        }
+        // if the deployment already exists, we don't need to perform any copying
+        // so we can set graft_base to None
+        // if it doesn't exist, we need to copy the graft base to the new deployment
+        let graft_base = if !exists {
+            let graft_base = deployment
+                .graft_base
+                .as_ref()
+                .map(|base| self.layout(base))
+                .transpose()?;
+
+            if let Some(graft_base) = &graft_base {
+                self.primary_conn()?
+                    .record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
+            }
+            graft_base
+        } else {
+            None
+        };

        // Create the actual databases schema and metadata entries
        let deployment_store = self
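The reviewer's suggestion above — having fn allocate_site take a graft_base parameter instead of a precomputed schema_version, and resolving the version internally via fn find_active_site — is sketched roughly below. The signatures, error variants, and the site-creation step are assumptions for illustration, not the code of the later commit that addressed the comment:

```rust
// Hypothetical shape of the suggested refactor, written as a method on the
// primary-store connection that currently owns allocate_site.
fn allocate_site(
    &self,
    shard: Shard,
    subgraph: &DeploymentHash,
    network: String,
    graft_base: Option<&DeploymentHash>,
) -> Result<Site, StoreError> {
    // If a site for this deployment already exists, reuse it; its schema
    // version is whatever the existing deployment was created with.
    if let Some(site) = self.find_active_site(subgraph)? {
        return Ok(site);
    }

    // Otherwise inherit the schema version from the graft base, if any,
    // and fall back to the latest version for a brand-new deployment.
    let schema_version = match graft_base {
        Some(base) => self
            .find_active_site(base)?
            .ok_or_else(|| StoreError::DeploymentNotFound(base.to_string()))?
            .schema_version,
        None => DeploymentSchemaVersion::LATEST,
    };

    // Create and persist the new Site row with `schema_version`, as the
    // existing implementation already does.
    todo!("insert the new site as in the current allocate_site")
}
```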