Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta/schema_api): add API list_all_tables #8254

Merged
merged 1 commit into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
use common_meta_app::schema::RenameTableReply;
use common_meta_app::schema::RenameTableReq;
use common_meta_app::schema::TableId;
use common_meta_app::schema::TableIdent;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -93,6 +94,14 @@ pub trait SchemaApi: Send + Sync {

async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply, KVAppError>;

/// List all tables belonging to every db and every tenant.
///
/// I.e.: all tables found in key-space: `__fd_table_by_id/`.
/// Notice that this key space includes both deleted and non-deleted table-metas. `TableMeta.drop_on.is_some()` indicates it's a deleted(but not yet gargbage collected) one.
///
/// It returns a list of (table-id, table-meta-seq, table-meta).
async fn list_all_tables(&self) -> Result<Vec<(TableId, u64, TableMeta)>, KVAppError>;

async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply, KVAppError>;

async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply, KVAppError>;
Expand Down
30 changes: 30 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,12 @@ use common_meta_types::errors::app_error::WrongShareObject;
use common_meta_types::ConditionResult;
use common_meta_types::GCDroppedDataReply;
use common_meta_types::GCDroppedDataReq;
use common_meta_types::InvalidReply;
use common_meta_types::KVAppError;
use common_meta_types::MatchSeqExt;
use common_meta_types::MetaError;
use common_meta_types::MetaId;
use common_meta_types::MetaNetworkError;
use common_meta_types::TxnCondition;
use common_meta_types::TxnOp;
use common_meta_types::TxnRequest;
Expand Down Expand Up @@ -1017,6 +1020,33 @@ impl<KV: KVApi> SchemaApi for KV {
)))
}

/// List all tables belonging to every db and every tenant.
///
/// It returns a list of (table-id, table-meta-seq, table-meta).
#[tracing::instrument(level = "debug", ret, err, skip_all)]
async fn list_all_tables(&self) -> Result<Vec<(TableId, u64, TableMeta)>, KVAppError> {
debug!("SchemaApi: {}", func_name!());

let reply = self
.prefix_list_kv(&vec![TableId::PREFIX, ""].join("/"))
.await?;

let mut res = vec![];

for (kk, vv) in reply.into_iter() {
BohuTANG marked this conversation as resolved.
Show resolved Hide resolved
let table_id = TableId::from_key(&kk).map_err(|e| {
let inv = InvalidReply::new("list_all_tables", &e);
let meta_net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(meta_net_err)
})?;

let table_meta: TableMeta = deserialize_struct(&vv.data)?;

res.push((table_id, vv.seq, table_meta));
}
Ok(res)
}

#[tracing::instrument(level = "debug", ret, err, skip_all)]
async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());
Expand Down
76 changes: 76 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ impl SchemaApiTestSuite {
suite.table_update_meta(&b.build().await).await?;
suite.table_upsert_option(&b.build().await).await?;
suite.table_list(&b.build().await).await?;
suite.table_list_all(&b.build().await).await?;
suite
.table_drop_undrop_list_history(&b.build().await)
.await?;
Expand Down Expand Up @@ -3632,6 +3633,81 @@ impl SchemaApiTestSuite {
Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn table_list_all<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
let tenant = "tenant1";
let db1_name = "db1";
let db2_name = "db2";

info!("--- prepare db");
{
self.create_database(mt, tenant, db1_name, "eng1").await?;
self.create_database(mt, tenant, db2_name, "eng1").await?;
}

info!("--- create 2 tables: tb1 tb2");
{
// Table schema with metadata(due to serde issue).
let schema = Arc::new(DataSchema::new(vec![DataField::new(
"number",
u64::to_data_type(),
)]));

let options = maplit::btreemap! {"opt‐1".into() => "val-1".into()};

let mut req = CreateTableReq {
if_not_exists: false,
name_ident: TableNameIdent {
tenant: tenant.to_string(),
db_name: db1_name.to_string(),
table_name: "tb1".to_string(),
},
table_meta: TableMeta {
schema: schema.clone(),
engine: "JSON".to_string(),
options: options.clone(),
..Default::default()
},
};

let tb_ids = {
req.table_meta
.options
.insert("name".to_string(), "t1".to_string());
let res = mt.create_table(req.clone()).await?;
assert!(res.table_id >= 1, "table id >= 1");
let tb_id1 = res.table_id;

req.name_ident.db_name = db2_name.to_string();
req.name_ident.table_name = "tb2".to_string();
req.table_meta
.options
.insert("name".to_string(), "t2".to_string());
let res = mt.create_table(req.clone()).await?;
assert!(res.table_id > tb_id1, "table id > tb_id1: {}", tb_id1);
let tb_id2 = res.table_id;

vec![tb_id1, tb_id2]
};

info!("--- get_tables");
{
let res = mt.list_all_tables().await?;
assert_eq!(tb_ids.len(), res.len());

// check table-id
assert_eq!(tb_ids[0], res[0].0.table_id);
assert_eq!(tb_ids[1], res[1].0.table_id);

// check table-meta
assert_eq!(Some(&"t1".to_string()), res[0].2.options.get("name"));
assert_eq!(Some(&"t2".to_string()), res[1].2.options.get("name"));
}
}

Ok(())
}

// pub async fn share_create_get_drop<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
// let tenant1 = "tenant1";
// let share_name1 = "share1";
Expand Down