Skip to content

Commit

Permalink
Merge pull request #5856 from lichuang/schema_retry_max_time
Browse files Browse the repository at this point in the history
Improve: add schema api txn max retry times
  • Loading branch information
BohuTANG authored Jun 9, 2022
2 parents 757bbd6 + 4c4f2aa commit b180407
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 8 deletions.
1 change: 1 addition & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ build_exceptions! {
DropTableWithDropTime(2314),
DropDbWithDropTime(2315),
UndropDbWithNoDropTime(2316),
TxnRetryMaxTimes(2317),

// Cluster error codes.
ClusterUnknownNode(2401),
Expand Down
66 changes: 58 additions & 8 deletions common/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use common_meta_types::app_error::DropDbWithDropTime;
use common_meta_types::app_error::DropTableWithDropTime;
use common_meta_types::app_error::TableAlreadyExists;
use common_meta_types::app_error::TableVersionMismatched;
use common_meta_types::app_error::TxnRetryMaxTimes;
use common_meta_types::app_error::UndropDbHasNoHistory;
use common_meta_types::app_error::UndropDbWithNoDropTime;
use common_meta_types::app_error::UndropTableAlreadyExists;
Expand Down Expand Up @@ -105,6 +106,7 @@ use crate::SchemaApi;
use crate::TableIdGen;

const DEFAULT_DATA_RETENTION_SECONDS: i64 = 24 * 60 * 60;
const TXN_MAX_RETRY_TIMES: u32 = 10;

/// SchemaApi is implemented upon KVApi.
/// Thus every type that impl KVApi impls SchemaApi.
Expand All @@ -121,7 +123,9 @@ impl<KV: KVApi> SchemaApi for KV {
CreateDatabaseWithDropTime::new(&name_key.db_name),
)));
}
loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
// Get db by name to ensure absence
let (db_id_seq, db_id) = get_u64_value(self, name_key).await?;
tracing::debug!(db_id_seq, db_id, ?name_key, "get_database");
Expand Down Expand Up @@ -197,12 +201,18 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("create_database", TXN_MAX_RETRY_TIMES),
)))
}

async fn drop_database(&self, req: DropDatabaseReq) -> Result<DropDatabaseReply, MetaError> {
let tenant_dbname = &req.name_ident;
let mut retry = 0;

loop {
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
let res = get_db_or_err(
self,
tenant_dbname,
Expand Down Expand Up @@ -267,6 +277,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("drop_database", TXN_MAX_RETRY_TIMES),
)))
}

async fn undrop_database(
Expand All @@ -275,7 +289,9 @@ impl<KV: KVApi> SchemaApi for KV {
) -> Result<UndropDatabaseReply, MetaError> {
let name_key = &req.name_ident;

loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
let res =
get_db_or_err(self, name_key, format!("undrop_database: {}", &name_key)).await;

Expand Down Expand Up @@ -365,6 +381,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("undrop_database", TXN_MAX_RETRY_TIMES),
)))
}

async fn rename_database(
Expand All @@ -377,7 +397,9 @@ impl<KV: KVApi> SchemaApi for KV {
db_name: req.new_db_name.clone(),
};

loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
// get old db, not exists return err
let (old_db_id_seq, old_db_id) = get_u64_value(self, tenant_dbname).await?;
if req.if_exists {
Expand Down Expand Up @@ -496,6 +518,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("rename_database", TXN_MAX_RETRY_TIMES),
)))
}

async fn get_database(&self, req: GetDatabaseReq) -> Result<Arc<DatabaseInfo>, MetaError> {
Expand Down Expand Up @@ -651,7 +677,9 @@ impl<KV: KVApi> SchemaApi for KV {
)));
}

loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
// Get db by name to ensure presence

let (_, db_id, db_meta_seq, db_meta) =
Expand Down Expand Up @@ -769,6 +797,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("create_table", TXN_MAX_RETRY_TIMES),
)))
}

async fn drop_table(&self, req: DropTableReq) -> Result<DropTableReply, MetaError> {
Expand All @@ -778,7 +810,9 @@ impl<KV: KVApi> SchemaApi for KV {
let mut tb_count = 0;
let mut tb_count_seq;

loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
// Get db by name to ensure presence

let (_, db_id, db_meta_seq, db_meta) =
Expand Down Expand Up @@ -886,6 +920,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("drop_table", TXN_MAX_RETRY_TIMES),
)))
}

async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply, MetaError> {
Expand All @@ -895,7 +933,9 @@ impl<KV: KVApi> SchemaApi for KV {
let mut tb_count = 0;
let mut tb_count_seq;

loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
// Get db by name to ensure presence

let (_, db_id, db_meta_seq, db_meta) =
Expand Down Expand Up @@ -1029,6 +1069,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("undrop_table", TXN_MAX_RETRY_TIMES),
)))
}

async fn rename_table(&self, req: RenameTableReq) -> Result<RenameTableReply, MetaError> {
Expand All @@ -1040,7 +1084,9 @@ impl<KV: KVApi> SchemaApi for KV {
table_name: req.new_table_name.clone(),
};

loop {
let mut retry = 0;
while retry < TXN_MAX_RETRY_TIMES {
retry += 1;
// Get db by name to ensure presence

let (_, db_id, db_meta_seq, db_meta) =
Expand Down Expand Up @@ -1207,6 +1253,10 @@ impl<KV: KVApi> SchemaApi for KV {
}
}
}

Err(MetaError::AppError(AppError::TxnRetryMaxTimes(
TxnRetryMaxTimes::new("rename_table", TXN_MAX_RETRY_TIMES),
)))
}

async fn get_table(&self, req: GetTableReq) -> Result<Arc<TableInfo>, MetaError> {
Expand Down
29 changes: 29 additions & 0 deletions common/meta/types/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,22 @@ impl UnknownShareId {
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, thiserror::Error)]
#[error("TxnRetryMaxTimes: Txn {op} has retry {max_retry} times, abort.")]
pub struct TxnRetryMaxTimes {
op: String,
max_retry: u32,
}

impl TxnRetryMaxTimes {
pub fn new(op: &str, max_retry: u32) -> Self {
Self {
op: op.to_string(),
max_retry,
}
}
}

#[derive(thiserror::Error, serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
pub enum AppError {
#[error(transparent)]
Expand Down Expand Up @@ -372,6 +388,9 @@ pub enum AppError {

#[error(transparent)]
UnknownShareId(#[from] UnknownShareId),

#[error(transparent)]
TxnRetryMaxTimes(#[from] TxnRetryMaxTimes),
}

impl AppErrorMessage for UnknownDatabase {
Expand Down Expand Up @@ -452,6 +471,15 @@ impl AppErrorMessage for UnknownShareId {
}
}

impl AppErrorMessage for TxnRetryMaxTimes {
fn message(&self) -> String {
format!(
"TxnRetryMaxTimes: Txn {} has retry {} times",
self.op, self.max_retry
)
}
}

impl AppErrorMessage for UndropTableWithNoDropTime {
fn message(&self) -> String {
format!("Undrop table '{}' with no drop_on time", self.table_name)
Expand Down Expand Up @@ -512,6 +540,7 @@ impl From<AppError> for ErrorCode {
AppError::ShareAlreadyExists(err) => ErrorCode::ShareAlreadyExists(err.message()),
AppError::UnknownShare(err) => ErrorCode::UnknownShare(err.message()),
AppError::UnknownShareId(err) => ErrorCode::UnknownShareId(err.message()),
AppError::TxnRetryMaxTimes(err) => ErrorCode::TxnRetryMaxTimes(err.message()),
}
}
}

0 comments on commit b180407

Please sign in to comment.