Skip to content

Commit

Permalink
Merge pull request #3369 from ariesdevil/raft-follower-read
Browse files Browse the repository at this point in the history
follower forward ListDatabase and GetTableInfo requests
  • Loading branch information
databend-bot authored Dec 10, 2021
2 parents 72b2f40 + 082952f commit 5540a6d
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 10 deletions.
108 changes: 105 additions & 3 deletions common/meta/api/src/meta_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_meta_types::CreateTableReq;
use common_meta_types::DropDatabaseReq;
use common_meta_types::DropTableReq;
use common_meta_types::GetDatabaseReq;
use common_meta_types::GetTableReq;
use common_meta_types::ListDatabaseReq;
use common_meta_types::ListTableReq;
use common_meta_types::TableIdent;
Expand Down Expand Up @@ -480,7 +481,7 @@ impl MetaApiTestSuite {
tracing::debug!("get present database res: {:?}", res);
let res = res?;
assert_eq!(1, res.database_id, "db1 id is 1");
assert_eq!("db1".to_string(), res.db, "db1.db is db1");
assert_eq!("db1", res.db, "db1.db is db1");
}

tracing::info!("--- get nonexistent-db on follower, expect correct error");
Expand All @@ -498,6 +499,42 @@ impl MetaApiTestSuite {
Ok(())
}

pub async fn list_database_leader_follower<MT: MetaApi>(
&self,
leader: &MT,
follower: &MT,
) -> anyhow::Result<()> {
tracing::info!("--- create db1 and db3 on leader");
{
let dbs = vec!["db1", "db3"];
for db_name in dbs {
let req = CreateDatabaseReq {
if_not_exists: false,
db: db_name.to_string(),
engine: "github".to_string(),
options: Default::default(),
};
let res = leader.create_database(req).await;
tracing::info!("create database res: {:?}", res);
assert!(res.is_ok());
}
}

tracing::info!("--- list databases from follower");
{
let res = follower.list_databases(ListDatabaseReq {}).await;
tracing::debug!("get database list: {:?}", res);
let res = res?;
assert_eq!(2, res.len(), "database list len is 2");
assert_eq!(1, res[0].database_id, "db1 id is 1");
assert_eq!("db1", res[0].db, "db1.name is db1");
assert_eq!(2, res[1].database_id, "db3 id is 2");
assert_eq!("db3", res[1].db, "db3.name is db3");
}

Ok(())
}

pub async fn list_table_leader_follower<MT: MetaApi>(
&self,
leader: &MT,
Expand Down Expand Up @@ -548,9 +585,74 @@ impl MetaApiTestSuite {
let res = res?;
assert_eq!(2, res.len(), "table list len is 2");
assert_eq!(1, res[0].ident.table_id, "tb1 id is 1");
assert_eq!("tb1".to_string(), res[0].name, "tb1.name is tb1");
assert_eq!("tb1", res[0].name, "tb1.name is tb1");
assert_eq!(2, res[1].ident.table_id, "tb2 id is 2");
assert_eq!("tb2".to_string(), res[1].name, "tb2.name is tb2");
assert_eq!("tb2", res[1].name, "tb2.name is tb2");
}

Ok(())
}

pub async fn table_get_leader_follower<MT: MetaApi>(
&self,
leader: &MT,
follower: &MT,
) -> anyhow::Result<()> {
tracing::info!("--- create table tb1 on leader");
let db_name = "db1";
{
let req = CreateDatabaseReq {
if_not_exists: false,
db: db_name.to_string(),
engine: "github".to_string(),
options: Default::default(),
};
let res = leader.create_database(req).await;
tracing::info!("create database res: {:?}", res);
assert!(res.is_ok());

let schema = Arc::new(DataSchema::new(vec![DataField::new(
"number",
DataType::UInt64,
false,
)]));

let options = maplit::hashmap! {"opt‐1".into() => "val-1".into()};

let req = CreateTableReq {
if_not_exists: false,
db: db_name.to_string(),
table: "tb1".to_string(),
table_meta: TableMeta {
schema: schema.clone(),
engine: "JSON".to_string(),
options: options.clone(),
},
};

let res = leader.create_table(req).await;
tracing::info!("create table res: {:?}", res);
assert!(res.is_ok());
}

tracing::info!("--- get tb1 on follower");
{
let res = follower.get_table(GetTableReq::new("db1", "tb1")).await;
tracing::debug!("get present table res: {:?}", res);
let res = res?;
assert_eq!(1, res.ident.table_id, "tb1 id is 1");
assert_eq!("tb1", res.name, "tb1.name is tb1");
}

tracing::info!("--- get nonexistent-table on follower, expect correct error");
{
let res = follower
.get_table(GetTableReq::new("db1", "nonexistent"))
.await;
tracing::debug!("get present table res: {:?}", res);
let err = res.unwrap_err();
println!("{:?}", err);
assert_eq!(ErrorCode::UnknownTable("").code(), err.code());
}

Ok(())
Expand Down
34 changes: 29 additions & 5 deletions metasrv/src/executor/meta_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::ToErrorCode;
use common_meta_api::MetaApi;
use common_meta_flight::FlightReq;
use common_meta_flight::GetTableExtReq;
use common_meta_types::Change;
Expand Down Expand Up @@ -247,8 +246,20 @@ impl RequestHandler<FlightReq<GetTableReq>> for ActionHandler {
&self,
act: FlightReq<GetTableReq>,
) -> common_exception::Result<Arc<TableInfo>> {
let sm = self.meta_node.get_state_machine().await;
sm.get_table(act.req).await
let res = self
.meta_node
.handle_admin_req(AdminRequest {
forward_to_leader: true,
req: AdminRequestInner::GetTable(act.req),
})
.await?;
let res: Arc<TableInfo> = res
.try_into()
.map_err_to_code(ErrorCode::UnknownException, || {
"handling FlightReq GetTableReq".to_string()
})?;

Ok(res)
}
}

Expand Down Expand Up @@ -279,8 +290,21 @@ impl RequestHandler<FlightReq<ListDatabaseReq>> for ActionHandler {
&self,
req: FlightReq<ListDatabaseReq>,
) -> common_exception::Result<Vec<Arc<DatabaseInfo>>> {
let sm = self.meta_node.get_state_machine().await;
sm.list_databases(req.req).await
let res = self
.meta_node
.handle_admin_req(AdminRequest {
forward_to_leader: true,
req: AdminRequestInner::ListDatabase(req.req),
})
.await?;

let res: Vec<Arc<DatabaseInfo>> = res
.try_into()
.map_err_to_code(ErrorCode::UnknownException, || {
"handling FlightReq ListDatabaseReq".to_string()
})?;

Ok(res)
}
}

Expand Down
6 changes: 6 additions & 0 deletions metasrv/src/meta_service/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use async_raft::raft::VoteRequest;
use common_meta_raft_store::state_machine::AppliedState;
use common_meta_types::DatabaseInfo;
use common_meta_types::GetDatabaseReq;
use common_meta_types::GetTableReq;
use common_meta_types::ListDatabaseReq;
use common_meta_types::ListTableReq;
use common_meta_types::LogEntry;
use common_meta_types::NodeId;
Expand All @@ -42,8 +44,10 @@ pub struct JoinRequest {
pub enum AdminRequestInner {
Join(JoinRequest),
Write(LogEntry),
ListDatabase(ListDatabaseReq),
GetDatabase(GetDatabaseReq),
ListTable(ListTableReq),
GetTable(GetTableReq),
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
Expand All @@ -65,8 +69,10 @@ impl AdminRequest {
pub enum AdminResponse {
Join(()),
AppliedState(AppliedState),
ListDatabase(Vec<Arc<DatabaseInfo>>),
DatabaseInfo(Arc<DatabaseInfo>),
ListTable(Vec<Arc<TableInfo>>),
TableInfo(Arc<TableInfo>),
}

impl tonic::IntoRequest<RaftRequest> for AdminRequest {
Expand Down
15 changes: 13 additions & 2 deletions metasrv/src/meta_service/meta_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,27 @@ impl<'a> MetaLeader<'a> {
Ok(AdminResponse::AppliedState(res))
}

AdminRequestInner::ListDatabase(req) => {
let sm = self.meta_node.get_state_machine().await;
let res = sm.list_databases(req).await?;
Ok(AdminResponse::ListDatabase(res))
}

AdminRequestInner::GetDatabase(req) => {
let x = self.meta_node.get_state_machine().await;
let res = x.get_database(req).await?;
let sm = self.meta_node.get_state_machine().await;
let res = sm.get_database(req).await?;
Ok(AdminResponse::DatabaseInfo(res))
}
AdminRequestInner::ListTable(req) => {
let sm = self.meta_node.get_state_machine().await;
let res = sm.list_tables(req).await?;
Ok(AdminResponse::ListTable(res))
}
AdminRequestInner::GetTable(req) => {
let sm = self.meta_node.get_state_machine().await;
let res = sm.get_table(req).await?;
Ok(AdminResponse::TableInfo(res))
}
}
}

Expand Down
29 changes: 29 additions & 0 deletions metasrv/tests/it/flight/metasrv_flight_meta_api_leader_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,35 @@ async fn test_meta_api_database_create_get_drop() -> anyhow::Result<()> {
.await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_api_list_database() -> anyhow::Result<()> {
let (_log_guards, ut_span) = init_meta_ut!();
let _ent = ut_span.enter();

let tcs = start_metasrv_cluster(&[0, 1]).await?;

let client0 = tcs[0].flight_client().await?;
let client1 = tcs[1].flight_client().await?;

MetaApiTestSuite {}
.list_database_leader_follower(&client0, &client1)
.await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_api_table_create_get_drop() -> anyhow::Result<()> {
let (_log_guards, ut_span) = init_meta_ut!();
let _ent = ut_span.enter();

let tcs = start_metasrv_cluster(&[0, 1]).await?;

let client0 = tcs[0].flight_client().await?;
let client1 = tcs[1].flight_client().await?;
MetaApiTestSuite {}
.table_get_leader_follower(&client0, &client1)
.await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_meta_api_list_table() -> anyhow::Result<()> {
let (_log_guards, ut_span) = init_meta_ut!();
Expand Down

0 comments on commit 5540a6d

Please sign in to comment.