diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 8b138d91185..31345910449 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1106,7 +1106,8 @@ pub trait PruneReporter: Send + 'static { fn start_switch(&mut self) {} fn copy_nonfinal_start(&mut self, table: &str) {} - fn copy_nonfinal_finish(&mut self, table: &str, rows: usize) {} + fn copy_nonfinal_batch(&mut self, table: &str, rows: usize, total_rows: usize, finished: bool) { + } fn finish_switch(&mut self) {} fn finish_prune(&mut self) {} diff --git a/node/src/manager/commands/prune.rs b/node/src/manager/commands/prune.rs index e635df7775e..95676b7c3ee 100644 --- a/node/src/manager/commands/prune.rs +++ b/node/src/manager/commands/prune.rs @@ -22,8 +22,8 @@ struct Progress { start: Instant, analyze_start: Instant, switch_start: Instant, + table_start: Instant, final_start: Instant, - final_table_start: Instant, nonfinal_start: Instant, } @@ -34,7 +34,7 @@ impl Progress { analyze_start: Instant::now(), switch_start: Instant::now(), final_start: Instant::now(), - final_table_start: Instant::now(), + table_start: Instant::now(), nonfinal_start: Instant::now(), } } @@ -82,14 +82,14 @@ impl PruneReporter for Progress { print_copy_header(); self.final_start = Instant::now(); - self.final_table_start = self.final_start; + self.table_start = self.final_start; } fn copy_final_batch(&mut self, table: &str, _rows: usize, total_rows: usize, finished: bool) { - print_copy_row(table, total_rows, self.final_table_start.elapsed()); + print_copy_row(table, total_rows, self.table_start.elapsed()); if finished { println!(""); - self.final_table_start = Instant::now(); + self.table_start = Instant::now(); } std::io::stdout().flush().ok(); } @@ -119,9 +119,18 @@ impl PruneReporter for Progress { self.nonfinal_start = Instant::now(); } - fn copy_nonfinal_finish(&mut self, table: &str, rows: usize) { - print_copy_row(table, rows, self.nonfinal_start.elapsed()); - println!(""); + fn copy_nonfinal_batch( + &mut self, + table: &str, + _rows: usize, + total_rows: usize, + finished: bool, + ) { + print_copy_row(table, total_rows, self.table_start.elapsed()); + if finished { + println!(""); + self.table_start = Instant::now(); + } std::io::stdout().flush().ok(); } diff --git a/store/postgres/src/advisory_lock.rs b/store/postgres/src/advisory_lock.rs index 3014b810bbc..674b96809b5 100644 --- a/store/postgres/src/advisory_lock.rs +++ b/store/postgres/src/advisory_lock.rs @@ -14,6 +14,7 @@ //! * 2, n: to lock the deployment with id n to make sure only one write //! happens to it +use diesel::sql_types::Bool; use diesel::{sql_query, PgConnection, RunQueryDsl}; use graph::prelude::StoreError; @@ -47,8 +48,34 @@ pub(crate) fn unlock_copying(conn: &PgConnection, dst: &Site) -> Result<(), Stor .map_err(StoreError::from) } -pub(crate) fn lock_deployment_xact(conn: &PgConnection, site: &Site) -> Result<(), StoreError> { - sql_query(&format!("select pg_advisory_xact_lock(2, {})", site.id)) +/// Try to lock deployment `site` with a session lock. Return `true` if we +/// got the lock, and `false` if we did not. You don't want to use this +/// directly. 
Instead, use `deployment::with_lock`
+pub(crate) fn lock_deployment_session(
+    conn: &PgConnection,
+    site: &Site,
+) -> Result<bool, StoreError> {
+    #[derive(QueryableByName)]
+    struct Locked {
+        #[sql_type = "Bool"]
+        locked: bool,
+    }
+
+    sql_query(&format!(
+        "select pg_try_advisory_lock(2, {}) as locked",
+        site.id
+    ))
+    .get_result::<Locked>(conn)
+    .map(|res| res.locked)
+    .map_err(StoreError::from)
+}
+
+/// Release the lock acquired with `lock_deployment_session`.
+pub(crate) fn unlock_deployment_session(
+    conn: &PgConnection,
+    site: &Site,
+) -> Result<(), StoreError> {
+    sql_query(&format!("select pg_advisory_unlock(2, {})", site.id))
         .execute(conn)
         .map(|_| ())
         .map_err(StoreError::from)
diff --git a/store/postgres/src/catalog.rs b/store/postgres/src/catalog.rs
index 3bf19c8d0b5..63ac619b33d 100644
--- a/store/postgres/src/catalog.rs
+++ b/store/postgres/src/catalog.rs
@@ -202,49 +202,6 @@ pub fn supports_proof_of_indexing(
     table_exists(conn, namespace.as_str(), &POI_TABLE_NAME)
 }
 
-/// Whether the given table has an exclusion constraint. When we create
-/// tables, they either have an exclusion constraint on `(id, block_range)`,
-/// or just a GIST index on those columns. If this returns `true`, there is
-/// an exclusion constraint on the table, if it returns `false` we only have
-/// an index.
-///
-/// This function only checks whether there is some exclusion constraint on
-/// the table since checking fully if that is exactly the constraint we
-/// think it is is a bit more complex. But if the table is part of a
-/// deployment that we created, the conclusions in hte previous paragraph
-/// are true.
-pub fn has_exclusion_constraint(
-    conn: &PgConnection,
-    namespace: &Namespace,
-    table: &SqlName,
-) -> Result<bool, StoreError> {
-    #[derive(Debug, QueryableByName)]
-    struct Row {
-        #[sql_type = "Bool"]
-        #[allow(dead_code)]
-        uses_excl: bool,
-    }
-
-    let query = "
-        select count(*) > 0 as uses_excl
-          from pg_catalog.pg_constraint con,
-               pg_catalog.pg_class rel,
-               pg_catalog.pg_namespace nsp
-         where rel.oid = con.conrelid
-           and nsp.oid = con.connamespace
-           and con.contype = 'x'
-           and nsp.nspname = $1
-           and rel.relname = $2;
-    ";
-
-    sql_query(query)
-        .bind::<Text, _>(namespace)
-        .bind::<Text, _>(table.as_str())
-        .get_result::<Row>(conn)
-        .map_err(StoreError::from)
-        .map(|row| row.uses_excl)
-}
-
 pub fn current_servers(conn: &PgConnection) -> Result<Vec<String>, StoreError> {
     #[derive(QueryableByName)]
     struct Srv {
@@ -323,6 +280,12 @@ pub fn recreate_schema(conn: &PgConnection, nsp: &str) -> Result<(), StoreError>
     Ok(conn.batch_execute(&query)?)
 }
 
+/// Drop the schema `nsp` and all its contents if it exists
+pub fn drop_schema(conn: &PgConnection, nsp: &str) -> Result<(), StoreError> {
+    let query = format!("drop schema if exists {nsp} cascade;", nsp = nsp);
+    Ok(conn.batch_execute(&query)?)
+}
+
 pub fn migration_count(conn: &PgConnection) -> Result<usize, StoreError> {
     use __diesel_schema_migrations as m;
diff --git a/store/postgres/src/deployment.rs b/store/postgres/src/deployment.rs
index c73f4999ae4..f82bda17e34 100644
--- a/store/postgres/src/deployment.rs
+++ b/store/postgres/src/deployment.rs
@@ -13,17 +13,20 @@ use diesel::{
     sql_query,
     sql_types::{Nullable, Text},
 };
-use graph::data::subgraph::{
-    schema::{DeploymentCreate, SubgraphManifestEntity},
-    SubgraphFeature,
-};
 use graph::prelude::{
     anyhow, bigdecimal::ToPrimitive, hex, web3::types::H256, BigDecimal, BlockNumber, BlockPtr,
     DeploymentHash, DeploymentState, Schema, StoreError,
 };
 use graph::{blockchain::block_stream::FirehoseCursor, data::subgraph::schema::SubgraphError};
+use graph::{
+    data::subgraph::{
+        schema::{DeploymentCreate, SubgraphManifestEntity},
+        SubgraphFeature,
+    },
+    util::backoff::ExponentialBackoff,
+};
 use stable_hash_legacy::crypto::SetHasher;
-use std::{collections::BTreeSet, convert::TryFrom, ops::Bound};
+use std::{collections::BTreeSet, convert::TryFrom, ops::Bound, time::Duration};
 use std::{str::FromStr, sync::Arc};
 
 use crate::connection_pool::ForeignServer;
@@ -1047,11 +1050,20 @@ pub fn set_earliest_block(
     Ok(())
 }
 
-/// Lock the deployment `site` for writes for the remainder of the current
-/// transaction. This lock is used to coordinate the changes that the
-/// subgraph writer makes with changes that other parts of the system, in
-/// particular, pruning make
+/// Lock the deployment `site` for writes while `f` is running. The lock can
+/// cross transactions, and `f` can therefore execute multiple transactions
+/// while other write activity for that deployment is locked out. Block the
+/// current thread until we can acquire the lock.
 // see also: deployment-lock-for-update
-pub fn lock(conn: &PgConnection, site: &Site) -> Result<(), StoreError> {
-    advisory_lock::lock_deployment_xact(conn, site)
+pub fn with_lock<F, R>(conn: &PgConnection, site: &Site, f: F) -> Result<R, StoreError>
+where
+    F: FnOnce() -> Result<R, StoreError>,
+{
+    let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(15));
+    while !advisory_lock::lock_deployment_session(conn, site)? {
+        backoff.sleep();
+    }
+    let res = f();
+    advisory_lock::unlock_deployment_session(conn, site)?;
+    res
 }
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index cd8d23dc834..eb44c4c5a64 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1065,58 +1065,57 @@ impl DeploymentStore {
             self.get_conn()?
         };
 
-        let event = conn.transaction(|| -> Result<_, StoreError> {
-            // Emit a store event for the changes we are about to make.
We - // wait with sending it until we have done all our other work - // so that we do not hold a lock on the notification queue - // for longer than we have to - let event: StoreEvent = StoreEvent::from_mods(&site.deployment, mods); - - // Make the changes - let layout = self.layout(&conn, site.clone())?; - - // see also: deployment-lock-for-update - deployment::lock(&conn, &site)?; - - let section = stopwatch.start_section("apply_entity_modifications"); - let count = self.apply_entity_modifications( - &conn, - layout.as_ref(), - mods, - block_ptr_to, - stopwatch, - )?; - section.end(); - - dynds::insert( - &conn, - &site, - data_sources, - block_ptr_to, - manifest_idx_and_name, - )?; - - dynds::update_offchain_status(&conn, &site, processed_data_sources)?; + let event = deployment::with_lock(&conn, &site, || { + conn.transaction(|| -> Result<_, StoreError> { + // Emit a store event for the changes we are about to make. We + // wait with sending it until we have done all our other work + // so that we do not hold a lock on the notification queue + // for longer than we have to + let event: StoreEvent = StoreEvent::from_mods(&site.deployment, mods); + + // Make the changes + let layout = self.layout(&conn, site.clone())?; + + let section = stopwatch.start_section("apply_entity_modifications"); + let count = self.apply_entity_modifications( + &conn, + layout.as_ref(), + mods, + block_ptr_to, + stopwatch, + )?; + section.end(); - if !deterministic_errors.is_empty() { - deployment::insert_subgraph_errors( + dynds::insert( &conn, - &site.deployment, - deterministic_errors, - block_ptr_to.block_number(), + &site, + data_sources, + block_ptr_to, + manifest_idx_and_name, )?; - } - deployment::transact_block( - &conn, - &site, - block_ptr_to, - firehose_cursor, - layout.count_query.as_str(), - count, - )?; + dynds::update_offchain_status(&conn, &site, processed_data_sources)?; - Ok(event) + if !deterministic_errors.is_empty() { + deployment::insert_subgraph_errors( + &conn, + &site.deployment, + deterministic_errors, + block_ptr_to.block_number(), + )?; + } + + deployment::transact_block( + &conn, + &site, + block_ptr_to, + firehose_cursor, + layout.count_query.as_str(), + count, + )?; + + Ok(event) + }) })?; Ok(event) @@ -1129,50 +1128,54 @@ impl DeploymentStore { block_ptr_to: BlockPtr, firehose_cursor: &FirehoseCursor, ) -> Result { - let event = conn.transaction(|| -> Result<_, StoreError> { - // see also: deployment-lock-for-update - deployment::lock(conn, &site)?; - - // Don't revert past a graft point - let info = self.subgraph_info_with_conn(conn, site.as_ref())?; - if let Some(graft_block) = info.graft_block { - if graft_block > block_ptr_to.number { - return Err(anyhow!( - "Can not revert subgraph `{}` to block {} as it was \ + let event = deployment::with_lock(conn, &site, || { + conn.transaction(|| -> Result<_, StoreError> { + // Don't revert past a graft point + let info = self.subgraph_info_with_conn(conn, site.as_ref())?; + if let Some(graft_block) = info.graft_block { + if graft_block > block_ptr_to.number { + return Err(anyhow!( + "Can not revert subgraph `{}` to block {} as it was \ grafted at block {} and reverting past a graft point \ is not possible", - site.deployment.clone(), - block_ptr_to.number, - graft_block - ) - .into()); + site.deployment.clone(), + block_ptr_to.number, + graft_block + ) + .into()); + } } - } - // The revert functions want the number of the first block that we need to get rid of - let block = block_ptr_to.number + 1; + // The revert functions want the 
number of the first block that we need to get rid of + let block = block_ptr_to.number + 1; - deployment::revert_block_ptr(conn, &site.deployment, block_ptr_to, firehose_cursor)?; + deployment::revert_block_ptr( + conn, + &site.deployment, + block_ptr_to, + firehose_cursor, + )?; - // Revert the data - let layout = self.layout(conn, site.clone())?; + // Revert the data + let layout = self.layout(conn, site.clone())?; - let (event, count) = layout.revert_block(conn, block)?; + let (event, count) = layout.revert_block(conn, block)?; - // Revert the meta data changes that correspond to this subgraph. - // Only certain meta data changes need to be reverted, most - // importantly creation of dynamic data sources. We ensure in the - // rest of the code that we only record history for those meta data - // changes that might need to be reverted - Layout::revert_metadata(&conn, &site, block)?; + // Revert the meta data changes that correspond to this subgraph. + // Only certain meta data changes need to be reverted, most + // importantly creation of dynamic data sources. We ensure in the + // rest of the code that we only record history for those meta data + // changes that might need to be reverted + Layout::revert_metadata(&conn, &site, block)?; - deployment::update_entity_count( - conn, - site.as_ref(), - layout.count_query.as_str(), - count, - )?; - Ok(event) + deployment::update_entity_count( + conn, + site.as_ref(), + layout.count_query.as_str(), + count, + )?; + Ok(event) + }) })?; Ok(event) diff --git a/store/postgres/src/primary.rs b/store/postgres/src/primary.rs index c0542a6b518..3e4639d48bc 100644 --- a/store/postgres/src/primary.rs +++ b/store/postgres/src/primary.rs @@ -239,6 +239,10 @@ impl Namespace { Ok(Namespace(s)) } + pub fn prune(id: DeploymentId) -> Self { + Namespace(format!("prune{id}")) + } + pub fn as_str(&self) -> &str { &self.0 } diff --git a/store/postgres/src/relational/ddl.rs b/store/postgres/src/relational/ddl.rs index 0a4a235823c..4c9d6a18927 100644 --- a/store/postgres/src/relational/ddl.rs +++ b/store/postgres/src/relational/ddl.rs @@ -30,7 +30,7 @@ impl Layout { tables.sort_by_key(|table| table.position); // Output 'create table' statements for all tables for table in tables { - table.as_ddl(&mut out, self)?; + table.as_ddl(&mut out)?; } Ok(out) @@ -73,7 +73,7 @@ impl Table { } // Changes to this function require changing `column_names`, too - pub(crate) fn create_table(&self, out: &mut String, layout: &Layout) -> fmt::Result { + pub(crate) fn create_table(&self, out: &mut String) -> fmt::Result { fn columns_ddl(table: &Table) -> Result { let mut cols = String::new(); let mut first = true; @@ -94,15 +94,14 @@ impl Table { writeln!( out, r#" - create table {nsp}.{name} ( + create table {qname} ( {vid} bigserial primary key, {block} int not null, {cols}, unique({id}) ); "#, - nsp = layout.catalog.site.namespace, - name = self.name.quoted(), + qname = self.qualified_name, cols = columns_ddl(self)?, vid = VID_COLUMN, block = BLOCK_COLUMN, @@ -112,100 +111,31 @@ impl Table { writeln!( out, r#" - create table {nsp}.{name} ( + create table {qname} ( {vid} bigserial primary key, {block_range} int4range not null, {cols} ); "#, - nsp = layout.catalog.site.namespace, - name = self.name.quoted(), + qname = self.qualified_name, cols = columns_ddl(self)?, vid = VID_COLUMN, block_range = BLOCK_RANGE_COLUMN )?; - self.exclusion_ddl( - out, - layout.catalog.site.namespace.as_str(), - Catalog::create_exclusion_constraint(), - ) - } - } - - /// Generate SQL that renames the table 
`from` to `to`. The code assumes - /// that `from` has been created with `create_table`, but that no other - /// indexes have been created on it, and also renames any indexes and - /// constraints, so that the database, after running the generated SQL, - /// will have the exact same table as if `from.create_sql` had been run - /// initially. - /// - /// The code does not do anything about the sequence for `vid`; it is - /// assumed that both `from` and `to` already use the same sequence for - /// that - /// - /// For mutable tables, if `has_exclusion_constraint` is true, assume - /// there is an exclusion constraint on `(id, block_range)`; if it is - /// false, assume there is a GIST index on those columns. For immutbale - /// tables, this argument has no effect. It is assumed that the name of - /// the constraint or index is what `create_table` uses for these. - pub(crate) fn rename_sql( - out: &mut String, - layout: &Layout, - from: &Table, - to: &Table, - has_exclusion_constraint: bool, - ) -> fmt::Result { - let nsp = layout.site.namespace.as_str(); - let from_name = &from.name; - let to_name = &to.name; - let id = &to.primary_key().name; - - writeln!( - out, - r#"alter table "{nsp}"."{from_name}" rename to "{to_name}";"# - )?; - - writeln!( - out, - r#"alter table "{nsp}"."{to_name}" rename constraint "{from_name}_pkey" to "{to_name}_pkey";"# - )?; - - if to.immutable { - writeln!( - out, - r#"alter table "{nsp}"."{to_name}" rename constraint "{from_name}_{id}_key" to "{to_name}_{id}_key";"# - )?; - } else { - if has_exclusion_constraint { - writeln!( - out, - r#"alter table "{nsp}"."{to_name}" rename constraint "{from_name}_{id}_{BLOCK_RANGE_COLUMN}_excl" to "{to_name}_{id}_{BLOCK_RANGE_COLUMN}_excl";"# - )?; - } else { - writeln!( - out, - r#"alter index "{nsp}"."{from_name}_{id}_{BLOCK_RANGE_COLUMN}_excl" rename to "{to_name}_{id}_{BLOCK_RANGE_COLUMN}_excl";"# - )?; - } + self.exclusion_ddl(out, Catalog::create_exclusion_constraint()) } - - Ok(()) } - pub(crate) fn create_time_travel_indexes( - &self, - out: &mut String, - layout: &Layout, - ) -> fmt::Result { + fn create_time_travel_indexes(&self, out: &mut String) -> fmt::Result { if self.immutable { write!( out, "create index brin_{table_name}\n \ - on {schema_name}.{table_name}\n \ + on {qname}\n \ using brin({block}, vid);\n", table_name = self.name, - schema_name = layout.catalog.site.namespace, + qname = self.qualified_name, block = BLOCK_COLUMN ) } else { @@ -231,10 +161,10 @@ impl Table { // We also index `vid` as that correlates with the order in which // entities are stored. 
write!(out,"create index brin_{table_name}\n \ - on {schema_name}.{table_name}\n \ + on {qname}\n \ using brin(lower(block_range), coalesce(upper(block_range), {block_max}), vid);\n", table_name = self.name, - schema_name = layout.catalog.site.namespace, + qname = self.qualified_name, block_max = BLOCK_NUMBER_MAX)?; // Add a BTree index that helps with the `RevertClampQuery` by making @@ -242,20 +172,16 @@ impl Table { write!( out, "create index {table_name}_block_range_closed\n \ - on {schema_name}.{table_name}(coalesce(upper(block_range), {block_max}))\n \ + on {qname}(coalesce(upper(block_range), {block_max}))\n \ where coalesce(upper(block_range), {block_max}) < {block_max};\n", table_name = self.name, - schema_name = layout.catalog.site.namespace, + qname = self.qualified_name, block_max = BLOCK_NUMBER_MAX ) } } - pub(crate) fn create_attribute_indexes( - &self, - out: &mut String, - layout: &Layout, - ) -> fmt::Result { + fn create_attribute_indexes(&self, out: &mut String) -> fmt::Result { // Create indexes. Skip columns whose type is an array of enum, // since there is no good way to index them with Postgres 9.6. // Once we move to Postgres 11, we can enable that @@ -316,12 +242,12 @@ impl Table { }; write!( out, - "create index attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {schema_name}.\"{table_name}\" using {method}({index_expr});\n", + "create index attr_{table_index}_{column_index}_{table_name}_{column_name}\n on {qname} using {method}({index_expr});\n", table_index = self.position, table_name = self.name, column_index = i, column_name = column.name, - schema_name = layout.catalog.site.namespace, + qname = self.qualified_name, method = method, index_expr = index_expr, )?; @@ -334,21 +260,21 @@ impl Table { /// /// See the unit tests at the end of this file for the actual DDL that /// gets generated - fn as_ddl(&self, out: &mut String, layout: &Layout) -> fmt::Result { - self.create_table(out, layout)?; - self.create_time_travel_indexes(out, layout)?; - self.create_attribute_indexes(out, layout) + pub(crate) fn as_ddl(&self, out: &mut String) -> fmt::Result { + self.create_table(out)?; + self.create_time_travel_indexes(out)?; + self.create_attribute_indexes(out) } - pub fn exclusion_ddl(&self, out: &mut String, nsp: &str, as_constraint: bool) -> fmt::Result { + pub fn exclusion_ddl(&self, out: &mut String, as_constraint: bool) -> fmt::Result { if as_constraint { writeln!( out, r#" - alter table {nsp}.{name} + alter table {qname} add constraint {bare_name}_{id}_{block_range}_excl exclude using gist ({id} with =, {block_range} with &&); "#, - name = self.name.quoted(), + qname = self.qualified_name, bare_name = self.name, id = self.primary_key().name, block_range = BLOCK_RANGE_COLUMN @@ -357,10 +283,10 @@ impl Table { writeln!( out, r#" - create index {bare_name}_{id}_{block_range}_excl on {nsp}.{name} + create index {bare_name}_{id}_{block_range}_excl on {qname} using gist ({id}, {block_range}); "#, - name = self.name.quoted(), + qname = self.qualified_name, bare_name = self.name, id = self.primary_key().name, block_range = BLOCK_RANGE_COLUMN diff --git a/store/postgres/src/relational/ddl_tests.rs b/store/postgres/src/relational/ddl_tests.rs index 97bf271e457..abcf8f1e595 100644 --- a/store/postgres/src/relational/ddl_tests.rs +++ b/store/postgres/src/relational/ddl_tests.rs @@ -41,20 +41,20 @@ fn table_is_sane() { assert!(table.column(&bad_sql_name).is_none()); } -#[test] -fn generate_ddl() { - // Check that the two strings are the same after replacing runs 
of - // whitespace with a single space - #[track_caller] - fn check_eqv(left: &str, right: &str) { - let left_s = left.split_whitespace().join(" "); - let right_s = right.split_whitespace().join(" "); - if left_s != right_s { - // Make sure the original strings show up in the error message - assert_eq!(left, right); - } +// Check that the two strings are the same after replacing runs of +// whitespace with a single space +#[track_caller] +fn check_eqv(left: &str, right: &str) { + let left_s = left.split_whitespace().join(" "); + let right_s = right.split_whitespace().join(" "); + if left_s != right_s { + // Make sure the original strings show up in the error message + assert_eq!(left, right); } +} +#[test] +fn generate_ddl() { let layout = test_layout(THING_GQL); let sql = layout.as_ddl().expect("Failed to generate DDL"); check_eqv(THING_DDL, &sql); @@ -86,16 +86,22 @@ fn exlusion_ddl() { // When `as_constraint` is false, just create an index let mut out = String::new(); table - .exclusion_ddl(&mut out, "sgd0815", false) + .exclusion_ddl(&mut out, false) .expect("can write exclusion DDL"); - assert_eq!("create index thing_id_block_range_excl on sgd0815.\"thing\"\n using gist (id, block_range);", out.trim()); + check_eqv( + r#"create index thing_id_block_range_excl on "sgd0815"."thing" using gist (id, block_range);"#, + out.trim(), + ); // When `as_constraint` is true, add an exclusion constraint let mut out = String::new(); table - .exclusion_ddl(&mut out, "sgd0815", true) + .exclusion_ddl(&mut out, true) .expect("can write exclusion DDL"); - assert_eq!("alter table sgd0815.\"thing\"\n add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&);", out.trim()); + check_eqv( + r#"alter table "sgd0815"."thing" add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&);"#, + out.trim(), + ); } #[test] @@ -173,38 +179,7 @@ fn can_copy_from() { ); } -#[test] -fn replace_sql() { - const REPLACE_BAND: &str = "\ - alter table \"sgd0815\".\"band_r$\" rename to \"band\";\n\ - alter table \"sgd0815\".\"band\" rename constraint \"band_r$_pkey\" to \"band_pkey\";\n\ - alter table \"sgd0815\".\"band\" rename constraint \"band_r$_id_block_range_excl\" to \"band_id_block_range_excl\";"; - - const REPLACE_SONG: &str = "\ - alter table \"sgd0815\".\"song_r$\" rename to \"song\";\n\ - alter table \"sgd0815\".\"song\" rename constraint \"song_r$_pkey\" to \"song_pkey\";\n\ - alter table \"sgd0815\".\"song\" rename constraint \"song_r$_id_key\" to \"song_id_key\";"; - - fn check(exp: &str, object: &str) { - let layout = test_layout(MUSIC_GQL); - let src = layout - .table_for_entity(&EntityType::new(object.to_string())) - .unwrap(); - let dst = src.new_like(&layout.site.namespace, &SqlName(format!("{}_r$", src.name))); - - let mut out = String::new(); - Table::rename_sql(&mut out, &layout, &dst, &src, true) - .expect("generating rename_sql works"); - assert_eq!(exp, out.trim()); - src.create_table(&mut out, &layout).unwrap(); - dst.create_table(&mut out, &layout).unwrap(); - } - - check(REPLACE_BAND, "Band"); - check(REPLACE_SONG, "Song"); -} - -const THING_GQL: &str = " +const THING_GQL: &str = r#" type Thing @entity { id: ID! bigThing: Thing! 
@@ -223,82 +198,82 @@ const THING_GQL: &str = " bytes: Bytes, bigInt: BigInt, color: Color, - }"; + }"#; -const THING_DDL: &str = "create type sgd0815.\"color\" +const THING_DDL: &str = r#"create type sgd0815."color" as enum ('BLUE', 'red', 'yellow'); -create type sgd0815.\"size\" - as enum (\'large\', \'medium\', \'small\'); -create table sgd0815.\"thing\" ( +create type sgd0815."size" + as enum ('large', 'medium', 'small'); +create table "sgd0815"."thing" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"big_thing\" text not null + "id" text not null, + "big_thing" text not null ); -alter table sgd0815.\"thing\" +alter table "sgd0815"."thing" add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_thing - on sgd0815.thing + on "sgd0815"."thing" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index thing_block_range_closed - on sgd0815.thing(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."thing"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_0_0_thing_id - on sgd0815.\"thing\" using btree(\"id\"); + on "sgd0815"."thing" using btree("id"); create index attr_0_1_thing_big_thing - on sgd0815.\"thing\" using gist(\"big_thing\", block_range); + on "sgd0815"."thing" using gist("big_thing", block_range); -create table sgd0815.\"scalar\" ( +create table "sgd0815"."scalar" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"bool\" boolean, - \"int\" integer, - \"big_decimal\" numeric, - \"string\" text, - \"bytes\" bytea, - \"big_int\" numeric, - \"color\" \"sgd0815\".\"color\" + "id" text not null, + "bool" boolean, + "int" integer, + "big_decimal" numeric, + "string" text, + "bytes" bytea, + "big_int" numeric, + "color" "sgd0815"."color" ); -alter table sgd0815.\"scalar\" +alter table "sgd0815"."scalar" add constraint scalar_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_scalar - on sgd0815.scalar + on "sgd0815"."scalar" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index scalar_block_range_closed - on sgd0815.scalar(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."scalar"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_1_0_scalar_id - on sgd0815.\"scalar\" using btree(\"id\"); + on "sgd0815"."scalar" using btree("id"); create index attr_1_1_scalar_bool - on sgd0815.\"scalar\" using btree(\"bool\"); + on "sgd0815"."scalar" using btree("bool"); create index attr_1_2_scalar_int - on sgd0815.\"scalar\" using btree(\"int\"); + on "sgd0815"."scalar" using btree("int"); create index attr_1_3_scalar_big_decimal - on sgd0815.\"scalar\" using btree(\"big_decimal\"); + on "sgd0815"."scalar" using btree("big_decimal"); create index attr_1_4_scalar_string - on sgd0815.\"scalar\" using btree(left(\"string\", 256)); + on "sgd0815"."scalar" using btree(left("string", 256)); create index attr_1_5_scalar_bytes - on sgd0815.\"scalar\" using btree(substring(\"bytes\", 1, 64)); + on "sgd0815"."scalar" using btree(substring("bytes", 1, 64)); create index attr_1_6_scalar_big_int - on sgd0815.\"scalar\" using btree(\"big_int\"); + on "sgd0815"."scalar" using btree("big_int"); create index attr_1_7_scalar_color - on sgd0815.\"scalar\" using btree(\"color\"); + on "sgd0815"."scalar" using btree("color"); -"; 
+"#; -const MUSIC_GQL: &str = "type Musician @entity { +const MUSIC_GQL: &str = r#"type Musician @entity { id: ID! name: String! mainBand: Band bands: [Band!]! - writtenSongs: [Song]! @derivedFrom(field: \"writtenBy\") + writtenSongs: [Song]! @derivedFrom(field: "writtenBy") } type Band @entity { id: ID! name: String! - members: [Musician!]! @derivedFrom(field: \"bands\") + members: [Musician!]! @derivedFrom(field: "bands") originalSongs: [Song!]! } @@ -306,100 +281,100 @@ type Song @entity(immutable: true) { id: ID! title: String! writtenBy: Musician! - band: Band @derivedFrom(field: \"originalSongs\") + band: Band @derivedFrom(field: "originalSongs") } type SongStat @entity { id: ID! - song: Song @derivedFrom(field: \"id\") + song: Song @derivedFrom(field: "id") played: Int! -}"; -const MUSIC_DDL: &str = "create table sgd0815.\"musician\" ( +}"#; +const MUSIC_DDL: &str = r#"create table "sgd0815"."musician" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"name\" text not null, - \"main_band\" text, - \"bands\" text[] not null + "id" text not null, + "name" text not null, + "main_band" text, + "bands" text[] not null ); -alter table sgd0815.\"musician\" +alter table "sgd0815"."musician" add constraint musician_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_musician - on sgd0815.musician + on "sgd0815"."musician" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index musician_block_range_closed - on sgd0815.musician(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."musician"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_0_0_musician_id - on sgd0815.\"musician\" using btree(\"id\"); + on "sgd0815"."musician" using btree("id"); create index attr_0_1_musician_name - on sgd0815.\"musician\" using btree(left(\"name\", 256)); + on "sgd0815"."musician" using btree(left("name", 256)); create index attr_0_2_musician_main_band - on sgd0815.\"musician\" using gist(\"main_band\", block_range); + on "sgd0815"."musician" using gist("main_band", block_range); create index attr_0_3_musician_bands - on sgd0815.\"musician\" using gin(\"bands\"); + on "sgd0815"."musician" using gin("bands"); -create table sgd0815.\"band\" ( +create table "sgd0815"."band" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"name\" text not null, - \"original_songs\" text[] not null + "id" text not null, + "name" text not null, + "original_songs" text[] not null ); -alter table sgd0815.\"band\" +alter table "sgd0815"."band" add constraint band_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_band - on sgd0815.band + on "sgd0815"."band" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index band_block_range_closed - on sgd0815.band(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."band"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_1_0_band_id - on sgd0815.\"band\" using btree(\"id\"); + on "sgd0815"."band" using btree("id"); create index attr_1_1_band_name - on sgd0815.\"band\" using btree(left(\"name\", 256)); + on "sgd0815"."band" using btree(left("name", 256)); create index attr_1_2_band_original_songs - on sgd0815.\"band\" using gin(\"original_songs\"); + on "sgd0815"."band" using gin("original_songs"); -create table 
sgd0815.\"song\" ( +create table "sgd0815"."song" ( vid bigserial primary key, block$ int not null, - \"id\" text not null, - \"title\" text not null, - \"written_by\" text not null, + "id" text not null, + "title" text not null, + "written_by" text not null, unique(id) ); create index brin_song - on sgd0815.song + on "sgd0815"."song" using brin(block$, vid); create index attr_2_1_song_title - on sgd0815.\"song\" using btree(left(\"title\", 256)); + on "sgd0815"."song" using btree(left("title", 256)); create index attr_2_2_song_written_by - on sgd0815.\"song\" using btree(\"written_by\", block$); + on "sgd0815"."song" using btree("written_by", block$); -create table sgd0815.\"song_stat\" ( +create table "sgd0815"."song_stat" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"played\" integer not null + "id" text not null, + "played" integer not null ); -alter table sgd0815.\"song_stat\" +alter table "sgd0815"."song_stat" add constraint song_stat_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_song_stat - on sgd0815.song_stat + on "sgd0815"."song_stat" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index song_stat_block_range_closed - on sgd0815.song_stat(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."song_stat"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_3_0_song_stat_id - on sgd0815.\"song_stat\" using btree(\"id\"); + on "sgd0815"."song_stat" using btree("id"); create index attr_3_1_song_stat_played - on sgd0815.\"song_stat\" using btree(\"played\"); + on "sgd0815"."song_stat" using btree("played"); -"; +"#; -const FOREST_GQL: &str = " +const FOREST_GQL: &str = r#" interface ForestDweller { id: ID!, forest: Forest @@ -411,84 +386,84 @@ type Animal implements ForestDweller @entity { type Forest @entity { id: ID!, # Array of interfaces as derived reference - dwellers: [ForestDweller!]! @derivedFrom(field: \"forest\") + dwellers: [ForestDweller!]! @derivedFrom(field: "forest") } type Habitat @entity { id: ID!, # Use interface as direct reference most_common: ForestDweller!, dwellers: [ForestDweller!]! 
-}"; +}"#; -const FOREST_DDL: &str = "create table sgd0815.\"animal\" ( +const FOREST_DDL: &str = r#"create table "sgd0815"."animal" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"forest\" text + "id" text not null, + "forest" text ); -alter table sgd0815.\"animal\" +alter table "sgd0815"."animal" add constraint animal_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_animal - on sgd0815.animal + on "sgd0815"."animal" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index animal_block_range_closed - on sgd0815.animal(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."animal"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_0_0_animal_id - on sgd0815.\"animal\" using btree(\"id\"); + on "sgd0815"."animal" using btree("id"); create index attr_0_1_animal_forest - on sgd0815.\"animal\" using gist(\"forest\", block_range); + on "sgd0815"."animal" using gist("forest", block_range); -create table sgd0815.\"forest\" ( +create table "sgd0815"."forest" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null + "id" text not null ); -alter table sgd0815.\"forest\" +alter table "sgd0815"."forest" add constraint forest_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_forest - on sgd0815.forest + on "sgd0815"."forest" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index forest_block_range_closed - on sgd0815.forest(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."forest"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_1_0_forest_id - on sgd0815.\"forest\" using btree(\"id\"); + on "sgd0815"."forest" using btree("id"); -create table sgd0815.\"habitat\" ( +create table "sgd0815"."habitat" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"most_common\" text not null, - \"dwellers\" text[] not null + "id" text not null, + "most_common" text not null, + "dwellers" text[] not null ); -alter table sgd0815.\"habitat\" +alter table "sgd0815"."habitat" add constraint habitat_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_habitat - on sgd0815.habitat + on "sgd0815"."habitat" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index habitat_block_range_closed - on sgd0815.habitat(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."habitat"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_2_0_habitat_id - on sgd0815.\"habitat\" using btree(\"id\"); + on "sgd0815"."habitat" using btree("id"); create index attr_2_1_habitat_most_common - on sgd0815.\"habitat\" using gist(\"most_common\", block_range); + on "sgd0815"."habitat" using gist("most_common", block_range); create index attr_2_2_habitat_dwellers - on sgd0815.\"habitat\" using gin(\"dwellers\"); + on "sgd0815"."habitat" using gin("dwellers"); -"; -const FULLTEXT_GQL: &str = " +"#; +const FULLTEXT_GQL: &str = r#" type _Schema_ @fulltext( - name: \"search\" + name: "search" language: en algorithm: rank - include: [\ + include: [ { - entity: \"Animal\", + entity: "Animal", fields: [ - {name: \"name\"}, - {name: \"species\"} + {name: "name"}, + {name: "species"} ] } ] @@ -501,84 +476,84 @@ 
type Animal @entity { } type Forest @entity { id: ID!, - dwellers: [Animal!]! @derivedFrom(field: \"forest\") + dwellers: [Animal!]! @derivedFrom(field: "forest") } type Habitat @entity { id: ID!, most_common: Animal!, dwellers: [Animal!]! -}"; +}"#; -const FULLTEXT_DDL: &str = "create table sgd0815.\"animal\" ( +const FULLTEXT_DDL: &str = r#"create table "sgd0815"."animal" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"name\" text not null, - \"species\" text not null, - \"forest\" text, - \"search\" tsvector + "id" text not null, + "name" text not null, + "species" text not null, + "forest" text, + "search" tsvector ); -alter table sgd0815.\"animal\" +alter table "sgd0815"."animal" add constraint animal_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_animal - on sgd0815.animal + on "sgd0815"."animal" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index animal_block_range_closed - on sgd0815.animal(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."animal"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_0_0_animal_id - on sgd0815.\"animal\" using btree(\"id\"); + on "sgd0815"."animal" using btree("id"); create index attr_0_1_animal_name - on sgd0815.\"animal\" using btree(left(\"name\", 256)); + on "sgd0815"."animal" using btree(left("name", 256)); create index attr_0_2_animal_species - on sgd0815.\"animal\" using btree(left(\"species\", 256)); + on "sgd0815"."animal" using btree(left("species", 256)); create index attr_0_3_animal_forest - on sgd0815.\"animal\" using gist(\"forest\", block_range); + on "sgd0815"."animal" using gist("forest", block_range); create index attr_0_4_animal_search - on sgd0815.\"animal\" using gin(\"search\"); + on "sgd0815"."animal" using gin("search"); -create table sgd0815.\"forest\" ( +create table "sgd0815"."forest" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null + "id" text not null ); -alter table sgd0815.\"forest\" +alter table "sgd0815"."forest" add constraint forest_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_forest - on sgd0815.forest + on "sgd0815"."forest" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index forest_block_range_closed - on sgd0815.forest(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."forest"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_1_0_forest_id - on sgd0815.\"forest\" using btree(\"id\"); + on "sgd0815"."forest" using btree("id"); -create table sgd0815.\"habitat\" ( +create table "sgd0815"."habitat" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"most_common\" text not null, - \"dwellers\" text[] not null + "id" text not null, + "most_common" text not null, + "dwellers" text[] not null ); -alter table sgd0815.\"habitat\" +alter table "sgd0815"."habitat" add constraint habitat_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_habitat - on sgd0815.habitat + on "sgd0815"."habitat" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index habitat_block_range_closed - on sgd0815.habitat(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."habitat"(coalesce(upper(block_range), 2147483647)) where 
coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_2_0_habitat_id - on sgd0815.\"habitat\" using btree(\"id\"); + on "sgd0815"."habitat" using btree("id"); create index attr_2_1_habitat_most_common - on sgd0815.\"habitat\" using gist(\"most_common\", block_range); + on "sgd0815"."habitat" using gist("most_common", block_range); create index attr_2_2_habitat_dwellers - on sgd0815.\"habitat\" using gin(\"dwellers\"); + on "sgd0815"."habitat" using gin("dwellers"); -"; +"#; -const FORWARD_ENUM_GQL: &str = " +const FORWARD_ENUM_GQL: &str = r#" type Thing @entity { id: ID!, orientation: Orientation! @@ -587,27 +562,27 @@ type Thing @entity { enum Orientation { UP, DOWN } -"; +"#; -const FORWARD_ENUM_SQL: &str = "create type sgd0815.\"orientation\" - as enum (\'DOWN\', \'UP\'); -create table sgd0815.\"thing\" ( +const FORWARD_ENUM_SQL: &str = r#"create type sgd0815."orientation" + as enum ('DOWN', 'UP'); +create table "sgd0815"."thing" ( vid bigserial primary key, block_range int4range not null, - \"id\" text not null, - \"orientation\" \"sgd0815\".\"orientation\" not null + "id" text not null, + "orientation" "sgd0815"."orientation" not null ); -alter table sgd0815.\"thing\" +alter table "sgd0815"."thing" add constraint thing_id_block_range_excl exclude using gist (id with =, block_range with &&); create index brin_thing - on sgd0815.thing + on "sgd0815"."thing" using brin(lower(block_range), coalesce(upper(block_range), 2147483647), vid); create index thing_block_range_closed - on sgd0815.thing(coalesce(upper(block_range), 2147483647)) + on "sgd0815"."thing"(coalesce(upper(block_range), 2147483647)) where coalesce(upper(block_range), 2147483647) < 2147483647; create index attr_0_0_thing_id - on sgd0815.\"thing\" using btree(\"id\"); + on "sgd0815"."thing" using btree("id"); create index attr_0_1_thing_orientation - on sgd0815.\"thing\" using btree(\"orientation\"); + on "sgd0815"."thing" using btree("orientation"); -"; +"#; diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index ed7d83a63cf..27f0444d188 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -3,7 +3,7 @@ use std::{fmt::Write, sync::Arc, time::Instant}; use diesel::{ connection::SimpleConnection, sql_query, - sql_types::{BigInt, Integer, Nullable}, + sql_types::{BigInt, Integer}, Connection, PgConnection, RunQueryDsl, }; use graph::{ @@ -20,54 +20,51 @@ use crate::{ relational::{Table, VID_COLUMN}, }; -use super::{Layout, SqlName}; +use super::{Layout, Namespace}; /// Utility to copy relevant data out of a source table and into a new /// destination table and replace the source table with the destination /// table struct TablePair { + // The original unpruned table src: Arc, + // The temporary table to which we copy the data we'd like to keep. It + // has the same name as `src` but is in a different namespace dst: Arc
<Table>,
+    src_nsp: Namespace,
+    dst_nsp: Namespace,
 }
 
 impl TablePair {
     /// Create a `TablePair` for `src`. This creates a new table `dst` with
-    /// the same structure as the `src` table in the database, but without
-    /// various indexes. Those are created with `switch`
-    fn create(conn: &PgConnection, layout: &Layout, src: Arc<Table>) -> Result<TablePair, StoreError> {
-        let new_name = SqlName::verbatim(format!("{}_n$", src.name));
-        let nsp = &layout.site.namespace;
-
-        let dst = src.new_like(&layout.site.namespace, &new_name);
+    /// the same structure as the `src` table in the database, but in a
+    /// different namespace so that the names of indexes etc. don't clash
+    fn create(
+        conn: &PgConnection,
+        src: Arc<Table>,
+        src_nsp: Namespace,
+        dst_nsp: Namespace,
+    ) -> Result<TablePair, StoreError> {
+        let dst = src.new_like(&dst_nsp, &src.name);
         let mut query = String::new();
-        if catalog::table_exists(conn, layout.site.namespace.as_str(), &dst.name)? {
-            writeln!(query, "truncate table {nsp}.{new_name};")?;
+        if catalog::table_exists(conn, dst_nsp.as_str(), &dst.name)? {
+            writeln!(query, "truncate table {};", dst.qualified_name)?;
         } else {
-            dst.create_table(&mut query, layout)?;
-
-            // Have the new table use the same vid sequence as the source
-            // table
-            writeln!(
-                query,
-                "\
-                alter table {nsp}.{new_name} \
-                alter column {VID_COLUMN} \
-                set default nextval('{nsp}.{src_name}_vid_seq'::regclass);",
-                src_name = src.name
-            )?;
-            writeln!(query, "drop sequence {nsp}.{new_name}_vid_seq;")?;
-            writeln!(
-                query,
-                "alter sequence {nsp}.{src_name}_vid_seq owned by {nsp}.{new_name}.vid",
-                src_name = src.name
-            )?;
+            dst.as_ddl(&mut query)?;
         }
         conn.batch_execute(&query)?;
-        Ok(TablePair { src, dst })
+        Ok(TablePair {
+            src,
+            dst,
+            src_nsp,
+            dst_nsp,
+        })
     }
+    /// Copy all entity versions visible between `earliest_block` and
+    /// `final_block` in batches, where each batch is a separate transaction
     fn copy_final_entities(
         &self,
         conn: &PgConnection,
@@ -76,14 +73,6 @@ impl TablePair {
         final_block: BlockNumber,
         cancel: &CancelHandle,
     ) -> Result<usize, CancelableError<StoreError>> {
-        #[derive(QueryableByName)]
-        struct VidRange {
-            #[sql_type = "Nullable<BigInt>"]
-            min_vid: Option<i64>,
-            #[sql_type = "Nullable<BigInt>"]
-            max_vid: Option<i64>,
-        }
-
         #[derive(QueryableByName)]
         struct LastVid {
             #[sql_type = "BigInt"]
@@ -92,34 +81,12 @@ impl TablePair {
             last_vid: i64,
         }
-        let (min_vid, max_vid) = match sql_query(&format!(
-            "select min(vid) as min_vid, max(vid) as max_vid from {src} \
-             where coalesce(upper(block_range), 2147483647) > $1 \
-               and coalesce(upper(block_range), 2147483647) <= $2",
-            src = self.src.qualified_name
-        ))
-        .bind::<Integer, _>(earliest_block)
-        .bind::<Integer, _>(final_block)
-        .get_result::<VidRange>(conn)?
-        {
-            VidRange {
-                min_vid: None,
-                max_vid: None,
-            } => {
-                return Ok(0);
-            }
-            VidRange {
-                min_vid: Some(min),
-                max_vid: Some(max),
-            } => (min, max),
-            _ => unreachable!("min and max are Some or None at the same time"),
-        };
-
         cancel.check_cancel()?;
-
         let column_list = self.column_list();
         let mut batch_size = AdaptiveBatchSize::new(&self.src);
-        let mut next_vid = min_vid;
+        // The first vid we still need to copy. When we start, we start with
+        // 0 so that we don't constrain the set of rows based on their vid
+        let mut next_vid = 0;
         let mut total_rows: usize = 0;
         loop {
             let start = Instant::now();
@@ -130,80 +97,116 @@ impl TablePair {
                     where lower(block_range) <= $2 \
                     and coalesce(upper(block_range), 2147483647) > $1 \
                     and coalesce(upper(block_range), 2147483647) <= $2 \
+                    and block_range && int4range($1, $2, '[]') \
                     and vid >= $3 \
-                    and vid <= $4 \
                     order by vid \
-                    limit $5 \
+                    limit $4 \
                     returning vid) \
-                    select max(cp.vid) as last_vid, count(*) as rows from cp",
+                    select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
                     src = self.src.qualified_name,
                     dst = self.dst.qualified_name
                 ))
                 .bind::<Integer, _>(earliest_block)
                 .bind::<Integer, _>(final_block)
                 .bind::<BigInt, _>(next_vid)
-                .bind::<BigInt, _>(max_vid)
                 .bind::<BigInt, _>(&batch_size)
                 .get_result::<LastVid>(conn)
             })?;
             cancel.check_cancel()?;
             total_rows += rows as usize;
-            reporter.copy_final_batch(
-                self.src.name.as_str(),
-                rows as usize,
-                total_rows,
-                last_vid >= max_vid,
-            );
+            let done = rows == 0;
+            reporter.copy_final_batch(self.src.name.as_str(), rows as usize, total_rows, done);
-            if last_vid >= max_vid {
-                break;
+            if done {
+                return Ok(total_rows);
             }
             batch_size.adapt(start.elapsed());
             next_vid = last_vid + 1;
         }
-
-        Ok(total_rows)
     }
+    /// Copy all entity versions visible after `final_block` in batches,
+    /// where each batch is a separate transaction
     fn copy_nonfinal_entities(
         &self,
         conn: &PgConnection,
+        reporter: &mut dyn PruneReporter,
         final_block: BlockNumber,
     ) -> Result<usize, StoreError> {
         let column_list = self.column_list();
-        sql_query(&format!(
-            "insert into {dst}({column_list}) \
-             select {column_list} from {src} \
-              where coalesce(upper(block_range), 2147483647) > $1 \
-                and block_range && int4range($1, null) \
-              order by vid",
-            dst = self.dst.qualified_name,
-            src = self.src.qualified_name,
-        ))
-        .bind::<Integer, _>(final_block)
-        .execute(conn)
-        .map_err(StoreError::from)
+        #[derive(QueryableByName)]
+        struct LastVid {
+            #[sql_type = "BigInt"]
+            rows: i64,
+            #[sql_type = "BigInt"]
+            last_vid: i64,
+        }
+
+        let mut batch_size = AdaptiveBatchSize::new(&self.src);
+        // The first vid we still need to copy. When we start, we start with
+        // 0 so that we don't constrain the set of rows based on their vid
+        let mut next_vid = 0;
+        let mut total_rows = 0;
+        loop {
+            let start = Instant::now();
+            let LastVid { rows, last_vid } = conn.transaction(|| {
+                sql_query(&format!(
+                    "with cp as (insert into {dst}({column_list}) \
+                    select {column_list} from {src} \
+                    where coalesce(upper(block_range), 2147483647) > $1 \
+                    and block_range && int4range($1, null) \
+                    and vid >= $2 \
+                    order by vid \
+                    limit $3
+                    returning vid) \
+                    select coalesce(max(cp.vid), 0) as last_vid, count(*) as rows from cp",
+                    dst = self.dst.qualified_name,
+                    src = self.src.qualified_name,
+                ))
+                .bind::<Integer, _>(final_block)
+                .bind::<BigInt, _>(next_vid)
+                .bind::<BigInt, _>(&batch_size)
+                .get_result::<LastVid>(conn)
+                .map_err(StoreError::from)
+            })?;
+            total_rows += rows as usize;
+            reporter.copy_nonfinal_batch(
+                self.src.name.as_str(),
+                rows as usize,
+                total_rows,
+                rows == 0,
+            );
+            if rows == 0 {
+                return Ok(total_rows);
+            }
+            batch_size.adapt(start.elapsed());
+            next_vid = last_vid + 1;
+        }
     }
-    /// Replace the `src` table with the `dst` table. This makes sure (as
-    /// does the rest of the code in `TablePair`) that the table and all
-    /// associated objects (indexes, constraints, etc.) have the same names
-    /// as they had initially so that pruning can be performed again in the
-    /// future without any name clashes in the database.
- fn switch(self, conn: &PgConnection, layout: &Layout) -> Result<(), StoreError> { - sql_query(&format!("drop table {}", self.src.qualified_name)).execute(conn)?; + /// Replace the `src` table with the `dst` table + fn switch(self, conn: &PgConnection) -> Result<(), StoreError> { + let src_qname = &self.src.qualified_name; + let dst_qname = &self.dst.qualified_name; + let src_nsp = &self.src_nsp; + let dst_nsp = &self.dst_nsp; + + let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name); - let uses_excl = - catalog::has_exclusion_constraint(conn, &layout.site.namespace, &self.dst.name)?; let mut query = String::new(); - Table::rename_sql(&mut query, &layout, &self.dst, &self.src, uses_excl)?; - self.src.create_time_travel_indexes(&mut query, layout)?; - self.src.create_attribute_indexes(&mut query, layout)?; - conn.batch_execute(&query)?; + // Make sure the vid sequence continues from where it was + writeln!( + query, + "select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));" + )?; + + writeln!(query, "drop table {src_qname};")?; + writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?; + conn.transaction(|| conn.batch_execute(&query))?; Ok(()) } @@ -277,7 +280,10 @@ impl Layout { // Determine which tables are prunable and create a shadow table for // them via `TablePair::create` - let prunable_tables = { + let dst_nsp = Namespace::prune(self.site.id); + let prunable_tables = conn.transaction(|| -> Result<_, StoreError> { + catalog::recreate_schema(conn, dst_nsp.as_str())?; + let mut prunable_tables: Vec = self .tables .values() @@ -288,11 +294,18 @@ impl Layout { .map(|s| (table, s)) }) .filter(|(_, stats)| stats.ratio <= prune_ratio) - .map(|(table, _)| TablePair::create(conn, self, table.cheap_clone())) + .map(|(table, _)| { + TablePair::create( + conn, + table.cheap_clone(), + self.site.namespace.clone(), + dst_nsp.clone(), + ) + }) .collect::>()?; prunable_tables.sort_by(|a, b| a.src.name.as_str().cmp(b.src.name.as_str())); - prunable_tables - }; + Ok(prunable_tables) + })?; cancel.check_cancel()?; // Copy final entities. This can happen in parallel to indexing as @@ -311,26 +324,26 @@ impl Layout { // Copy nonfinal entities, and replace the original `src` table with // the smaller `dst` table reporter.start_switch(); - conn.transaction(|| -> Result<(), CancelableError> { - // see also: deployment-lock-for-update - deployment::lock(conn, &self.site)?; - + // see also: deployment-lock-for-update + deployment::with_lock(conn, &self.site, || -> Result<_, StoreError> { for table in &prunable_tables { reporter.copy_nonfinal_start(table.src.name.as_str()); - let rows = table.copy_nonfinal_entities(conn, final_block)?; - reporter.copy_nonfinal_finish(table.src.name.as_str(), rows); - cancel.check_cancel()?; + table.copy_nonfinal_entities(conn, reporter, final_block)?; + cancel.check_cancel().map_err(CancelableError::from)?; } for table in prunable_tables { - table.switch(conn, self)?; - cancel.check_cancel()?; + conn.transaction(|| table.switch(conn))?; + cancel.check_cancel().map_err(CancelableError::from)?; } Ok(()) })?; reporter.finish_switch(); + // Get rid of the temporary prune schema + catalog::drop_schema(conn, dst_nsp.as_str())?; + // Analyze the new tables reporter.start_analyze(); for table in &prunable_src {