Skip to content

Commit

Permalink
fix: change logic to wait root node at waiting list for
Browse files Browse the repository at this point in the history
  • Loading branch information
thawk105 committed Dec 6, 2023
1 parent 99e25ea commit a285a91
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 13 deletions.
41 changes: 31 additions & 10 deletions src/concurrency_control/ongoing_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,35 @@ bool ongoing_tx::exist_id(std::size_t id) {

Status ongoing_tx::waiting_bypass(session* ti) {
// @pre shared lock to tx_info_

auto exist_living_wait_for = [](session* target_ti) {
auto wait_for{target_ti->extract_wait_for()};
for (auto&& elem : tx_info_) {
auto the_tx_id = std::get<ongoing_tx::index_id>(elem);
auto f_itr = wait_for.find(the_tx_id);
if (f_itr != wait_for.end()) { return true; }
}
return false;
};

/**
* 現時点で前置候補の LTX群。これらで走行中のものをバイパスする
* 現時点で前置候補の LTX群。これらで走行中のものをルート以外バイパスする
*/
auto wait_for{ti->extract_wait_for()};

std::set<std::tuple<std::size_t, session*>> bypass_target{};
for (auto&& elem : tx_info_) {
auto the_tx_id = std::get<ongoing_tx::index_id>(elem);
auto f_itr = wait_for.find(the_tx_id);
if (f_itr != wait_for.end()) {
// found
auto* token = std::get<ongoing_tx::index_session>(elem);

// check exist living wait for, for not to remove path to root.
if (!exist_living_wait_for(token)) {
// not bypass for tree root
continue;
}

bypass_target.insert(std::make_tuple(the_tx_id, token));
// set valid epoch if need
if (ti->get_valid_epoch() > token->get_valid_epoch()) {
Expand Down Expand Up @@ -168,14 +185,18 @@ bool ongoing_tx::exist_wait_for(session* ti, Status& out_status) {
wait_for.end()) {
// wait_for hit.
/**
* boundary wait 確定.
* waiting by pass: 自分が(前置するかもしれなくて)待つ相手x1
* に対する前置を確定するとともに、x1 が前置する相手に前置する。
* これは待ち確認のたびにパスを一つ短絡化するため、
* get_requested_commit() の確認を噛ませていない。
* */
out_status = waiting_bypass(ti);
return out_status == Status::OK;
* boundary wait 確定.
* waiting by pass: 自分が(前置するかもしれなくて)待つ相手x1
* に対する前置を確定するとともに、x1 が前置する相手に前置する。
* これは待ち確認のたびにパスを一つ短絡化するため、
* get_requested_commit() の確認を噛ませていない。
* https://github.com/project-tsurugi/tsurugi-issues/issues/438#issuecomment-1839876140
* ルートになるまでパスを縮めてはいけない。
*/
if (wait_for.size() > 2) {
out_status = waiting_bypass(ti);
}
return true;
}
} else {
// considering for only high priori ltx
Expand Down
16 changes: 13 additions & 3 deletions test/concurrency_control/complicated/tsurugi_issue438_4_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ TEST_F(tsurugi_issue438_4_test, simple) {
ASSERT_OK(tx_begin({t, transaction_type::SHORT}));
ASSERT_OK(insert(t, current_batch, "1", "1"));
ASSERT_OK(commit(t));

// tx2
ASSERT_OK(tx_begin({t, transaction_type::LONG, {receipts}}));
wait_epoch_update();
ScanHandle shd{};
Expand All @@ -63,6 +65,7 @@ TEST_F(tsurugi_issue438_4_test, simple) {
ASSERT_EQ(Status::WARN_SCAN_LIMIT, next(t, shd));
ASSERT_OK(close_scan(t, shd));

// tx3
Token t2;
ASSERT_OK(enter(t2));
ASSERT_OK(tx_begin({t2, transaction_type::SHORT}));
Expand All @@ -83,6 +86,7 @@ TEST_F(tsurugi_issue438_4_test, simple) {
ASSERT_OK(close_scan(t2, shd2));
ASSERT_OK(commit(t2));

// tx1
Token t3;
ASSERT_OK(enter(t3));
ASSERT_OK(tx_begin({t3, transaction_type::LONG}));
Expand All @@ -93,7 +97,7 @@ TEST_F(tsurugi_issue438_4_test, simple) {
ASSERT_OK(read_key_from_scan(t3, shd3, buf));
ASSERT_EQ(buf, "1");
ASSERT_OK(read_value_from_scan(t3, shd3, buf));
ASSERT_EQ(buf, "2");
ASSERT_EQ(buf, "2"); // read tx3
ASSERT_EQ(Status::WARN_SCAN_LIMIT, next(t3, shd3));
ASSERT_OK(close_scan(t3, shd3));
ASSERT_EQ(Status::WARN_NOT_FOUND,
Expand All @@ -113,11 +117,17 @@ TEST_F(tsurugi_issue438_4_test, simple) {
* expect waiting at once, but at becoming waiting, tx tries to waiting
* bypass and bypass t1, and not wait
*/
ASSERT_TRUE(commit(t3, cb));
ASSERT_FALSE(commit(t3, cb));
/**
* This needs for t3 aborts but it didn't exist at
* https://github.com/project-tsurugi/tsurugi-issues/issues/438#issuecomment-1835644560
*/
ASSERT_OK(insert(t, receipts, "2", "200"));
ASSERT_OK(commit(t)); // this needs to release waiting of t3
// note: this fits tsurugi issue 5 ex 3 but not fit for trace log
while (!was_called) { _mm_pause(); }
ASSERT_EQ(Status::ERR_CC, cb_rc); // expect error due to rub violation
ASSERT_EQ(reason_code::CC_LTX_READ_UPPER_BOUND_VIOLATION, rc);
ASSERT_OK(abort(t));

ASSERT_OK(leave(t));
ASSERT_OK(leave(t2));
Expand Down

0 comments on commit a285a91

Please sign in to comment.