Skip to content

Commit

Permalink
chore: simplify SchemaApi::truncate_table() (#16506)
Browse files Browse the repository at this point in the history
* refactor: SchemaApi::list_tables() should specify db-id instead of db-name

- Change list_tables() to accept db_id instead of db_name for better precision
- Change the return type from TableInfo to (table_name, table_id,
  SeqV<table_meta>), because some of the information in TableInfo
  cannot be provided by SchemaApi.

* refactor: SchemaApi::truncate_table() no longer needs to assert the table seq

truncate_table() first lists all copied files belonging to a table,
then deletes them in small chunks. The delete operations assert that
the seq of each file does not change.

With this approach, there is no need to assert that the seq of the
containing table does not change.

* chore: simplify SchemaApi::truncate_table()

* chore: fixup lint
  • Loading branch information
drmingdrmer authored Sep 25, 2024
1 parent 83a3e5a commit b4e3025
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 279 deletions.
9 changes: 8 additions & 1 deletion src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,16 @@ pub trait SchemaApi: Send + Sync {
async fn get_tables_history(
&self,
req: ListTableReq,
db_name: &str,
) -> Result<Vec<Arc<TableInfo>>, KVAppError>;

async fn list_tables(&self, req: ListTableReq) -> Result<Vec<Arc<TableInfo>>, KVAppError>;
/// List all tables in the database.
///
/// Returns a list of `(table_name, table_id, table_meta)` tuples.
async fn list_tables(
&self,
req: ListTableReq,
) -> Result<Vec<(String, TableId, SeqV<TableMeta>)>, KVAppError>;

/// Return TableMeta by table_id.
///
Expand Down
172 changes: 52 additions & 120 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ use databend_common_meta_app::schema::RenameTableReq;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyAction;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
use databend_common_meta_app::schema::TableCopiedFileInfo;
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableIdHistoryIdent;
Expand Down Expand Up @@ -202,6 +201,7 @@ use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::list_keys;
use crate::list_u64_value;
use crate::meta_txn_error::MetaTxnError;
use crate::name_id_value_api::NameIdValueApi;
use crate::name_value_api::NameValueApi;
Expand All @@ -217,8 +217,6 @@ use crate::txn_op_put;
use crate::util::db_id_has_to_exist;
use crate::util::deserialize_id_get_response;
use crate::util::deserialize_struct_get_response;
use crate::util::get_table_by_id_or_err;
use crate::util::list_tables_from_unshare_db;
use crate::util::mget_pb_values;
use crate::util::txn_delete_exact;
use crate::util::txn_op_put_pb;
Expand Down Expand Up @@ -1569,29 +1567,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn get_tables_history(
&self,
req: ListTableReq,
db_name: &str,
) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tenant_dbname = &req.inner;

// Get db by name to ensure presence
let res = get_db_or_err(
self,
tenant_dbname,
format!("get_tables_history: {}", tenant_dbname.display()),
)
.await;

let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};

// List tables by tenant, db_id, table_name.
let table_id_history_ident = TableIdHistoryIdent {
database_id: *seq_db_id.data,
database_id: req.database_id.db_id,
table_name: "dummy".to_string(),
};

Expand All @@ -1605,7 +1587,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.iter()
.map(|table_id_list_key| {
TableIdHistoryIdent {
database_id: *seq_db_id.data,
database_id: req.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
}
.to_string_key()
Expand Down Expand Up @@ -1640,11 +1622,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.map(|(table_id, seqv)| {
Arc::new(TableInfo {
ident: TableIdent::new(table_id.table_id, seqv.seq()),
desc: format!(
"'{}'.'{}'",
tenant_dbname.database_name(),
table_id_list_key.table_name,
),
desc: format!("'{}'.'{}'", db_name, table_id_list_key.table_name,),
name: table_id_list_key.table_name.to_string(),
meta: seqv.data,
db_type: DatabaseType::NormalDB,
Expand All @@ -1661,29 +1639,38 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn list_tables(&self, req: ListTableReq) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
async fn list_tables(
&self,
req: ListTableReq,
) -> Result<Vec<(String, TableId, SeqV<TableMeta>)>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tenant_dbname = &req.inner;
let dbid_tbname = DBIdTableName {
db_id: req.database_id.db_id,
// Use empty name to scan all tables
table_name: "".to_string(),
};

// Get db by name to ensure presence
let res = get_db_or_err(
self,
tenant_dbname,
format!("list_tables: {}", tenant_dbname.display()),
)
.await;
let (names, ids) = list_u64_value(self, &dbid_tbname).await?;

let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};
let ids = ids
.into_iter()
.map(|id| TableId { table_id: id })
.collect::<Vec<_>>();

let tb_infos = list_tables_from_unshare_db(self, *seq_db_id.data, tenant_dbname).await?;
let mut seq_metas = vec![];
for chunk in ids.chunks(DEFAULT_MGET_SIZE) {
let got = self.get_pb_values_vec(chunk.to_vec()).await?;
seq_metas.extend(got);
}

Ok(tb_infos)
let res = names
.into_iter()
.zip(ids)
.zip(seq_metas)
.filter_map(|((n, id), seq_meta)| seq_meta.map(|x| (n.table_name, id, x)))
.collect::<Vec<_>>();
Ok(res)
}

#[logcall::logcall]
Expand Down Expand Up @@ -2105,8 +2092,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let ctx = func_name!();

let table_id = TableId {
table_id: req.table_id,
};
Expand All @@ -2118,20 +2103,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// If table seq is not changed before and after listing, we can be sure the list of copied
// files is consistent with this version of the table.

let (mut seq_1, _tb_meta) = get_table_by_id_or_err(self, &table_id, ctx).await?;
let mut seq_1 = self.get_seq(&table_id).await?;

let mut trials = txn_backoff(None, func_name!());
let copied_files = loop {
trials.next().unwrap()?.await;

let copied_files = list_table_copied_files(self, table_id.table_id).await?;
let copied_file_ident = TableCopiedFileNameIdent {
table_id: table_id.table_id,
file: "dummy".to_string(),
};
let dir_name = DirName::new(copied_file_ident);
let copied_files = self.list_pb_vec(&dir_name).await?;

let (seq_2, _tb_meta) = get_table_by_id_or_err(self, &table_id, ctx).await?;
let seq_2 = self.get_seq(&table_id).await?;

if seq_1 == seq_2 {
debug!(
"list all copied file of table {}: {:?}",
table_id.table_id, copied_files
"list all copied file of table {}: {} files",
table_id.table_id,
copied_files.len()
);
break copied_files;
} else {
Expand All @@ -2141,64 +2132,22 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// 2. Remove the copied files only when the seq of a copied file has not changed.
//
// During running this step with several small transaction, other transactions may be
// modifying the table.
//
// - We assert the table seq is not changed in each transaction.
// - We do not assert the seq of each copied file in each transaction, since we only delete
// non-changed ones.

for chunk in copied_files.chunks(chunk_size as usize) {
let str_keys: Vec<_> = chunk.iter().map(|f| f.to_string_key()).collect();

// Load the `seq` of every copied file
let seqs = {
let seq_infos: Vec<(u64, Option<TableCopiedFileInfo>)> =
mget_pb_values(self, &str_keys).await?;

seq_infos.into_iter().map(|(seq, _)| seq)
let txn = TxnRequest {
condition: vec![],
if_then: chunk
.iter()
.map(|(name, seq_file)| {
TxnOp::delete_exact(name.to_string_key(), Some(seq_file.seq()))
})
.collect(),
else_then: vec![],
};

let mut if_then = vec![];
for (copied_seq, copied_str_key) in seqs.zip(str_keys) {
if copied_seq == 0 {
continue;
}

if_then.push(TxnOp::delete_exact(copied_str_key, Some(copied_seq)));
}

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let (tb_meta_seq, tb_meta) = get_table_by_id_or_err(self, &table_id, ctx).await?;

let mut if_then = if_then.clone();

// Update to increase table meta seq, so that to assert no other process modify the table
if_then.push(txn_op_put(&table_id, serialize_struct(&tb_meta)?));

let txn_req = TxnRequest {
condition: vec![txn_cond_seq(&table_id, Eq, tb_meta_seq)],
if_then,
else_then: vec![],
};

debug!("submit chunk delete copied files: {:?}", txn_req);

let (succ, _responses) = send_txn(self, txn_req).await?;
debug!(
id :? =(&table_id),
succ = succ,
ctx = ctx;
""
);

if succ {
break;
}
}
let (_succ, _responses) = send_txn(self, txn).await?;
}

Ok(TruncateTableReply {})
Expand Down Expand Up @@ -3472,23 +3421,6 @@ async fn remove_copied_files_for_dropped_table(
unreachable!()
}

/// List the copied file identities belonging to a table.
async fn list_table_copied_files(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
table_id: u64,
) -> Result<Vec<TableCopiedFileNameIdent>, MetaError> {
let copied_file_ident = TableCopiedFileNameIdent {
table_id,
file: "dummy".to_string(),
};

let dir_name = DirName::new(copied_file_ident);

let copied_files = list_keys(kv_api, &dir_name).await?;

Ok(copied_files)
}

/// Get the retention boundary time before which the data can be permanently removed.
fn get_retention_boundary(now: DateTime<Utc>) -> DateTime<Utc> {
now - Duration::from_secs(DEFAULT_DATA_RETENTION_SECONDS as u64)
Expand Down
Loading

0 comments on commit b4e3025

Please sign in to comment.