Skip to content

Commit

Permalink
Merge pull request #6516 from lichuang/fix_txn_watch
Browse files Browse the repository at this point in the history
fix: fix txn watch kv changed bug
  • Loading branch information
BohuTANG authored Jul 7, 2022
2 parents d129b29 + b901753 commit a6fba28
Showing 1 changed file with 39 additions and 6 deletions.
45 changes: 39 additions & 6 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ pub trait StateMachineSubscriber: Debug + Sync + Send {
fn kv_changed(&self, key: &str, prev: Option<SeqV>, current: Option<SeqV>);
}

type NotifyKVEvent = (String, Option<SeqV>, Option<SeqV>);

/// The state machine of the `MemStore`.
/// It includes user data and two raft-related informations:
/// `last_applied_logs` and `client_serial_responses` to achieve idempotence.
Expand Down Expand Up @@ -527,17 +529,22 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
put: &TxnPutRequest,
resp: &mut TxnReply,
events: &mut Option<Vec<NotifyKVEvent>>,
) -> MetaStorageResult<()> {
let sub_tree = txn_tree.key_space::<GenericKV>();

let (prev, _result) = self.txn_sub_tree_upsert(
let (prev, result) = self.txn_sub_tree_upsert(
&sub_tree,
&put.key,
&MatchSeq::Any,
Operation::Update(put.value.clone()),
None,
)?;

if let Some(events) = events {
events.push((put.key.to_string(), prev.clone(), result));
}

let put_resp = TxnPutResponse {
key: put.key.clone(),
prev_value: if put.prev_value {
Expand All @@ -559,17 +566,22 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
delete: &TxnDeleteRequest,
resp: &mut TxnReply,
events: &mut Option<Vec<NotifyKVEvent>>,
) -> MetaStorageResult<()> {
let sub_tree = txn_tree.key_space::<GenericKV>();

let (prev, _result) = self.txn_sub_tree_upsert(
let (prev, result) = self.txn_sub_tree_upsert(
&sub_tree,
&delete.key,
&MatchSeq::Any,
Operation::Delete,
None,
)?;

if let Some(events) = events {
events.push((delete.key.to_string(), prev.clone(), result));
}

let del_resp = TxnDeleteResponse {
key: delete.key.clone(),
success: prev.is_some(),
Expand All @@ -593,6 +605,7 @@ impl StateMachine {
delete_by_prefix: &TxnDeleteByPrefixRequest,
kv_pairs: Option<&DeleteByPrefixKeyMap>,
resp: &mut TxnReply,
events: &mut Option<Vec<NotifyKVEvent>>,
) -> MetaStorageResult<()> {
let mut count: u32 = 0;
if let Some(kv_pairs) = kv_pairs {
Expand All @@ -606,8 +619,13 @@ impl StateMachine {
Operation::Delete,
None,
);
if ret.is_ok() {

if let Ok(ret) = ret {
count += 1;

if let Some(events) = events {
events.push((key.to_string(), ret.0.clone(), ret.1));
}
}
}
}
Expand All @@ -632,24 +650,26 @@ impl StateMachine {
op: &TxnOp,
kv_pairs: Option<&DeleteByPrefixKeyMap>,
resp: &mut TxnReply,
events: &mut Option<Vec<NotifyKVEvent>>,
) -> MetaStorageResult<()> {
tracing::debug!(op = display(op), "txn execute TxnOp");
match &op.request {
Some(txn_op::Request::Get(get)) => {
self.txn_execute_get_operation(txn_tree, get, resp)?;
}
Some(txn_op::Request::Put(put)) => {
self.txn_execute_put_operation(txn_tree, put, resp)?;
self.txn_execute_put_operation(txn_tree, put, resp, events)?;
}
Some(txn_op::Request::Delete(delete)) => {
self.txn_execute_delete_operation(txn_tree, delete, resp)?;
self.txn_execute_delete_operation(txn_tree, delete, resp, events)?;
}
Some(txn_op::Request::DeleteByPrefix(delete_by_prefix)) => {
self.txn_execute_delete_by_prefix_operation(
txn_tree,
delete_by_prefix,
kv_pairs,
resp,
events,
)?;
}
None => {}
Expand Down Expand Up @@ -695,8 +715,21 @@ impl StateMachine {
responses: vec![],
};

let mut events: Option<Vec<NotifyKVEvent>> = if self.subscriber.is_some() {
Some(vec![])
} else {
None
};
for op in ops {
self.txn_execute_operation(txn_tree, op, kv_op_pairs, &mut resp)?;
self.txn_execute_operation(txn_tree, op, kv_op_pairs, &mut resp, &mut events)?;
}

if let Some(subscriber) = &self.subscriber {
if let Some(events) = events {
for event in events {
subscriber.kv_changed(&event.0, event.1, event.2);
}
}
}

Ok(AppliedState::TxnReply(resp))
Expand Down

1 comment on commit a6fba28

@vercel
Copy link

@vercel vercel bot commented on a6fba28 Jul 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.vercel.app
databend.rs

Please sign in to comment.