Skip to content

Commit

Permalink
store: Copy metadata across shards and allow cross-shard grafting
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Mar 24, 2021
1 parent ecb0a28 commit 7d1ba4d
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 44 deletions.
38 changes: 19 additions & 19 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl DeploymentStore {
schema: &Schema,
deployment: SubgraphDeploymentEntity,
site: Arc<Site>,
graft_site: Option<Arc<Site>>,
graft_base: Option<Arc<Layout>>,
replace: bool,
) -> Result<(), StoreError> {
let conn = self.get_conn()?;
Expand All @@ -178,9 +178,7 @@ impl DeploymentStore {

let layout = Layout::create_relational_schema(&conn, site.clone(), schema)?;
// See if we are grafting and check that the graft is permissible
if let Some(graft_site) = graft_site {
let base =
self.layout(&conn, graft_site)?;
if let Some(base) = graft_base {
let errors = layout.can_copy_from(&base);
if !errors.is_empty() {
return Err(StoreError::Unknown(anyhow!(
Expand Down Expand Up @@ -466,7 +464,7 @@ impl DeploymentStore {
}
}

/// Return the layout for the subgraph. Since constructing a `Layout`
/// Return the layout for a deployment. Since constructing a `Layout`
/// object takes a bit of computation, we cache layout objects that do
/// not have a pending migration in the Store, i.e., for the lifetime of
/// the Store. Layout objects with a pending migration can not be
Expand Down Expand Up @@ -501,6 +499,18 @@ impl DeploymentStore {
Ok(layout.clone())
}

/// Return the layout for a deployment. This might use a database
/// connection for the lookup and should only be called if the caller
/// does not have a connection currently. If it does, use `layout`
pub(crate) fn find_layout(&self, site: Arc<Site>) -> Result<Arc<Layout>, StoreError> {
if let Some(layout) = self.layout_cache.lock().unwrap().get(&site.deployment) {
return Ok(layout.clone());
}

let conn = self.get_conn()?;
self.layout(&conn, site)
}

fn subgraph_info_with_conn(
&self,
conn: &PgConnection,
Expand Down Expand Up @@ -1028,22 +1038,12 @@ impl DeploymentStore {
&self,
logger: &Logger,
site: Arc<Site>,
graft_src: Option<(Arc<Site>, EthereumBlockPointer)>,
graft_src: Option<(Arc<Layout>, EthereumBlockPointer)>,
) -> Result<(), StoreError> {
let (dst, graft_info) = {
let conn = self.get_conn()?;
let dst = self.layout(&conn, site.clone())?;
match graft_src {
Some((src, block)) => {
let src = self.layout(&conn, src.clone())?;
(dst, Some((src, block)))
}
None => (dst, None),
}
};
let dst = self.find_layout(site)?;

// Do any cleanup to bring the subgraph into a known good state
if let Some((src, block)) = graft_info {
if let Some((src, block)) = graft_src {
let start = Instant::now();

info!(
Expand All @@ -1064,7 +1064,7 @@ impl DeploymentStore {
let conn = self.get_conn()?;
conn.transaction(|| -> Result<(), StoreError> {
// 2. Copy dynamic data sources and adjust their ID
let count = dynds::copy(&conn, &src.site.deployment, &dst.site.deployment)?;
let count = dynds::copy(&conn, &src.site, &dst.site)?;
info!(logger, "Copied {} dynamic data sources", count;
"time_ms" => start.elapsed().as_millis());

Expand Down
32 changes: 20 additions & 12 deletions store/postgres/src/dynds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use graph::{
},
};

use crate::connection_pool::ForeignServer;
use crate::primary::Site;

table! {
subgraphs.dynamic_ethereum_contract_data_source (vid) {
vid -> BigInt,
Expand Down Expand Up @@ -157,24 +160,29 @@ pub(crate) fn insert(
.map_err(|e| e.into())
}

pub(crate) fn copy(
conn: &PgConnection,
src: &SubgraphDeploymentId,
dst: &SubgraphDeploymentId,
) -> Result<usize, StoreError> {
const QUERY: &str = "\
pub(crate) fn copy(conn: &PgConnection, src: &Site, dst: &Site) -> Result<usize, StoreError> {
let src_nsp = if src.shard == dst.shard {
"subgraphs".to_string()
} else {
ForeignServer::metadata_schema(&src.shard)
};

let query = format!(
"\
insert into subgraphs.dynamic_ethereum_contract_data_source(name,
address, abi, start_block, ethereum_block_hash,
ethereum_block_number, deployment, context)
select e.name, e.address, e.abi, e.start_block,
e.ethereum_block_hash, e.ethereum_block_number, $2 as deployment,
e.context
from subgraphs.dynamic_ethereum_contract_data_source e
where e.deployment = $1";

Ok(sql_query(QUERY)
.bind::<Text, _>(src.as_str())
.bind::<Text, _>(dst.as_str())
from {src_nsp}.dynamic_ethereum_contract_data_source e
where e.deployment = $1",
src_nsp = src_nsp
);

Ok(sql_query(&query)
.bind::<Text, _>(src.deployment.as_str())
.bind::<Text, _>(dst.deployment.as_str())
.execute(conn)?)
}

Expand Down
28 changes: 15 additions & 13 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use graph::{
};
use store::StoredDynamicDataSource;

use crate::{connection_pool::ConnectionPool, primary, primary::Site};
use crate::{connection_pool::ConnectionPool, primary, primary::Site, relational::Layout};
use crate::{
deployment_store::{DeploymentStore, ReplicaId},
detail::DeploymentDetail,
Expand Down Expand Up @@ -272,6 +272,11 @@ impl SubgraphStore {
Ok((store, site))
}

fn layout(&self, id: &SubgraphDeploymentId) -> Result<Arc<Layout>, StoreError> {
let (store, site) = self.store(id)?;
store.find_layout(site)
}

fn place(
&self,
name: &SubgraphName,
Expand Down Expand Up @@ -330,26 +335,23 @@ impl SubgraphStore {
.allocate_site(shard.clone(), &schema.id, network_name)?;
let site = Arc::new(site);

let graft_site = deployment
let graft_base = deployment
.graft_base
.as_ref()
.map(|base| self.primary_conn()?.find_existing_site(&base))
.transpose()?
.map(|site| Arc::new(site));
if let Some(ref graft_site) = graft_site {
if &graft_site.shard != &shard {
return Err(constraint_violation!("Can not graft across shards. {} is in shard {}, and the base {} is in shard {}", site.deployment, site.shard, graft_site.deployment, graft_site.shard));
}
.map(|base| self.layout(base))
.transpose()?;

if let Some(graft_base) = &graft_base {
self.primary_conn()?
.record_active_copy(graft_site.as_ref(), site.as_ref())?;
.record_active_copy(graft_base.site.as_ref(), site.as_ref())?;
}

// Create the actual databases schema and metadata entries
let deployment_store = self
.stores
.get(&shard)
.ok_or_else(|| StoreError::UnknownShard(shard.to_string()))?;
deployment_store.create_deployment(schema, deployment, site, graft_site, replace)?;
deployment_store.create_deployment(schema, deployment, site, graft_base, replace)?;

let exists_and_synced = |id: &SubgraphDeploymentId| {
let (store, _) = self.store(id)?;
Expand Down Expand Up @@ -774,8 +776,8 @@ impl SubgraphStoreTrait for SubgraphStore {

let graft_base = match store.graft_pending(id)? {
Some((base_id, base_ptr)) => {
let src = self.primary_conn()?.find_existing_site(&base_id)?;
Some((Arc::new(src), base_ptr))
let src = self.layout(&base_id)?;
Some((src, base_ptr))
}
None => None,
};
Expand Down

0 comments on commit 7d1ba4d

Please sign in to comment.