Skip to content

Commit

Permalink
Merge pull request #4838 from ariesdevil/dev
Browse files Browse the repository at this point in the history
[MetaSrv]rename table should keep table_id nochange
  • Loading branch information
drmingdrmer authored Apr 13, 2022
2 parents 698c025 + 95e3880 commit d4baa9c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 106 deletions.
8 changes: 4 additions & 4 deletions common/meta/api/src/meta_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ impl MetaApiTestSuite {

let got = mt.get_table((tenant, db_name, new_tbl_name).into()).await?;
let want = TableInfo {
ident: TableIdent::new(2, 2),
ident: TableIdent::new(1, 2),
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, new_tbl_name),
name: new_tbl_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -795,12 +795,12 @@ impl MetaApiTestSuite {
tracing::info!("--- create table again after rename, ok");
{
let res = mt.create_table(req.clone()).await?;
assert_eq!(3, res.table_id, "table id is 3");
assert_eq!(2, res.table_id, "table id should be 2");

let got = mt.get_table((tenant, db_name, tbl_name).into()).await?;

let want = TableInfo {
ident: TableIdent::new(3, 3),
ident: TableIdent::new(2, 3),
desc: format!("'{}'.'{}'.'{}'", tenant, db_name, tbl_name),
name: tbl_name.into(),
meta: table_meta(created_on),
Expand Down Expand Up @@ -882,7 +882,7 @@ impl MetaApiTestSuite {
.get_table((tenant, new_db_name, new_tbl_name).into())
.await?;
let want = TableInfo {
ident: TableIdent::new(4, 4),
ident: TableIdent::new(2, 4),
desc: format!("'{}'.'{}'.'{}'", tenant, new_db_name, new_tbl_name),
name: new_tbl_name.into(),
meta: table_meta(created_on),
Expand Down
111 changes: 9 additions & 102 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,25 +221,6 @@ impl StateMachine {
Ok((snap, last_applied, snapshot_id))
}

/// Internal func to get an auto-incr seq number.
/// It is just what Cmd::IncrSeq does and is also used by Cmd that requires
/// a unique id such as Cmd::AddDatabase which needs make a new database id.
///
/// Note: this can only be called inside apply().
async fn incr_seq(&self, key: &str) -> MetaResult<u64> {
let sequences = self.sequences();

let curr = sequences
.update_and_fetch(&key.to_string(), |old| Some(old.unwrap_or_default() + 1))
.await?;

let curr = curr.unwrap();

tracing::debug!("applied IncrSeq: {}={}", key, curr);

Ok(curr.0)
}

/// Apply an log entry to state machine.
///
/// If a duplicated log entry is detected by checking data.txid, no update
Expand Down Expand Up @@ -468,7 +449,7 @@ impl StateMachine {
let db_id = self.txn_get_database_id(tenant, db_name, txn_tree)?;

let (table_id, prev, result) =
self.txn_create_table(txn_tree, db_id, table_name, table_meta)?;
self.txn_create_table(txn_tree, db_id, None, table_name, table_meta)?;
if prev.is_some() {
return Ok(AppliedState::TableMeta(Change::nochange_with_id(
table_id.unwrap(),
Expand Down Expand Up @@ -517,7 +498,7 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
) -> MetaStorageResult<AppliedState> {
let db_id = self.txn_get_database_id(tenant, db_name, txn_tree)?;
let (_, prev, result) = self.txn_drop_table(txn_tree, db_id, table_name)?;
let (table_id, prev, result) = self.txn_drop_table(txn_tree, db_id, table_name)?;
if prev.is_none() {
return Err(MetaStorageError::AppError(AppError::UnknownTable(
UnknownTable::new(table_name, "apply_rename_table_cmd"),
Expand All @@ -528,7 +509,7 @@ impl StateMachine {
let table_meta = &prev.as_ref().unwrap().data;
let db_id = self.txn_get_database_id(tenant, new_db_name, txn_tree)?;
let (new_table_id, new_prev, new_result) =
self.txn_create_table(txn_tree, db_id, new_table_name, table_meta)?;
self.txn_create_table(txn_tree, db_id, table_id, new_table_name, table_meta)?;
if new_prev.is_some() {
return Err(MetaStorageError::AppError(AppError::TableAlreadyExists(
TableAlreadyExists::new(new_table_name, "apply_rename_table_cmd"),
Expand Down Expand Up @@ -704,71 +685,6 @@ impl StateMachine {
}
}

async fn sub_tree_upsert<'s, V, KS>(
&'s self,
sub_tree: AsKeySpace<'s, KS>,
key: &KS::K,
seq: &MatchSeq,
value_op: Operation<V>,
value_meta: Option<KVMeta>,
) -> MetaResult<(Option<SeqV<V>>, Option<SeqV<V>>)>
where
V: Clone + Debug,
KS: SledKeySpace<V = SeqV<V>>,
{
// TODO(xp): need to be done all in a tx

let prev = sub_tree.get(key)?;

// If prev is timed out, treat it as a None.
let prev = Self::unexpired_opt(prev);

if seq.match_seq(&prev).is_err() {
return Ok((prev.clone(), prev));
}

// result is the state after applying an operation.
let result = self
.sub_tree_do_update(&sub_tree, key, prev.clone(), value_meta, value_op)
.await?;

tracing::debug!("applied upsert: {} {:?}", key, result);
Ok((prev, result))
}

async fn sub_tree_do_update<'s, V, KS>(
&'s self,
sub_tree: &AsKeySpace<'s, KS>,
key: &KS::K,
prev: Option<SeqV<V>>,
value_meta: Option<KVMeta>,
value_op: Operation<V>,
) -> MetaResult<Option<SeqV<V>>>
where
V: Clone + Debug,
KS: SledKeySpace<V = SeqV<V>>,
{
let mut seq_kv_value = match value_op {
Operation::Update(v) => SeqV::with_meta(0, value_meta.clone(), v),
Operation::Delete => {
sub_tree.remove(key, true).await?;
return Ok(None);
}
Operation::AsIs => match prev {
None => return Ok(None),
Some(ref prev_kv_value) => prev_kv_value.clone().set_meta(value_meta),
},
};

// insert the updated record.

seq_kv_value.seq = self.incr_seq(KS::NAME).await?;

sub_tree.insert(key, &seq_kv_value).await?;

Ok(Some(seq_kv_value))
}

fn txn_incr_seq(&self, key: &str, txn_tree: &TransactionSledTree) -> MetaStorageResult<u64> {
let seq_sub_tree = txn_tree.key_space::<Sequences>();

Expand Down Expand Up @@ -878,6 +794,7 @@ impl StateMachine {
&self,
txn_tree: &TransactionSledTree,
db_id: u64,
table_id: Option<u64>,
table_name: &str,
table_meta: &TableMeta,
) -> MetaStorageResult<(
Expand All @@ -902,7 +819,11 @@ impl StateMachine {
}

let table_meta = table_meta.clone();
let table_id = self.txn_incr_seq(SEQ_TABLE_ID, txn_tree)?;
let table_id = if let Some(table_id) = table_id {
table_id
} else {
self.txn_incr_seq(SEQ_TABLE_ID, txn_tree)?
};

self.txn_sub_tree_upsert(
&table_lookup_tree,
Expand Down Expand Up @@ -1090,20 +1011,6 @@ impl StateMachine {
Ok(x)
}

pub async fn upsert_table(
&self,
table_id: u64,
tbl: TableMeta,
seq: &MatchSeq,
) -> Result<Option<SeqV<TableMeta>>, MetaError> {
let tables = self.tables();
let (_prev, result) = self
.sub_tree_upsert(tables, &table_id, seq, Operation::Update(tbl), None)
.await?;
self.incr_seq(SEQ_DATABASE_META_ID).await?; // need this?
Ok(result)
}

pub fn unexpired_opt<V: Debug>(seq_value: Option<SeqV<V>>) -> Option<SeqV<V>> {
seq_value.and_then(Self::unexpired)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
1
1
1
1

0 comments on commit d4baa9c

Please sign in to comment.