Skip to content

Commit

Permalink
[#24357] docdb: support advisory lock operation
Browse files Browse the repository at this point in the history
Summary:
This diff introduces YBPgsqlLockOp (inherited from YBOperation). Callers can flush such an op to DocDB through YBClient.

Each YBPgsqlLockOp refers to exactly one lock, and each lock must specify:
1. Lock mode: PG_LOCK_SHARE, PG_LOCK_EXCLUSIVE
2. Lock id: dbid, classid, objid, objsubid
3. Acquire with blocking or non-blocking method.

Such an op is sent to the pg_advisory_locks tablet leader with WriteRpc, where conflict resolution is applied.
Once all conflicts are resolved, the lock ops are replicated with RAFT as a WriteOperation.
When the WriteOperation is RAFT majority-replicated, the intent record corresponding to advisory lock will be written to intentsdb.
The format of the intent is `([dboid],[classid,objid,objsubid]),[IntentTypes],HybridTime => TxnId,kRowLock`. See `TransactionalWriter::Apply`.
e.g.
1. Intent of a shared lock:
```
SubDocKey(DocKey(0xbde6, [10000], [0, 0, 1]), []) [kStrongRead] HT{ days: 20062 time: 04:53:29.037186 } => TransactionId(649c099f-a889-42dc-a99e-d6f258aeed5b) WriteId(0) l
```
2. Intent of an exclusive lock:
```
SubDocKey(DocKey(0xbde6, [10000], [0, 0, 1]), []) [kStrongRead, kStrongWrite] HT{ days: 20062 time: 04:53:29.037186 } => TransactionId(649c099f-a889-42dc-a99e-d6f258aeed5b) WriteId(0) l
```

When a transaction is committed or aborted, all locks held by it are released automatically. This relies on the existing transaction processing path.

**Upgrade/Rollback safety:**
The feature is guarded by a TServer gflag yb_enable_advisory_lock (false by default).
Jira: DB-13269

Test Plan:
advisory_lock-test
advisory_lock_doc_operation-test.cc

Reviewers: bkolagani, hsunder

Reviewed By: bkolagani, hsunder

Subscribers: patnaik.balivada, rthallam, ybase, yql, qhu

Differential Revision: https://phorge.dev.yugabyte.com/D40471
  • Loading branch information
Huqicheng committed Dec 10, 2024
1 parent e733595 commit c56901e
Show file tree
Hide file tree
Showing 34 changed files with 828 additions and 85 deletions.
276 changes: 258 additions & 18 deletions src/yb/client/advisory_lock-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,60 @@
// under the License.
//

#include "yb/client/meta_cache.h"
#include "yb/client/session.h"
#include "yb/client/table.h"
#include "yb/client/transaction.h"
#include "yb/client/transaction_pool.h"
#include "yb/client/yb_op.h"
#include "yb/client/yb_table_name.h"
#include "yb/common/transaction_error.h"
#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/master/master_defaults.h"
#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tserver/mini_tablet_server.h"
#include "yb/tserver/tablet_server.h"
#include "yb/tserver/ysql_advisory_lock_table.h"

#include "yb/client/meta_cache.h"

#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/rpc/sidecars.h"
#include "yb/util/test_thread_holder.h"

DECLARE_int32(catalog_manager_bg_task_wait_ms);
DECLARE_uint32(num_advisory_locks_tablets);
DECLARE_bool(yb_enable_advisory_lock);

namespace yb {
namespace client {

const int kNumAdvisoryLocksTablets = 1;
const uint32_t kDBOid = 10000;

// Asserts that every leader-and-ready peer returned by ListTableActiveTabletLeadersPeers(cluster, id)
// reports exactly `expected_num_records` records in its intents DB, and that at least one such
// peer was seen.
void CheckNumIntents(MiniCluster* cluster, size_t expected_num_records, const TableId& id = "") {
  auto leader_peers = ListTableActiveTabletLeadersPeers(cluster, id);
  bool leader_seen = false;
  for (const auto& tablet_peer : leader_peers) {
    if (tablet_peer->IsLeaderAndReady()) {
      leader_seen = true;
      auto num_records = ASSERT_RESULT(
          tablet_peer->tablet()->TEST_CountDBRecords(docdb::StorageDbType::kIntents));
      LOG(INFO) << tablet_peer->LogPrefix() << "records: " << num_records;
      ASSERT_EQ(num_records, expected_num_records);
    }
  }
  // Without a ready leader nothing was actually verified, so treat that as a failure.
  ASSERT_TRUE(leader_seen) << "No active leader found";
}

bool IsStatusSkipLocking(const Status& s) {
if (s.ok() || !s.IsInternalError()) {
return false;
}
const TransactionError txn_err(s);
return txn_err.value() == TransactionErrorCode::kSkipLocking;
}

class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {
public:
Expand All @@ -43,39 +80,244 @@ class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {

ASSERT_OK(CreateClient());
if (ANNOTATE_UNPROTECTED_READ(FLAGS_yb_enable_advisory_lock)) {
ASSERT_OK(WaitForCreateTableToFinish());
ASSERT_OK(WaitForCreateTableToFinishAndLoadTable());
}
sidecars_ = std::make_unique<rpc::Sidecars>();
}

Status WaitForCreateTableToFinish() {
YBTableName table_name(
Status WaitForCreateTableToFinishAndLoadTable() {
client::YBTableName table_name(
YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName);
return client_->WaitForCreateTableToFinish(
table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier);
RETURN_NOT_OK(client_->WaitForCreateTableToFinish(
table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier));
advisory_locks_table_ = GetYsqlAdvisoryLocksTable();
table_ = VERIFY_RESULT(advisory_locks_table_->GetTable());
return Status::OK();
}

Status CheckNumTablets(const YBTablePtr& table) {
auto future = client_->LookupAllTabletsFuture(table, CoarseMonoClock::Now() + 10s);
SCHECK_EQ(VERIFY_RESULT(future.get()).size(),
Status CheckNumTablets() {
SCHECK_EQ(VERIFY_RESULT(GetTablets()).size(),
ANNOTATE_UNPROTECTED_READ(FLAGS_num_advisory_locks_tablets),
IllegalState, "tablet number mismatch");
return Status::OK();
}

// Looks up all tablets of table_ through the client, with a 10s deadline on the lookup.
// table_ must already be set; a null table_ trips CHECK_NOTNULL.
Result<std::vector<client::internal::RemoteTabletPtr>> GetTablets() {
  CHECK_NOTNULL(table_.get());
  const auto lookup_deadline = CoarseMonoClock::Now() + 10s;
  return client_->LookupAllTabletsFuture(table_, lookup_deadline).get();
}

// Builds a fresh YsqlAdvisoryLocksTable wrapper bound to this test's client.
std::unique_ptr<YsqlAdvisoryLocksTable> GetYsqlAdvisoryLocksTable() {
  auto& yb_client = *client_;
  return std::make_unique<YsqlAdvisoryLocksTable>(yb_client);
}

// Resolves the advisory locks table through a temporary YsqlAdvisoryLocksTable wrapper and
// returns whatever its GetTable() yields (a table pointer or an error status).
Result<client::YBTablePtr> GetTable() {
  auto locks_table = GetYsqlAdvisoryLocksTable();
  return locks_table->GetTable();
}

// Takes a transaction from tablet server 0's transaction pool, initializes it with the requested
// isolation `level` (SNAPSHOT_ISOLATION by default) and stamps it with the server clock's current
// physical time as the PG transaction start.
//
// Previously `level` was accepted but ignored (SNAPSHOT_ISOLATION was always used); it is now
// forwarded to TakeAndInit so callers asking for another isolation level actually get it.
Result<client::YBTransactionPtr> StartTransaction(
    IsolationLevel level = IsolationLevel::SNAPSHOT_ISOLATION) {
  auto* server = cluster_->mini_tablet_server(0)->server();
  auto& pool = server->TransactionPool();
  auto txn = VERIFY_RESULT(pool.TakeAndInit(level, TransactionRpcDeadline()));
  RETURN_NOT_OK(txn->SetPgTxnStart(server->Clock()->Now().GetPhysicalValueMicros()));
  return txn;
}

// Commits `txn` and blocks until the commit future resolves, returning its status.
Status Commit(client::YBTransactionPtr txn) {
  return txn->CommitFuture(TransactionRpcDeadline()).get();
}

protected:
// Flag overrides for this fixture: use kNumAdvisoryLocksTablets advisory-lock tablets and turn the
// advisory lock feature on.
virtual void SetFlags() {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_num_advisory_locks_tablets) = kNumAdvisoryLocksTablets;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = true;
}

std::unique_ptr<rpc::Sidecars> sidecars_;
std::unique_ptr<YsqlAdvisoryLocksTable> advisory_locks_table_;
client::YBTablePtr table_;
};

TEST_F(AdvisoryLockTest, TestAdvisoryLockTableCreated) {
auto table = GetYsqlAdvisoryLocksTable();
ASSERT_OK(CheckNumTablets(ASSERT_RESULT(table->GetTable())));
ASSERT_OK(CheckNumTablets());
}

// A transaction takes an exclusive lock on (kDBOid, 0, 0, 1) with a blocking request, then
// re-acquires the same lock from the same session with a non-blocking request. Both requests must
// succeed; the intent record count is checked after each step.
TEST_F(AdvisoryLockTest, AcquireXactExclusiveLock_Int8) {
  auto session = NewSession();
  auto txn = ASSERT_RESULT(StartTransaction());
  session->SetTransaction(txn);
  ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()))));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  // Acquire the same lock in the same session with non blocking mode.
  // The request previously passed wait=true, so the non-blocking path described here was never
  // exercised.
  ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ false, sidecars_.get()))));
  CheckNumIntents(cluster_.get(), 5, table_->id());
  ASSERT_OK(Commit(txn));
}

// Same scenario as the Int8 variant but with lock id (kDBOid, 1, 1, 2): a blocking exclusive
// acquire followed by a non-blocking re-acquire from the same session, both expected to succeed.
TEST_F(AdvisoryLockTest, AcquireXactExclusiveLock_Int4) {
  auto session = NewSession();
  auto txn = ASSERT_RESULT(StartTransaction());
  session->SetTransaction(txn);
  ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 1, 1, 2, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()))));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  // Acquire the same lock in the same session with non blocking mode.
  // The request previously passed wait=true, so the non-blocking path described here was never
  // exercised.
  ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 1, 1, 2, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ false, sidecars_.get()))));
  CheckNumIntents(cluster_.get(), 5, table_->id());
  ASSERT_OK(Commit(txn));
}

// A non-blocking exclusive request from a second transaction must fail with the skip-locking
// error while the first transaction holds the lock, and must succeed once the holder commits.
TEST_F(AdvisoryLockTest, TryAcquireXactExclusiveLock) {
  auto holder_session = NewSession();
  auto holder_txn = ASSERT_RESULT(StartTransaction());
  holder_session->SetTransaction(holder_txn);
  auto holder_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()));
  ASSERT_OK(holder_session->TEST_ApplyAndFlush(holder_op));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  auto contender_session = NewSession();
  auto contender_txn = ASSERT_RESULT(StartTransaction());
  contender_session->SetTransaction(contender_txn);
  // Non-blocking attempt on the same lock from a different session: expected to be refused.
  auto refused_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ false, sidecars_.get()));
  auto try_status = contender_session->TEST_ApplyAndFlush(refused_op);
  ASSERT_TRUE(IsStatusSkipLocking(try_status)) << try_status;

  // After the holder commits, the same non-blocking request goes through.
  ASSERT_OK(Commit(holder_txn));
  auto retry_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ false, sidecars_.get()));
  ASSERT_OK(contender_session->TEST_ApplyAndFlush(retry_op));

  ASSERT_OK(Commit(contender_txn));
}

// A lock op flushed from a session with no transaction attached must be rejected with an
// InvalidArgument status and must leave no intent records behind.
TEST_F(AdvisoryLockTest, AcquireAdvisoryLockWithoutTransaction) {
  auto plain_session = NewSession();
  auto lock_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()));
  auto flush_status = plain_session->TEST_ApplyAndFlush(lock_op);
  LOG(INFO) << flush_status;
  ASSERT_NOK(flush_status);
  ASSERT_TRUE(flush_status.IsInvalidArgument());
  ASSERT_STR_CONTAINS(
      flush_status.message().ToBuffer(), "No transaction found in write batch");
  CheckNumIntents(cluster_.get(), 0, table_->id());
}

// Exclusive locks with the same (classid, objid, objsubid) but different database oids must not
// conflict: both blocking requests succeed and the intent count doubles.
TEST_F(AdvisoryLockTest, AcquireLocksInDifferentDBs) {
  auto first_session = NewSession();
  auto first_txn = ASSERT_RESULT(StartTransaction());
  first_session->SetTransaction(first_txn);
  auto first_db_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()));
  ASSERT_OK(first_session->TEST_ApplyAndFlush(first_db_op));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  // Same lock id, but in the database with oid kDBOid + 1.
  auto second_session = NewSession();
  auto second_txn = ASSERT_RESULT(StartTransaction());
  second_session->SetTransaction(second_txn);
  auto second_db_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid + 1, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()));
  ASSERT_OK(second_session->TEST_ApplyAndFlush(second_db_op));
  CheckNumIntents(cluster_.get(), 6, table_->id());
}

// Two transactions can both hold a share lock on the same lock id, while a non-blocking
// exclusive request on that id from one of them is refused with the skip-locking error.
TEST_F(AdvisoryLockTest, ShareLocks) {
  auto first_session = NewSession();
  auto first_txn = ASSERT_RESULT(StartTransaction());
  first_session->SetTransaction(first_txn);
  auto first_share_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_SHARE,
      /* wait= */ true, sidecars_.get()));
  ASSERT_OK(first_session->TEST_ApplyAndFlush(first_share_op));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  // A second transaction takes the same lock in share mode.
  auto second_session = NewSession();
  auto second_txn = ASSERT_RESULT(StartTransaction());
  second_session->SetTransaction(second_txn);
  auto second_share_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_SHARE,
      /* wait= */ true, sidecars_.get()));
  ASSERT_OK(second_session->TEST_ApplyAndFlush(second_share_op));
  CheckNumIntents(cluster_.get(), 6, table_->id());

  // The first session now asks for the exclusive lock without waiting; the other transaction
  // still holds its share lock.
  auto exclusive_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ false, sidecars_.get()));
  auto exclusive_status = first_session->TEST_ApplyAndFlush(exclusive_op);
  ASSERT_TRUE(IsStatusSkipLocking(exclusive_status)) << exclusive_status;
}

// A blocking exclusive request from a second transaction must wait while the first transaction
// holds the lock, and must complete once the holder commits.
TEST_F(AdvisoryLockTest, WaitOnConflict) {
  auto session = NewSession();
  auto txn = ASSERT_RESULT(StartTransaction());
  session->SetTransaction(txn);
  ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()))));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  auto session2 = NewSession();
  auto txn2 = ASSERT_RESULT(StartTransaction());
  session2->SetTransaction(txn2);
  // Declared before thread_holder on purpose: locals are destroyed in reverse declaration order,
  // so the flag outlives the holder. If an ASSERT below returns early, the worker thread (which
  // captures the flag by reference) is presumably joined by the holder's destructor before the
  // flag goes away -- NOTE(review): confirm TestThreadHolder joins on destruction.
  std::atomic_bool session2_locked{false};
  TestThreadHolder thread_holder;
  thread_holder.AddThreadFunctor([session2, this, &session2_locked] {
    ASSERT_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
        kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
        /* wait= */ true, sidecars_.get()))));
    session2_locked.store(true);
  });
  // Give the waiter time to reach the conflict; it must still be blocked.
  SleepFor(1s);
  ASSERT_FALSE(session2_locked.load());
  ASSERT_OK(Commit(txn));
  thread_holder.JoinAll();
  // Once the holder has committed and the worker has finished, the waiter must own the lock.
  ASSERT_TRUE(session2_locked.load());
}

// A lock acquired before a leader step-down must still be visible afterwards: a non-blocking
// exclusive request from another transaction is refused and the intent count is unchanged.
TEST_F(AdvisoryLockTest, LeaderChange) {
  auto holder_session = NewSession();
  auto holder_txn = ASSERT_RESULT(StartTransaction());
  holder_session->SetTransaction(holder_txn);
  auto holder_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ true, sidecars_.get()));
  ASSERT_OK(holder_session->TEST_ApplyAndFlush(holder_op));
  CheckNumIntents(cluster_.get(), 3, table_->id());

  // Locate the single advisory-lock tablet and its current leader peer.
  auto lock_tablets = ASSERT_RESULT(GetTablets());
  ASSERT_EQ(lock_tablets.size(), 1);
  auto tablet_id = lock_tablets[0]->tablet_id();
  auto leader_peer = ASSERT_RESULT(GetLeaderPeerForTablet(cluster_.get(), tablet_id));

  // Ask the current leader to step down, without naming a successor.
  auto ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(
      ASSERT_RESULT(cluster_->GetLeaderMasterProxy<master::MasterClusterProxy>()),
      &client_->proxy_cache()));
  ASSERT_OK(itest::LeaderStepDown(
      ts_map[leader_peer->permanent_uuid()].get(), tablet_id, /* new_leader= */ nullptr, 10s));

  // A different transaction tries the same lock without waiting and must be refused.
  auto contender_session = NewSession();
  auto contender_txn = ASSERT_RESULT(StartTransaction());
  contender_session->SetTransaction(contender_txn);
  auto contender_op = ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
      kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
      /* wait= */ false, sidecars_.get()));
  auto try_status = contender_session->TEST_ApplyAndFlush(contender_op);
  ASSERT_TRUE(IsStatusSkipLocking(try_status)) << try_status;
  CheckNumIntents(cluster_.get(), 3, table_->id());
}

class AdvisoryLocksDisabledTest : public AdvisoryLockTest {
Expand All @@ -87,16 +329,14 @@ class AdvisoryLocksDisabledTest : public AdvisoryLockTest {
};

TEST_F(AdvisoryLocksDisabledTest, ToggleAdvisoryLockFlag) {
auto table = GetYsqlAdvisoryLocksTable();
// Wait for the background task to run a few times.
SleepFor(FLAGS_catalog_manager_bg_task_wait_ms * kTimeMultiplier * 3ms);
auto res = table->GetTable();
auto res = GetTable();
ASSERT_NOK(res);
ASSERT_TRUE(res.status().IsNotSupported());
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = true;
ASSERT_OK(WaitForCreateTableToFinish());
ASSERT_OK(CheckNumTablets(ASSERT_RESULT(table->GetTable())));
ASSERT_OK(WaitForCreateTableToFinishAndLoadTable());
ASSERT_OK(CheckNumTablets());
}

} // namespace client
} // namespace yb
Loading

0 comments on commit c56901e

Please sign in to comment.