From c56901ef7227f5f6aac7d75830a1516a12763469 Mon Sep 17 00:00:00 2001
From: qhu
Date: Mon, 9 Dec 2024 21:30:35 +0000
Subject: [PATCH] [#24357] docdb: support advisory lock operation

Summary:
This diff introduces YBPgsqlLockOp (inherited from YBOperation). Callers can flush such ops
to DocDB through YBClient. Each YBPgsqlLockOp refers to one lock, and each lock specifies:
1. Lock mode: PG_LOCK_SHARE or PG_LOCK_EXCLUSIVE.
2. Lock id: dbid, classid, objid, objsubid.
3. Whether to acquire in blocking or non-blocking mode.

Each op is sent to the pg_advisory_locks tablet leader via WriteRpc, and conflict resolution
is applied. Once all conflicts are resolved, the lock ops are replicated through Raft as a
WriteOperation. When the WriteOperation is Raft majority-replicated, the intent record
corresponding to the advisory lock is written to intentsdb. The intent format is
`([dboid],[classid,objid,objsubid]),[IntentTypes],HybridTime => TxnId,kRowLock`.
See `TransactionalWriter::Apply`. e.g.

1. Intent of a shared lock:
```
SubDocKey(DocKey(0xbde6, [10000], [0, 0, 1]), []) [kStrongRead] HT{ days: 20062 time: 04:53:29.037186 } => TransactionId(649c099f-a889-42dc-a99e-d6f258aeed5b) WriteId(0) l
```
2. Intent of an exclusive lock:
```
SubDocKey(DocKey(0xbde6, [10000], [0, 0, 1]), []) [kStrongRead, kStrongWrite] HT{ days: 20062 time: 04:53:29.037186 } => TransactionId(649c099f-a889-42dc-a99e-d6f258aeed5b) WriteId(0) l
```

When a transaction is committed or aborted, all advisory locks it holds are released
automatically; this relies on the existing transaction processing path. (A client-side usage
sketch appears after the diff below.)

**Upgrade/Rollback safety:**
The feature is guarded by a TServer gflag yb_enable_advisory_lock (false by default).

Jira: DB-13269

Test Plan:
advisory_lock-test
advisory_lock_doc_operation-test.cc

Reviewers: bkolagani, hsunder

Reviewed By: bkolagani, hsunder

Subscribers: patnaik.balivada, rthallam, ybase, yql, qhu

Differential Revision: https://phorge.dev.yugabyte.com/D40471
---
 src/yb/client/advisory_lock-test.cc | 276 ++++++++++++++++--
 src/yb/client/async_rpc.cc | 61 ++--
 src/yb/client/batcher.cc | 4 +-
 src/yb/client/client_fwd.h | 2 +
 src/yb/client/ql-stress-test.cc | 3 +-
 src/yb/client/yb_op.cc | 48 +++
 src/yb/client/yb_op.h | 28 +-
 src/yb/common/pgsql_protocol.proto | 23 ++
 src/yb/docdb/CMakeLists.txt | 3 +-
 .../docdb/advisory_lock_doc_operation-test.cc | 108 +++++++
 src/yb/docdb/conflict_resolution.cc | 6 +-
 src/yb/docdb/doc_operation.h | 8 +-
 src/yb/docdb/doc_write_batch.cc | 21 +-
 src/yb/docdb/doc_write_batch.h | 22 +-
 src/yb/docdb/docdb-ttl-test.cc | 12 +-
 src/yb/docdb/docdb.cc | 4 +-
 src/yb/docdb/docdb.proto | 10 +
 src/yb/docdb/docdb_util.cc | 2 +-
 src/yb/docdb/pgsql_operation.cc | 72 +++++
 src/yb/docdb/pgsql_operation.h | 40 +++
 src/yb/docdb/rocksdb_writer.cc | 9 +
 src/yb/dockv/dockv.proto | 5 +
 src/yb/dockv/intent.cc | 10 +
 src/yb/dockv/intent.h | 3 +
 .../integration-tests/packed_row_test_base.cc | 3 +-
 src/yb/tablet/tablet.cc | 9 +-
 src/yb/tablet/tablet.h | 3 +-
 src/yb/tablet/write_query.cc | 72 ++++-
 src/yb/tablet/write_query.h | 2 +
 src/yb/tserver/tablet_service.cc | 1 +
 src/yb/tserver/tserver.proto | 3 +
 src/yb/tserver/ysql_advisory_lock_table.cc | 23 ++
 src/yb/tserver/ysql_advisory_lock_table.h | 11 +-
 src/yb/yql/cql/ql/util/statement_result.cc | 6 +-
 34 files changed, 828 insertions(+), 85 deletions(-)
 create mode 100644 src/yb/docdb/advisory_lock_doc_operation-test.cc

diff --git a/src/yb/client/advisory_lock-test.cc b/src/yb/client/advisory_lock-test.cc
index 445c580315f4..2e8d90510068 100644 ---
a/src/yb/client/advisory_lock-test.cc +++ b/src/yb/client/advisory_lock-test.cc @@ -11,23 +11,60 @@ // under the License. // +#include "yb/client/meta_cache.h" +#include "yb/client/session.h" +#include "yb/client/table.h" +#include "yb/client/transaction.h" +#include "yb/client/transaction_pool.h" +#include "yb/client/yb_op.h" #include "yb/client/yb_table_name.h" +#include "yb/common/transaction_error.h" +#include "yb/integration-tests/cluster_itest_util.h" #include "yb/master/master_defaults.h" +#include "yb/tablet/tablet.h" +#include "yb/tablet/tablet_peer.h" +#include "yb/tserver/mini_tablet_server.h" +#include "yb/tserver/tablet_server.h" #include "yb/tserver/ysql_advisory_lock_table.h" -#include "yb/client/meta_cache.h" - #include "yb/integration-tests/mini_cluster.h" #include "yb/integration-tests/yb_mini_cluster_test_base.h" +#include "yb/rpc/sidecars.h" +#include "yb/util/test_thread_holder.h" + DECLARE_int32(catalog_manager_bg_task_wait_ms); DECLARE_uint32(num_advisory_locks_tablets); DECLARE_bool(yb_enable_advisory_lock); namespace yb { -namespace client { const int kNumAdvisoryLocksTablets = 1; +const uint32_t kDBOid = 10000; + +void CheckNumIntents(MiniCluster* cluster, size_t expected_num_records, const TableId& id = "") { + auto peers = ListTableActiveTabletLeadersPeers(cluster, id); + bool found = false; + for (const auto& peer : peers) { + if (!peer->IsLeaderAndReady()) { + continue; + } + found = true; + auto count = ASSERT_RESULT( + peer->tablet()->TEST_CountDBRecords(docdb::StorageDbType::kIntents)); + LOG(INFO) << peer->LogPrefix() << "records: " << count; + ASSERT_EQ(count, expected_num_records); + } + ASSERT_TRUE(found) << "No active leader found"; +} + +bool IsStatusSkipLocking(const Status& s) { + if (s.ok() || !s.IsInternalError()) { + return false; + } + const TransactionError txn_err(s); + return txn_err.value() == TransactionErrorCode::kSkipLocking; +} class AdvisoryLockTest: public MiniClusterTestWithClient { public: @@ -43,39 +80,244 @@ class AdvisoryLockTest: public MiniClusterTestWithClient { ASSERT_OK(CreateClient()); if (ANNOTATE_UNPROTECTED_READ(FLAGS_yb_enable_advisory_lock)) { - ASSERT_OK(WaitForCreateTableToFinish()); + ASSERT_OK(WaitForCreateTableToFinishAndLoadTable()); } + sidecars_ = std::make_unique(); } - Status WaitForCreateTableToFinish() { - YBTableName table_name( + Status WaitForCreateTableToFinishAndLoadTable() { + client::YBTableName table_name( YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName); - return client_->WaitForCreateTableToFinish( - table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier); + RETURN_NOT_OK(client_->WaitForCreateTableToFinish( + table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier)); + advisory_locks_table_ = GetYsqlAdvisoryLocksTable(); + table_ = VERIFY_RESULT(advisory_locks_table_->GetTable()); + return Status::OK(); } - Status CheckNumTablets(const YBTablePtr& table) { - auto future = client_->LookupAllTabletsFuture(table, CoarseMonoClock::Now() + 10s); - SCHECK_EQ(VERIFY_RESULT(future.get()).size(), + Status CheckNumTablets() { + SCHECK_EQ(VERIFY_RESULT(GetTablets()).size(), ANNOTATE_UNPROTECTED_READ(FLAGS_num_advisory_locks_tablets), IllegalState, "tablet number mismatch"); return Status::OK(); } + Result> GetTablets() { + CHECK_NOTNULL(table_.get()); + auto future = client_->LookupAllTabletsFuture(table_, CoarseMonoClock::Now() + 10s); + return future.get(); + } + std::unique_ptr GetYsqlAdvisoryLocksTable() { return std::make_unique(*client_.get()); } + Result GetTable() { + 
return GetYsqlAdvisoryLocksTable()->GetTable(); + } + + Result StartTransaction( + IsolationLevel level = IsolationLevel::SNAPSHOT_ISOLATION) { + auto* server = cluster_->mini_tablet_server(0)->server(); + auto& pool = server->TransactionPool(); + auto txn = VERIFY_RESULT(pool.TakeAndInit(SNAPSHOT_ISOLATION, TransactionRpcDeadline())); + RETURN_NOT_OK(txn->SetPgTxnStart(server->Clock()->Now().GetPhysicalValueMicros())); + return txn; + } + + Status Commit(client::YBTransactionPtr txn) { + auto commit_future = txn->CommitFuture(TransactionRpcDeadline()); + return commit_future.get(); + } + protected: virtual void SetFlags() { ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = true; ANNOTATE_UNPROTECTED_WRITE(FLAGS_num_advisory_locks_tablets) = kNumAdvisoryLocksTablets; } + + std::unique_ptr sidecars_; + std::unique_ptr advisory_locks_table_; + client::YBTablePtr table_; }; TEST_F(AdvisoryLockTest, TestAdvisoryLockTableCreated) { - auto table = GetYsqlAdvisoryLocksTable(); - ASSERT_OK(CheckNumTablets(ASSERT_RESULT(table->GetTable()))); + ASSERT_OK(CheckNumTablets()); +} + +TEST_F(AdvisoryLockTest, AcquireXactExclusiveLock_Int8) { + auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + // Acquire the same lock in the same session with non blocking mode. + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 5, table_->id()); + ASSERT_OK(Commit(txn)); +} + +TEST_F(AdvisoryLockTest, AcquireXactExclusiveLock_Int4) { + auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 1, 1, 2, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + // Acquire the same lock in the same session with non blocking mode. + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 1, 1, 2, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 5, table_->id()); + ASSERT_OK(Commit(txn)); +} + +TEST_F(AdvisoryLockTest, TryAcquireXactExclusiveLock) { + auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + auto session2 = NewSession(); + auto txn2 = ASSERT_RESULT(StartTransaction()); + session2->SetTransaction(txn2); + // Acquire the same lock in a different session with non blocking mode. 
+ auto s = session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ false, sidecars_.get()))); + ASSERT_TRUE(IsStatusSkipLocking(s)) << s; + + ASSERT_OK(Commit(txn)); + ASSERT_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ false, sidecars_.get())))); + + ASSERT_OK(Commit(txn2)); +} + +TEST_F(AdvisoryLockTest, AcquireAdvisoryLockWithoutTransaction) { + auto session = NewSession(); + auto s = session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get()))); + LOG(INFO) << s; + ASSERT_NOK(s); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.message().ToBuffer(), "No transaction found in write batch"); + CheckNumIntents(cluster_.get(), 0, table_->id()); +} + +TEST_F(AdvisoryLockTest, AcquireLocksInDifferentDBs) { + // Locks acquired in different DBs shouldn't block each other. + auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + auto session2 = NewSession(); + auto txn2 = ASSERT_RESULT(StartTransaction()); + session2->SetTransaction(txn2); + ASSERT_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid + 1, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 6, table_->id()); +} + +TEST_F(AdvisoryLockTest, ShareLocks) { + // Locks acquired in different DBs shouldn't block each other. 
+ auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_SHARE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + auto session2 = NewSession(); + auto txn2 = ASSERT_RESULT(StartTransaction()); + session2->SetTransaction(txn2); + ASSERT_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_SHARE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 6, table_->id()); + + auto s = session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ false, sidecars_.get()))); + ASSERT_TRUE(IsStatusSkipLocking(s)) << s; +} + +TEST_F(AdvisoryLockTest, WaitOnConflict) { + auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + auto session2 = NewSession(); + auto txn2 = ASSERT_RESULT(StartTransaction()); + session2->SetTransaction(txn2); + TestThreadHolder thread_holder; + std::atomic_bool session2_locked{false}; + thread_holder.AddThreadFunctor([session2, this, &session2_locked] { + ASSERT_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + session2_locked.store(true); + }); + SleepFor(1s); + ASSERT_FALSE(session2_locked.load()); + ASSERT_OK(Commit(txn)); + thread_holder.JoinAll(); +} + +TEST_F(AdvisoryLockTest, LeaderChange) { + // Acquired locks should be found after leader change. + auto session = NewSession(); + auto txn = ASSERT_RESULT(StartTransaction()); + session->SetTransaction(txn); + ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ true, sidecars_.get())))); + CheckNumIntents(cluster_.get(), 3, table_->id()); + + auto tablets = ASSERT_RESULT(GetTablets()); + ASSERT_EQ(tablets.size(), 1); + auto id = tablets[0]->tablet_id(); + auto peer = ASSERT_RESULT(GetLeaderPeerForTablet(cluster_.get(), id)); + + // Stepdown the leader. + auto map = ASSERT_RESULT(itest::CreateTabletServerMap( + ASSERT_RESULT(cluster_->GetLeaderMasterProxy()), + &client_->proxy_cache())); + ASSERT_OK(itest::LeaderStepDown( + map[peer->permanent_uuid()].get(), id, /* new_leader= */ nullptr, 10s)); + + // Another session shouldn't be able to acquire the lock. + auto session2 = NewSession(); + auto txn2 = ASSERT_RESULT(StartTransaction()); + session2->SetTransaction(txn2); + // Acquire the same lock in a different session with non blocking mode. 
+ auto s = session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp( + kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, + /* wait= */ false, sidecars_.get()))); + ASSERT_TRUE(IsStatusSkipLocking(s)) << s; + CheckNumIntents(cluster_.get(), 3, table_->id()); } class AdvisoryLocksDisabledTest : public AdvisoryLockTest { @@ -87,16 +329,14 @@ class AdvisoryLocksDisabledTest : public AdvisoryLockTest { }; TEST_F(AdvisoryLocksDisabledTest, ToggleAdvisoryLockFlag) { - auto table = GetYsqlAdvisoryLocksTable(); // Wait for the background task to run a few times. SleepFor(FLAGS_catalog_manager_bg_task_wait_ms * kTimeMultiplier * 3ms); - auto res = table->GetTable(); + auto res = GetTable(); ASSERT_NOK(res); ASSERT_TRUE(res.status().IsNotSupported()); ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = true; - ASSERT_OK(WaitForCreateTableToFinish()); - ASSERT_OK(CheckNumTablets(ASSERT_RESULT(table->GetTable()))); + ASSERT_OK(WaitForCreateTableToFinishAndLoadTable()); + ASSERT_OK(CheckNumTablets()); } -} // namespace client } // namespace yb diff --git a/src/yb/client/async_rpc.cc b/src/yb/client/async_rpc.cc index b9aa2d08b7aa..efc64348218f 100644 --- a/src/yb/client/async_rpc.cc +++ b/src/yb/client/async_rpc.cc @@ -306,7 +306,8 @@ void AsyncRpc::Failed(const Status& status) { break; } case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; - case YBOperation::Type::PGSQL_WRITE: { + case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED; + case YBOperation::Type::PGSQL_LOCK: { PgsqlResponsePB* resp = down_cast(yb_op)->mutable_response(); resp->set_status(status.IsTryAgain() ? PgsqlResponsePB::PGSQL_STATUS_RESTART_REQUIRED_ERROR : PgsqlResponsePB::PGSQL_STATUS_RUNTIME_ERROR); @@ -585,24 +586,33 @@ WriteRpc::WriteRpc(const AsyncRpcData& data) TRACE_TO(trace_, "WriteRpc initiated"); VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString()); - // Add the rows - switch (table()->table_type()) { - case YBTableType::REDIS_TABLE_TYPE: - FillOps( - ops_, YBOperation::Type::REDIS_WRITE, &req_, req_.mutable_redis_write_batch()); - break; - case YBTableType::YQL_TABLE_TYPE: - FillOps( - ops_, YBOperation::Type::QL_WRITE, &req_, req_.mutable_ql_write_batch()); - break; - case YBTableType::PGSQL_TABLE_TYPE: - FillOps( - ops_, YBOperation::Type::PGSQL_WRITE, &req_, req_.mutable_pgsql_write_batch()); - break; - case YBTableType::UNKNOWN_TABLE_TYPE: - case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: - LOG(DFATAL) << "Unsupported table type: " << table()->ToString(); - break; + if (data.ops.front().yb_op->group() == OpGroup::kLock) { + FillOps( + ops_, YBOperation::Type::PGSQL_LOCK, &req_, req_.mutable_pgsql_lock_batch()); + // Set wait policy for non-blocking lock requests. 
+ if (!down_cast(data.ops.front().yb_op.get())->mutable_request()->wait()) { + req_.mutable_write_batch()->set_wait_policy(WAIT_SKIP); + } + } else { + // Add the rows + switch (table()->table_type()) { + case YBTableType::REDIS_TABLE_TYPE: + FillOps( + ops_, YBOperation::Type::REDIS_WRITE, &req_, req_.mutable_redis_write_batch()); + break; + case YBTableType::YQL_TABLE_TYPE: + FillOps( + ops_, YBOperation::Type::QL_WRITE, &req_, req_.mutable_ql_write_batch()); + break; + case YBTableType::PGSQL_TABLE_TYPE: + FillOps( + ops_, YBOperation::Type::PGSQL_WRITE, &req_, req_.mutable_pgsql_write_batch()); + break; + case YBTableType::UNKNOWN_TABLE_TYPE: + case YBTableType::TRANSACTION_STATUS_TABLE_TYPE: + LOG(DFATAL) << "Unsupported table type: " << table()->ToString(); + break; + } } VLOG(3) << "Created batch for " << data.tablet->tablet_id() << ":\n" @@ -645,6 +655,7 @@ WriteRpc::~WriteRpc() { ReleaseOps(req_.mutable_redis_write_batch()); ReleaseOps(req_.mutable_ql_write_batch()); ReleaseOps(req_.mutable_pgsql_write_batch()); + ReleaseOps(req_.mutable_pgsql_lock_batch()); } void WriteRpc::CallRemoteMethod() { @@ -722,6 +733,17 @@ Status WriteRpc::SwapResponses() { pgsql_idx++; break; } + case YBOperation::Type::PGSQL_LOCK: { + if (pgsql_idx >= resp_.pgsql_response_batch().size()) { + ++pgsql_idx; + continue; + } + // Restore pgsql lock request and extract response. + auto* pgsql_op = down_cast(yb_op); + pgsql_op->mutable_response()->Swap(resp_.mutable_pgsql_response_batch(pgsql_idx)); + ++pgsql_idx; + break; + } case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; case YBOperation::Type::QL_READ: @@ -855,6 +877,7 @@ Status ReadRpc::SwapResponses() { case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED; case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED; case YBOperation::Type::QL_WRITE: + case YBOperation::Type::PGSQL_LOCK: LOG(FATAL) << "Not a read operation " << op.yb_op->type(); break; } diff --git a/src/yb/client/batcher.cc b/src/yb/client/batcher.cc index 7d525c1ed1d9..6b6ef460b085 100644 --- a/src/yb/client/batcher.cc +++ b/src/yb/client/batcher.cc @@ -377,6 +377,7 @@ std::pair, std::map> case YBOperation::QL_WRITE: FALLTHROUGH_INTENDED; case YBOperation::PGSQL_READ: FALLTHROUGH_INTENDED; case YBOperation::PGSQL_WRITE: FALLTHROUGH_INTENDED; + case YBOperation::PGSQL_LOCK: FALLTHROUGH_INTENDED; case YBOperation::REDIS_READ: FALLTHROUGH_INTENDED; case YBOperation::REDIS_WRITE: { partition_contains_row = partition.ContainsKey(partition_key); @@ -655,7 +656,8 @@ std::shared_ptr Batcher::CreateRpc( }; switch (op_group) { - case OpGroup::kWrite: + case OpGroup::kWrite: FALLTHROUGH_INTENDED; + case OpGroup::kLock: return std::make_shared(data); case OpGroup::kLeaderRead: return std::make_shared(data, YBConsistencyLevel::STRONG); diff --git a/src/yb/client/client_fwd.h b/src/yb/client/client_fwd.h index 44d9071e7d82..01850fddd26d 100644 --- a/src/yb/client/client_fwd.h +++ b/src/yb/client/client_fwd.h @@ -61,6 +61,7 @@ class YBqlWriteOp; class YBPgsqlOp; class YBPgsqlReadOp; class YBPgsqlWriteOp; +class YBPgsqlLockOp; class YBRedisOp; class YBRedisReadOp; @@ -114,6 +115,7 @@ using YBqlReadOpPtr = std::shared_ptr; using YBqlWriteOpPtr = std::shared_ptr; using YBPgsqlReadOpPtr = std::shared_ptr; using YBPgsqlWriteOpPtr = std::shared_ptr; +using YBPgsqlLockOpPtr = std::shared_ptr; enum class YBTableType; diff --git a/src/yb/client/ql-stress-test.cc b/src/yb/client/ql-stress-test.cc index c0d24c6d5fee..f651ca04caa0 100644 --- 
a/src/yb/client/ql-stress-test.cc +++ b/src/yb/client/ql-stress-test.cc @@ -280,7 +280,8 @@ bool QLStressTest::CheckRetryableRequestsCountsAndLeaders( if (!peer->tablet() || peer->tablet()->metadata()->table_id() != table_.table()->id()) { continue; } - const auto tablet_entries = EXPECT_RESULT(peer->tablet()->TEST_CountRegularDBRecords()); + const auto tablet_entries = EXPECT_RESULT(peer->tablet()->TEST_CountDBRecords( + docdb::StorageDbType::kRegular)); auto raft_consensus = EXPECT_RESULT(peer->GetRaftConsensus()); auto request_counts = raft_consensus->TEST_CountRetryableRequests(); LOG(INFO) << "T " << peer->tablet()->tablet_id() << " P " << peer->permanent_uuid() diff --git a/src/yb/client/yb_op.cc b/src/yb/client/yb_op.cc index 0b8733003237..ae8acdc7dd5f 100644 --- a/src/yb/client/yb_op.cc +++ b/src/yb/client/yb_op.cc @@ -1036,6 +1036,54 @@ Status YBPgsqlReadOp::GetPartitionKey(std::string* partition_key) const { return Status::OK(); } +//////////////////////////////////////////////////////////// +// YBPgsqlLockOp +//////////////////////////////////////////////////////////// + +YBPgsqlLockOp::YBPgsqlLockOp(const std::shared_ptr& table, rpc::Sidecars* sidecars) : + YBPgsqlOp(table, sidecars) { + request_holder_ = std::make_unique(); + request_ = request_holder_.get(); +} + +YBPgsqlLockOpPtr YBPgsqlLockOp::NewLock( + const std::shared_ptr& table, rpc::Sidecars* sidecars) { + auto op = std::make_shared(table, sidecars); + auto* req = op->mutable_request(); + req->set_is_lock(true); + req->set_client(YQL_CLIENT_PGSQL); + return op; +} + +bool YBPgsqlLockOp::succeeded() const { + return response().status() == PgsqlResponsePB::PGSQL_STATUS_OK; +} + +bool YBPgsqlLockOp::applied() { + return succeeded() && !response_->skipped(); +} + +OpGroup YBPgsqlLockOp::group() { + // TODO(advisory-lock #25195): We should use a different group for locks and unlocks. + return OpGroup::kLock; +} + +std::string YBPgsqlLockOp::ToString() const { + return Format("ADVISORY LOCK $0", request_->ShortDebugString()); +} + +void YBPgsqlLockOp::SetHashCode(uint16_t hash_code) { + request_->set_hash_code(hash_code); +} + +Status YBPgsqlLockOp::GetPartitionKey(std::string* partition_key) const { + std::string key; + RETURN_NOT_OK(table_->partition_schema().EncodeKey( + request_->lock_id().lock_partition_column_values(), &key)); + *partition_key = std::move(key); + return Status::OK(); +} + //////////////////////////////////////////////////////////// // YBNoOp //////////////////////////////////////////////////////////// diff --git a/src/yb/client/yb_op.h b/src/yb/client/yb_op.h index cbccf63361fe..188f620dff77 100644 --- a/src/yb/client/yb_op.h +++ b/src/yb/client/yb_op.h @@ -64,7 +64,7 @@ class YBSession; class YBStatusCallback; class YBTable; -YB_DEFINE_ENUM(OpGroup, (kWrite)(kLeaderRead)(kConsistentPrefixRead)); +YB_DEFINE_ENUM(OpGroup, (kWrite)(kLock)(kLeaderRead)(kConsistentPrefixRead)); // A write or read operation operates on a single table and partial row. // The YBOperation class itself allows the batcher to get to the @@ -89,6 +89,7 @@ class YBOperation { // Postgresql opcodes. 
PGSQL_WRITE = 8, PGSQL_READ = 9, + PGSQL_LOCK = 10, }; virtual ~YBOperation(); @@ -576,6 +577,31 @@ class YBPgsqlReadOp : public YBPgsqlOp { TabletId used_tablet_; }; +class YBPgsqlLockOp : public YBPgsqlOp { + public: + explicit YBPgsqlLockOp(const std::shared_ptr& table, rpc::Sidecars* sidecars); + + bool read_only() const override { return false; }; + bool returns_sidecar() override { return false; } + std::string ToString() const override; + bool succeeded() const override; + bool applied() override; + + void SetHashCode(uint16_t hash_code) override; + Status GetPartitionKey(std::string* partition_key) const override; + + PgsqlLockRequestPB* mutable_request() { return request_; } + + static YBPgsqlLockOpPtr NewLock(const std::shared_ptr& table, rpc::Sidecars* sidecars); + + private: + virtual Type type() const override { return PGSQL_LOCK; } + OpGroup group() override; + + PgsqlLockRequestPB* request_; + std::unique_ptr request_holder_; +}; + // This class is not thread-safe, though different YBNoOp objects on // different threads may share a single YBTable object. class YBNoOp { diff --git a/src/yb/common/pgsql_protocol.proto b/src/yb/common/pgsql_protocol.proto index 8210440f38d5..f278f9fd855b 100644 --- a/src/yb/common/pgsql_protocol.proto +++ b/src/yb/common/pgsql_protocol.proto @@ -677,3 +677,26 @@ message PgsqlResponsePB { // Metrics changes as a result of processing the request. optional PgsqlRequestMetricsPB metrics = 17; } + +message PgsqlAdvisoryLockPB { + repeated QLExpressionPB lock_partition_column_values = 1; + repeated QLExpressionPB lock_range_column_values = 2; +} + +message PgsqlLockRequestPB { + enum PgsqlAdvisoryLockMode { + PG_LOCK_SHARE = 1; + PG_LOCK_EXCLUSIVE = 2; + } + // Client info + optional QLClient client = 1; // required + + optional PgsqlAdvisoryLockPB lock_id = 2; // All locks if not specified + + optional PgsqlAdvisoryLockMode lock_mode = 3; + optional bool wait = 4; + optional bool is_lock = 5; // Lock or Unlock + + optional bytes partition_key = 6; + optional uint32 hash_code = 7; +} diff --git a/src/yb/docdb/CMakeLists.txt b/src/yb/docdb/CMakeLists.txt index c8ba06160f9c..0b6164bbbf06 100644 --- a/src/yb/docdb/CMakeLists.txt +++ b/src/yb/docdb/CMakeLists.txt @@ -23,7 +23,7 @@ YRPC_GENERATE( ADD_YB_LIBRARY(docdb_proto SRCS ${DOCDB_PROTO_SRCS} - DEPS protobuf yb_common_proto + DEPS protobuf yb_common_proto dockv_proto NONLINK_DEPS ${DOCDB_PROTO_TGTS}) include_directories(${YB_BUILD_ROOT}/postgres/include/server) @@ -144,6 +144,7 @@ ADD_YB_TEST(shared_lock_manager-test) ADD_YB_TEST(consensus_frontier-test) ADD_YB_TEST(compaction_file_filter-test) ADD_YB_TEST(usearch_vector_index-test) +ADD_YB_TEST(advisory_lock_doc_operation-test) if(YB_BUILD_FUZZ_TARGETS) # A library with common code shared between DocDB fuzz tests. diff --git a/src/yb/docdb/advisory_lock_doc_operation-test.cc b/src/yb/docdb/advisory_lock_doc_operation-test.cc new file mode 100644 index 000000000000..0ae231afb386 --- /dev/null +++ b/src/yb/docdb/advisory_lock_doc_operation-test.cc @@ -0,0 +1,108 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/common/transaction-test-util.h" +#include "yb/docdb/docdb_test_base.h" +#include "yb/docdb/pgsql_operation.h" + +namespace yb { +namespace docdb { + +const uint32_t kDBOid = 1111; + +class AdvisoryLockDocOperationTest : public DocDBTestBase { + public: + Schema CreateSchema() override { + SchemaBuilder builder; + CHECK_OK(builder.AddHashKeyColumn("dbid", DataType::UINT32)); + CHECK_OK(builder.AddKeyColumn("classid", DataType::UINT32)); + CHECK_OK(builder.AddKeyColumn("objid", DataType::UINT32)); + CHECK_OK(builder.AddKeyColumn("objsubid", DataType::UINT32)); + return builder.Build(); + } + + TransactionId StartTransaction() { + const auto txn_id = TransactionId::GenerateRandom(); + txn_op_context_ = std::make_unique( + txn_id, &txn_status_manager_); + SetCurrentTransactionId(txn_id); + return txn_id; + } + + Status Lock(uint32_t db_oid, uint32_t class_oid, uint32_t objid, uint32_t objsubid, + PgsqlLockRequestPB::PgsqlAdvisoryLockMode mode, bool wait) { + auto schema = CreateSchema(); + PgsqlLockRequestPB request; + PgsqlResponsePB response; + request.set_lock_mode(mode); + auto* lock_id = request.mutable_lock_id(); + lock_id->add_lock_partition_column_values()->mutable_value()->set_uint32_value(db_oid); + lock_id->add_lock_range_column_values()->mutable_value()->set_uint32_value(class_oid); + lock_id->add_lock_range_column_values()->mutable_value()->set_uint32_value(objid); + lock_id->add_lock_range_column_values()->mutable_value()->set_uint32_value(objsubid); + request.set_wait(wait); + request.set_is_lock(true); + + PgsqlLockOperation op( + request, txn_op_context_ ? *txn_op_context_ : kNonTransactionalOperationContext); + RETURN_NOT_OK(op.Init( + &response, std::make_shared(DocReadContext::TEST_Create(schema)))); + auto doc_write_batch = MakeDocWriteBatch(); + HybridTime restart_read_ht; + RETURN_NOT_OK(op.Apply({ + .doc_write_batch = &doc_write_batch, + .read_operation_data = ReadOperationData(), + .restart_read_ht = &restart_read_ht, + .schema_packing_provider = nullptr, + })); + return WriteToRocksDB(doc_write_batch, HybridTime::kMax); + } + + protected: + TransactionStatusManagerMock txn_status_manager_; + std::unique_ptr txn_op_context_; +}; + +TEST_F(AdvisoryLockDocOperationTest, ExclusiveLock) { + auto txn_id = StartTransaction(); + ASSERT_OK(Lock(kDBOid, 2, 3, 4, + PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, true)); + std::unordered_set locks; + DocDBDebugDumpToContainer(&locks); + LOG(INFO) << CollectionToString(locks); + ASSERT_TRUE(locks.contains(Format( + "SubDocKey(DocKey(0x0000, [1111], [2, 3, 4]), []) [kStrongRead, kStrongWrite] " + "HT -> TransactionId($0) WriteId(0) l", + txn_id.ToString()))) << CollectionToString(locks); +} + +TEST_F(AdvisoryLockDocOperationTest, ShareLock) { + auto txn_id = StartTransaction(); + ASSERT_OK(Lock(kDBOid, 2, 3, 4, + PgsqlLockRequestPB::PG_LOCK_SHARE, true)); + ASSERT_OK(Lock(kDBOid, 4, 5, 6, + PgsqlLockRequestPB::PG_LOCK_SHARE, true)); + std::unordered_set locks; + DocDBDebugDumpToContainer(&locks); + ASSERT_TRUE(locks.contains(Format( + "SubDocKey(DocKey(0x0000, [1111], [2, 3, 4]), []) [kStrongRead] " + "HT -> TransactionId($0) WriteId(0) l", + txn_id.ToString()))) << CollectionToString(locks); + ASSERT_TRUE(locks.contains(Format( + "SubDocKey(DocKey(0x0000, [1111], [4, 5, 6]), []) [kStrongRead] " + "HT -> TransactionId($0) WriteId(1) l", + txn_id.ToString()))) << CollectionToString(locks); +} + +} // namespace docdb +} 
// namespace yb diff --git a/src/yb/docdb/conflict_resolution.cc b/src/yb/docdb/conflict_resolution.cc index d134c479034b..f3b0b58caf97 100644 --- a/src/yb/docdb/conflict_resolution.cc +++ b/src/yb/docdb/conflict_resolution.cc @@ -847,9 +847,11 @@ class DocPathProcessor { Result GetWriteRequestIntents( const DocOperations& doc_ops, KeyBytes* buffer, PartialRangeKeyIntents partial, IsolationLevel isolation_level) { + bool is_lock_batch = !doc_ops.empty() && + doc_ops[0]->OpType() == DocOperationType::PGSQL_LOCK_OPERATION; static const dockv::IntentTypeSet kStrongReadIntentTypeSet{dockv::IntentType::kStrongRead}; dockv::IntentTypeSet intent_types; - if (isolation_level != IsolationLevel::NON_TRANSACTIONAL) { + if (isolation_level != IsolationLevel::NON_TRANSACTIONAL && !is_lock_batch) { intent_types = dockv::GetIntentTypesForWrite(isolation_level); } @@ -862,7 +864,7 @@ Result GetWriteRequestIntents( RETURN_NOT_OK(doc_op->GetDocPaths(GetDocPathsMode::kIntents, &doc_paths, &op_isolation)); RETURN_NOT_OK(processor( &doc_paths, - intent_types.None() ? dockv::GetIntentTypesForWrite(op_isolation) : intent_types)); + intent_types.None() ? doc_op->GetIntentTypes(op_isolation) : intent_types)); RETURN_NOT_OK( doc_op->GetDocPaths(GetDocPathsMode::kStrongReadIntents, &doc_paths, &op_isolation)); diff --git a/src/yb/docdb/doc_operation.h b/src/yb/docdb/doc_operation.h index 05b012107fd7..7105056f4b9b 100644 --- a/src/yb/docdb/doc_operation.h +++ b/src/yb/docdb/doc_operation.h @@ -21,6 +21,7 @@ #include "yb/docdb/docdb_fwd.h" #include "yb/docdb/read_operation_data.h" +#include "yb/dockv/intent.h" #include "yb/util/monotime.h" #include "yb/util/ref_cnt_buffer.h" @@ -53,7 +54,8 @@ using DocPathsToLock = boost::container::small_vector_base; YB_DEFINE_ENUM(GetDocPathsMode, (kLock)(kIntents)(kStrongReadIntents)); YB_DEFINE_ENUM(DocOperationType, - (PGSQL_WRITE_OPERATION)(QL_WRITE_OPERATION)(REDIS_WRITE_OPERATION)); + (PGSQL_WRITE_OPERATION)(QL_WRITE_OPERATION)(REDIS_WRITE_OPERATION) + (PGSQL_LOCK_OPERATION)); YB_STRONGLY_TYPED_BOOL(SingleOperation); class DocOperation { @@ -82,6 +84,10 @@ class DocOperation { virtual void ClearResponse() = 0; virtual std::string ToString() const = 0; + + virtual dockv::IntentTypeSet GetIntentTypes(IsolationLevel isolation_level) const { + return dockv::GetIntentTypesForWrite(isolation_level); + } }; template diff --git a/src/yb/docdb/doc_write_batch.cc b/src/yb/docdb/doc_write_batch.cc index 29bc12dfa0d0..e4fc4e81c225 100644 --- a/src/yb/docdb/doc_write_batch.cc +++ b/src/yb/docdb/doc_write_batch.cc @@ -872,22 +872,21 @@ void DocWriteBatch::Clear() { } // TODO(lw_uc) allocate entries on the same arena, then just reference them. -void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) { +void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const { for (auto& entry : put_batch_) { auto* kv_pair = kv_pb->add_write_pairs(); kv_pair->dup_key(entry.key); kv_pair->dup_value(entry.value); } - if (has_ttl()) { - kv_pb->set_ttl(ttl_ns()); - } -} - -void DocWriteBatch::TEST_CopyToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const { - for (auto& entry : put_batch_) { - auto* kv_pair = kv_pb->add_write_pairs(); - kv_pair->dup_key(entry.key); - kv_pair->dup_value(entry.value); + for (const auto& entry : lock_batch_) { + auto* lock_pair = kv_pb->add_lock_pairs(); + lock_pair->mutable_lock()->dup_key(entry.lock.key); + lock_pair->mutable_lock()->dup_value(entry.lock.value); + lock_pair->set_mode( + entry.mode == PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE + ? 
dockv::DocdbLockMode::DOCDB_LOCK_EXCLUSIVE + : dockv::DocdbLockMode::DOCDB_LOCK_SHARE); + lock_pair->set_is_lock(true); } if (has_ttl()) { kv_pb->set_ttl(ttl_ns()); diff --git a/src/yb/docdb/doc_write_batch.h b/src/yb/docdb/doc_write_batch.h index 439050cf9641..a69f50fc690d 100644 --- a/src/yb/docdb/doc_write_batch.h +++ b/src/yb/docdb/doc_write_batch.h @@ -17,6 +17,7 @@ #include "yb/common/constants.h" #include "yb/common/hybrid_time.h" +#include "yb/common/pgsql_protocol.pb.h" #include "yb/common/read_hybrid_time.h" #include "yb/docdb/doc_write_batch_cache.h" @@ -151,6 +152,11 @@ struct DocWriteBatchEntry { std::string value; }; +struct DocLockBatchEntry { + DocWriteBatchEntry lock; + PgsqlLockRequestPB::PgsqlAdvisoryLockMode mode; +}; + // The DocWriteBatch class is used to build a RocksDB write batch for a DocDB batch of operations // that may include a mix of write (set) or delete operations. It may read from RocksDB while // writing, and builds up an internal rocksdb::WriteBatch while handling the operations. @@ -248,19 +254,15 @@ class DocWriteBatch { UserTimeMicros user_timestamp = dockv::ValueControlFields::kInvalidTimestamp); void Clear(); - bool IsEmpty() const { return put_batch_.empty(); } + bool IsEmpty() const { return put_batch_.empty() && lock_batch_.empty(); } - size_t size() const { return put_batch_.size(); } + size_t size() const { return put_batch_.size() + lock_batch_.size(); } const std::vector& key_value_pairs() const { return put_batch_; } - void MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb); - - // This method has worse performance comparing to MoveToWriteBatchPB and intented to be used in - // testing. Consider using MoveToWriteBatchPB in production code. - void TEST_CopyToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const; + void MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const; // This is used in tests when measuring the number of seeks that a given update to this batch // performs. The internal seek count is reset. @@ -280,6 +282,11 @@ class DocWriteBatch { return put_batch_.back(); } + DocLockBatchEntry& AddLock() { + lock_batch_.emplace_back(); + return lock_batch_.back(); + } + void UpdateMaxValueTtl(const MonoDelta& ttl); int64_t ttl_ns() const { @@ -352,6 +359,7 @@ class DocWriteBatch { std::atomic* monotonic_counter_; std::vector put_batch_; + std::vector lock_batch_; // Taken from internal_doc_iterator dockv::KeyBytes key_prefix_; diff --git a/src/yb/docdb/docdb-ttl-test.cc b/src/yb/docdb/docdb-ttl-test.cc index 125231ba0f7b..a878f848d9d9 100644 --- a/src/yb/docdb/docdb-ttl-test.cc +++ b/src/yb/docdb/docdb-ttl-test.cc @@ -1057,32 +1057,32 @@ TEST_P(DocDBTestWrapper, TestUpdateDocWriteBatchTTL) { auto dwb = MakeDocWriteBatch(); ThreadSafeArena arena; LWKeyValueWriteBatchPB kv_pb(&arena); - dwb.TEST_CopyToWriteBatchPB(&kv_pb); + dwb.MoveToWriteBatchPB(&kv_pb); ASSERT_FALSE(kv_pb.has_ttl()); // Write a subdoc with kMaxTtl, which should not show up in the the kv ttl. ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, ValueControlFields::kMaxTtl)); - dwb.TEST_CopyToWriteBatchPB(&kv_pb); + dwb.MoveToWriteBatchPB(&kv_pb); ASSERT_FALSE(kv_pb.has_ttl()); // Write a subdoc with 10s TTL, which should show up in the the kv ttl. ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, 10s)); - dwb.TEST_CopyToWriteBatchPB(&kv_pb); + dwb.MoveToWriteBatchPB(&kv_pb); ASSERT_EQ(kv_pb.ttl(), 10 * MonoTime::kNanosecondsPerSecond); // Write a subdoc with 5s TTL, which should make the kv ttl unchanged. 
ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, 5s)); - dwb.TEST_CopyToWriteBatchPB(&kv_pb); + dwb.MoveToWriteBatchPB(&kv_pb); ASSERT_EQ(kv_pb.ttl(), 10 * MonoTime::kNanosecondsPerSecond); // Write a subdoc with 15s TTL, which should show up in the the kv ttl. ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, 15s)); - dwb.TEST_CopyToWriteBatchPB(&kv_pb); + dwb.MoveToWriteBatchPB(&kv_pb); ASSERT_EQ(kv_pb.ttl(), 15 * MonoTime::kNanosecondsPerSecond); // Write a subdoc with kMaxTTL, which should make the kv ttl unchanged. ASSERT_OK(InsertToWriteBatchWithTTL(&dwb, ValueControlFields::kMaxTtl)); - dwb.TEST_CopyToWriteBatchPB(&kv_pb); + dwb.MoveToWriteBatchPB(&kv_pb); ASSERT_EQ(kv_pb.ttl(), 15 * MonoTime::kNanosecondsPerSecond); } diff --git a/src/yb/docdb/docdb.cc b/src/yb/docdb/docdb.cc index 5e1b6e9a0e34..d98ed6d6254d 100644 --- a/src/yb/docdb/docdb.cc +++ b/src/yb/docdb/docdb.cc @@ -132,9 +132,11 @@ Result> DetermineKeysToLock( } const auto require_read_snapshot = doc_op->RequireReadSnapshot(); result.need_read_snapshot |= require_read_snapshot; - auto intent_types = dockv::GetIntentTypesForWrite(level); + auto intent_types = doc_op->GetIntentTypes(level); if (isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION && require_read_snapshot) { + SCHECK_NE(doc_op->OpType(), DocOperationType::PGSQL_LOCK_OPERATION, + IllegalState, "LOCK operations shouldn't require read snapshot"); intent_types = dockv::IntentTypeSet( {dockv::IntentType::kStrongRead, dockv::IntentType::kStrongWrite}); } diff --git a/src/yb/docdb/docdb.proto b/src/yb/docdb/docdb.proto index 14576dd9088b..92ebd3634e98 100644 --- a/src/yb/docdb/docdb.proto +++ b/src/yb/docdb/docdb.proto @@ -17,6 +17,7 @@ package yb.docdb; import "yb/common/common.proto"; import "yb/common/opid.proto"; import "yb/common/transaction.proto"; +import "yb/dockv/dockv.proto"; option java_package = "org.yb.docdb"; @@ -43,6 +44,12 @@ message TableSchemaVersionPB { optional uint32 schema_version = 2; } +message LockPairPB { + optional KeyValuePairPB lock = 1; + optional dockv.DocdbLockMode mode = 2; + optional bool is_lock = 3; // lock or unlock. +} + // A set of key/value pairs to be written into RocksDB. message KeyValueWriteBatchPB { repeated KeyValuePairPB write_pairs = 1; @@ -73,6 +80,9 @@ message KeyValueWriteBatchPB { // Reserved for deprecated enable_replicate_transaction_status_table field. reserved 12; + + // Advisory locks. 
+ repeated LockPairPB lock_pairs = 13; } message PerDbFilterPB { diff --git a/src/yb/docdb/docdb_util.cc b/src/yb/docdb/docdb_util.cc index ea393fb4a17a..0bb652e2af98 100644 --- a/src/yb/docdb/docdb_util.cc +++ b/src/yb/docdb/docdb_util.cc @@ -171,7 +171,7 @@ Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch( } ThreadSafeArena arena; LWKeyValueWriteBatchPB kv_write_batch(&arena); - dwb.TEST_CopyToWriteBatchPB(&kv_write_batch); + dwb.MoveToWriteBatchPB(&kv_write_batch); TransactionalWriter writer( kv_write_batch, hybrid_time, *current_txn_id_, txn_isolation_level_, partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_); diff --git a/src/yb/docdb/pgsql_operation.cc b/src/yb/docdb/pgsql_operation.cc index fd74f977c848..b62b5cefe489 100644 --- a/src/yb/docdb/pgsql_operation.cc +++ b/src/yb/docdb/pgsql_operation.cc @@ -2543,4 +2543,76 @@ Status GetIntents( return Status::OK(); } +PgsqlLockOperation::PgsqlLockOperation( + std::reference_wrapper request, + const TransactionOperationContext& txn_op_context) + : DocOperationBase(request), txn_op_context_(txn_op_context) { +} + +Status PgsqlLockOperation::Init( + PgsqlResponsePB* response, const DocReadContextPtr& doc_read_context) { + response_ = response; + + auto& schema = doc_read_context->schema(); + + dockv::KeyEntryValues hashed_components; + dockv::KeyEntryValues range_components; + RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues( + request_.lock_id().lock_partition_column_values(), schema, 0, + schema.num_hash_key_columns(), + &hashed_components)); + RETURN_NOT_OK(QLKeyColumnValuesToPrimitiveValues( + request_.lock_id().lock_range_column_values(), schema, + schema.num_hash_key_columns(), + schema.num_range_key_columns(), + &range_components)); + SCHECK(!hashed_components.empty(), InvalidArgument, "No hashed column values provided"); + doc_key_ = DocKey( + schema, request_.hash_code(), std::move(hashed_components), std::move(range_components)); + encoded_doc_key_ = doc_key_.EncodeAsRefCntPrefix(); + return Status::OK(); +} + +Status PgsqlLockOperation::GetDocPaths(GetDocPathsMode mode, + DocPathsToLock *paths, IsolationLevel *level) const { + // kStrongReadIntents is used for acquring locks on the entire row. + // It's duplicate with the primary intent for the advisory lock. + if (mode != GetDocPathsMode::kStrongReadIntents) { + paths->emplace_back(encoded_doc_key_); + } + return Status::OK(); +} + +std::string PgsqlLockOperation::ToString() const { + return Format("$0 $1, original request: $2", + request_.is_lock() ? 
"LOCK" : "UNLOCK", + doc_key_.ToString(), + request_.ShortDebugString()); +} + +void PgsqlLockOperation::ClearResponse() { + if (response_) { + response_->Clear(); + } +} + +Status PgsqlLockOperation::Apply(const DocOperationApplyData& data) { + Slice value(&(dockv::ValueEntryTypeAsChar::kRowLock), 1); + auto& entry = data.doc_write_batch->AddLock(); + entry.lock.key = encoded_doc_key_.as_slice(); + entry.lock.value = value; + entry.mode = request_.lock_mode(); + return Status::OK(); +} + +dockv::IntentTypeSet PgsqlLockOperation::GetIntentTypes(IsolationLevel isolation_level) const { + switch (request_.lock_mode()) { + case PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE: + return dockv::GetIntentTypesForLock(dockv::DocdbLockMode::DOCDB_LOCK_EXCLUSIVE); + case PgsqlLockRequestPB::PG_LOCK_SHARE: + return dockv::GetIntentTypesForLock(dockv::DocdbLockMode::DOCDB_LOCK_SHARE); + } + FATAL_INVALID_ENUM_VALUE(PgsqlLockRequestPB::PgsqlAdvisoryLockMode, request_.lock_mode()); +} + } // namespace yb::docdb diff --git a/src/yb/docdb/pgsql_operation.h b/src/yb/docdb/pgsql_operation.h index 0b384f4f1b70..be9b21e91dc8 100644 --- a/src/yb/docdb/pgsql_operation.h +++ b/src/yb/docdb/pgsql_operation.h @@ -238,4 +238,44 @@ Status GetIntents( const PgsqlReadRequestPB& request, const Schema& schema, IsolationLevel level, LWKeyValueWriteBatchPB* out); +class PgsqlLockOperation : + public DocOperationBase { + public: + PgsqlLockOperation(std::reference_wrapper request, + const TransactionOperationContext& txn_op_context); + + bool RequireReadSnapshot() const override { + return false; + } + + const PgsqlLockRequestPB& request() const { return request_; } + PgsqlResponsePB* response() const { return response_; } + + // Init doc_key_ and encoded_doc_key_. + Status Init(PgsqlResponsePB* response, const DocReadContextPtr& doc_read_context); + + Status Apply(const DocOperationApplyData& data) override; + + // Reading path to operate on. + Status GetDocPaths(GetDocPathsMode mode, + DocPathsToLock *paths, + IsolationLevel *level) const override; + + std::string ToString() const override; + + dockv::IntentTypeSet GetIntentTypes(IsolationLevel isolation_level) const override; + + private: + void ClearResponse() override; + + const TransactionOperationContext txn_op_context_; + + // Input arguments. + PgsqlResponsePB* response_ = nullptr; + + // The key of the advisory lock to be locked. + dockv::DocKey doc_key_; + RefCntPrefix encoded_doc_key_; +}; + } // namespace yb::docdb diff --git a/src/yb/docdb/rocksdb_writer.cc b/src/yb/docdb/rocksdb_writer.cc index 17107d6ad8f7..90895ddfade3 100644 --- a/src/yb/docdb/rocksdb_writer.cc +++ b/src/yb/docdb/rocksdb_writer.cc @@ -272,6 +272,15 @@ Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { partial_range_key_intents_)); } + // Apply advisory locks. 
+ for (auto& lock_pair : put_batch_.lock_pairs()) { + RSTATUS_DCHECK(lock_pair.is_lock(), InvalidArgument, "Unexpected unlock operation"); + KeyBytes key(lock_pair.lock().key()); + RETURN_NOT_OK((*this)(dockv::GetIntentTypesForLock(lock_pair.mode()), + dockv::AncestorDocKey::kFalse, dockv::FullDocKey::kFalse, + lock_pair.lock().value(), &key, dockv::LastKey::kTrue)); + } + if (!put_batch_.read_pairs().empty()) { RETURN_NOT_OK(EnumerateIntents( put_batch_.read_pairs(), diff --git a/src/yb/dockv/dockv.proto b/src/yb/dockv/dockv.proto index bbe249d59fe3..da1dd52e01d3 100644 --- a/src/yb/dockv/dockv.proto +++ b/src/yb/dockv/dockv.proto @@ -36,3 +36,8 @@ message SchemaPackingPB { // Columns that present in schema but don't participate in packing. repeated uint32 skipped_column_ids = 3; } + +enum DocdbLockMode { + DOCDB_LOCK_EXCLUSIVE = 1; + DOCDB_LOCK_SHARE = 2; +} diff --git a/src/yb/dockv/intent.cc b/src/yb/dockv/intent.cc index dc619e4d172e..039a1f2decf6 100644 --- a/src/yb/dockv/intent.cc +++ b/src/yb/dockv/intent.cc @@ -176,6 +176,16 @@ IntentTypeSet GetIntentTypesForWrite(IsolationLevel level) { return GetIntentTypes(level, /*is_for_write=*/true); } +IntentTypeSet GetIntentTypesForLock(DocdbLockMode mode) { + switch (mode) { + case DocdbLockMode::DOCDB_LOCK_SHARE: + return IntentTypeSet({IntentType::kStrongRead}); + case DocdbLockMode::DOCDB_LOCK_EXCLUSIVE: + return IntentTypeSet({IntentType::kStrongWrite, IntentType::kStrongRead}); + } + FATAL_INVALID_ENUM_VALUE(DocdbLockMode, mode); +} + bool HasStrong(IntentTypeSet inp) { static const IntentTypeSet all_strong_intents{IntentType::kStrongRead, IntentType::kStrongWrite}; return (inp & all_strong_intents).Any(); diff --git a/src/yb/dockv/intent.h b/src/yb/dockv/intent.h index a421bdb64903..e2b2e6cd1d44 100644 --- a/src/yb/dockv/intent.h +++ b/src/yb/dockv/intent.h @@ -21,6 +21,7 @@ #include "yb/common/doc_hybrid_time.h" #include "yb/common/transaction.h" +#include "yb/dockv/dockv.pb.h" #include "yb/dockv/dockv_fwd.h" #include "yb/util/result.h" @@ -127,6 +128,8 @@ struct ReadIntentTypeSets { [[nodiscard]] IntentTypeSet GetIntentTypesForWrite(IsolationLevel level); +[[nodiscard]] IntentTypeSet GetIntentTypesForLock(DocdbLockMode mode); + YB_STRONGLY_TYPED_BOOL(IsRowLock); [[nodiscard]] inline IntentTypeSet GetIntentTypes( diff --git a/src/yb/integration-tests/packed_row_test_base.cc b/src/yb/integration-tests/packed_row_test_base.cc index 4f20f8b6d16b..5c89d54726cb 100644 --- a/src/yb/integration-tests/packed_row_test_base.cc +++ b/src/yb/integration-tests/packed_row_test_base.cc @@ -52,7 +52,8 @@ void CheckNumRecords(MiniCluster* cluster, size_t expected_num_records) { if (!peer->tablet()->regular_db()) { continue; } - auto count = ASSERT_RESULT(peer->tablet()->TEST_CountRegularDBRecords()); + auto count = ASSERT_RESULT(peer->tablet()->TEST_CountDBRecords( + docdb::StorageDbType::kRegular)); LOG(INFO) << peer->LogPrefix() << "records: " << count; ASSERT_EQ(count, expected_num_records); } diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index adbcbf43ee7c..a00836e17853 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -1646,7 +1646,7 @@ Status Tablet::ApplyKeyValueRowOperations( docdb::ConsensusFrontiers* frontiers, HybridTime write_hybrid_time, HybridTime batch_hybrid_time, AlreadyAppliedToRegularDB already_applied_to_regular_db) { if (put_batch.write_pairs().empty() && put_batch.read_pairs().empty() && - put_batch.apply_external_transactions().empty()) { + put_batch.lock_pairs().empty() && 
put_batch.apply_external_transactions().empty()) { return Status::OK(); } @@ -4000,11 +4000,12 @@ void Tablet::TEST_DocDBDumpToLog(IncludeIntents include_intents) { } } -Result Tablet::TEST_CountRegularDBRecords() { - if (!regular_db_) return 0; +Result Tablet::TEST_CountDBRecords(docdb::StorageDbType db_type) { + auto* db = db_type == docdb::StorageDbType::kRegular ? regular_db_.get() : intents_db_.get(); + if (!db) return 0; rocksdb::ReadOptions read_opts; read_opts.query_id = rocksdb::kDefaultQueryId; - docdb::BoundedRocksDbIterator iter(regular_db_.get(), read_opts, &key_bounds_); + docdb::BoundedRocksDbIterator iter(db, read_opts, &key_bounds_); size_t result = 0; for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 73cf86a4e3e8..061b8261e571 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -98,6 +98,7 @@ namespace tablet { YB_STRONGLY_TYPED_BOOL(BlockingRocksDbShutdownStart); YB_STRONGLY_TYPED_BOOL(FlushOnShutdown); YB_STRONGLY_TYPED_BOOL(IncludeIntents); +YB_STRONGLY_TYPED_BOOL(CheckRegularDB) YB_DEFINE_ENUM(Direction, (kForward)(kBackward)); inline FlushFlags operator|(FlushFlags lhs, FlushFlags rhs) { @@ -653,7 +654,7 @@ class Tablet : public AbstractTablet, // Dumps DocDB contents to log, every record as a separate log message, with the given prefix. void TEST_DocDBDumpToLog(IncludeIntents include_intents); - Result TEST_CountRegularDBRecords(); + Result TEST_CountDBRecords(docdb::StorageDbType db_type); Status CreateReadIntents( IsolationLevel level, diff --git a/src/yb/tablet/write_query.cc b/src/yb/tablet/write_query.cc index 80f851cfc987..8ab524ef721a 100644 --- a/src/yb/tablet/write_query.cc +++ b/src/yb/tablet/write_query.cc @@ -181,6 +181,7 @@ enum class WriteQuery::ExecuteMode { kRedis, kCql, kPgsql, + kPgsqlLock, }; WriteQuery::WriteQuery( @@ -372,6 +373,9 @@ void WriteQuery::ExecuteDone(const Status& status) { case ExecuteMode::kPgsql: PgsqlExecuteDone(status); return; + case ExecuteMode::kPgsqlLock: + PgsqlLockExecuteDone(status); + return; } FATAL_INVALID_ENUM_VALUE(ExecuteMode, execute_mode_); } @@ -396,6 +400,10 @@ Result WriteQuery::PrepareExecute() { return PgsqlPrepareExecute(); } + if (!client_request_->pgsql_lock_batch().empty()) { + return PgsqlPrepareLock(); + } + if (client_request_->has_write_batch() && client_request_->has_external_hybrid_time()) { return false; } @@ -550,6 +558,45 @@ Result WriteQuery::PgsqlPrepareExecute() { return true; } +Result WriteQuery::PgsqlPrepareLock() { + auto& write_batch = client_request_->write_batch(); + // Transaction should be specified in the write batch. + // TODO(advisory-lock #24709) - virtual transaction if it's session level. + SCHECK(write_batch.has_transaction(), InvalidArgument, "No transaction found in write batch"); + + auto tablet = VERIFY_RESULT(tablet_safe()); + RETURN_NOT_OK(InitExecute(ExecuteMode::kPgsqlLock)); + + const auto& pgsql_lock_batch = client_request_->pgsql_lock_batch(); + + SCHECK(!pgsql_lock_batch.empty(), InvalidArgument, "lock batch is empty"); + auto& metadata = *tablet->metadata(); + const auto table_info = VERIFY_RESULT(metadata.GetTableInfo("")); + + doc_ops_.reserve(pgsql_lock_batch.size()); + + TransactionOperationContext txn_op_ctx; + VLOG_WITH_FUNC(2) << "transaction=" << write_batch.transaction().DebugString(); + for (const auto& req : pgsql_lock_batch) { + VLOG_WITH_FUNC(4) << req.DebugString(); + // TODO(advisory-lock #25195): Remove this check when supporting release lock. 
+ SCHECK(req.is_lock(), InvalidArgument, "Unexpected unlock operation"); + if (doc_ops_.empty()) { + txn_op_ctx = VERIFY_RESULT(tablet->CreateTransactionOperationContext( + write_batch.transaction(), + /* is_ysql_catalog_table */ false, + &write_batch.subtransaction())); + } + PgsqlResponsePB* resp = response_->add_pgsql_response_batch(); + auto lock_op = std::make_unique( + req, + txn_op_ctx); + RETURN_NOT_OK(lock_op->Init(resp, table_info->doc_read_context)); + doc_ops_.emplace_back(std::move(lock_op)); + } + return true; +} + void WriteQuery::Execute(std::unique_ptr query) { auto* query_ptr = query.get(); query_ptr->self_ = std::move(query); @@ -574,12 +621,14 @@ void WriteQuery::Execute(std::unique_ptr query) { // The conflict management policy (as defined in conflict_resolution.h) to be used is determined // based on the following - -// 1. For explicit row level locking, YSQL sets the "wait_policy" field which maps to a -// corresponding ConflictManagementPolicy as detailed in the WaitPolicy enum in common.proto. +// 1. For explicit row level locking or advisory locks (is_advisory_lock_request == true): +// YSQL sets the "wait_policy" field which maps to a corresponding ConflictManagementPolicy +// as detailed in the WaitPolicy enum in common.proto. // 2. For everything else, either the WAIT_ON_CONFLICT or the FAIL_ON_CONFLICT policy is used // based on whether wait queues are enabled or not. docdb::ConflictManagementPolicy GetConflictManagementPolicy( - const docdb::WaitQueue* wait_queue, const docdb::LWKeyValueWriteBatchPB& write_batch) { + const docdb::WaitQueue* wait_queue, const docdb::LWKeyValueWriteBatchPB& write_batch, + bool is_advisory_lock_request) { // Either write_batch.read_pairs is not empty or doc_ops is non empty. Both can't be non empty // together. This is because read_pairs is filled only in case of a read operation that has a // row mark or is part of a serializable txn. @@ -590,7 +639,7 @@ docdb::ConflictManagementPolicy GetConflictManagementPolicy( auto conflict_management_policy = wait_queue ? docdb::WAIT_ON_CONFLICT : docdb::FAIL_ON_CONFLICT; const auto& pairs = write_batch.read_pairs(); - if (!pairs.empty() && write_batch.has_wait_policy()) { + if ((is_advisory_lock_request || !pairs.empty()) && write_batch.has_wait_policy()) { switch (write_batch.wait_policy()) { case WAIT_BLOCK: if (wait_queue) { @@ -621,6 +670,8 @@ docdb::ConflictManagementPolicy GetConflictManagementPolicy( Status WriteQuery::DoExecute() { auto tablet = VERIFY_RESULT(tablet_safe()); auto& write_batch = *request().mutable_write_batch(); + // Lock request must have non-empty pgsql_lock_batch. + bool is_advisory_lock_request = client_request_ && !client_request_->pgsql_lock_batch().empty(); isolation_level_ = VERIFY_RESULT(tablet->GetIsolationLevelFromPB(write_batch)); const RowMarkType row_mark_type = GetRowMarkTypeFromPB(write_batch); const auto& metadata = *tablet->metadata(); @@ -670,8 +721,11 @@ Status WriteQuery::DoExecute() { auto request_id = request().has_request_id() ? request().request_id() : -1; if (isolation_level_ == IsolationLevel::NON_TRANSACTIONAL) { + SCHECK(!is_advisory_lock_request, InvalidArgument, + "Unexpected advisory lock request with NON_TRANSACTIONAL isolation level"); auto now = tablet->clock()->Now(); - auto conflict_management_policy = GetConflictManagementPolicy(wait_queue, write_batch); + auto conflict_management_policy = GetConflictManagementPolicy( + wait_queue, write_batch, is_advisory_lock_request); { // TODO(#19498): Enable the below check if possible. 
Right now we can't enable it because the @@ -715,7 +769,8 @@ Status WriteQuery::DoExecute() { } } - auto conflict_management_policy = GetConflictManagementPolicy(wait_queue, write_batch); + auto conflict_management_policy = GetConflictManagementPolicy( + wait_queue, write_batch, is_advisory_lock_request); // TODO(wait-queues): Ensure that wait_queue respects deadline() during conflict resolution. return docdb::ResolveTransactionConflicts( @@ -999,6 +1054,7 @@ Result WriteQuery::ExecuteSchemaVersionCheck() { switch (execute_mode_) { case ExecuteMode::kSimple: FALLTHROUGH_INTENDED; case ExecuteMode::kRedis: + case ExecuteMode::kPgsqlLock: return true; case ExecuteMode::kCql: // For cql, the requests in client_request_->ql_write_batch() could have the field @@ -1315,6 +1371,10 @@ void WriteQuery::SimpleExecuteDone(const Status& status) { StartSynchronization(std::move(self_), status); } +void WriteQuery::PgsqlLockExecuteDone(const Status& status) { + StartSynchronization(std::move(self_), status); +} + void WriteQuery::IncrementActiveWriteQueryObjectsBy(int64_t value) { auto res = tablet_safe(); if (res.ok() && (*res)->metrics()) { diff --git a/src/yb/tablet/write_query.h b/src/yb/tablet/write_query.h index fd12b68e2ef7..dc6706f41622 100644 --- a/src/yb/tablet/write_query.h +++ b/src/yb/tablet/write_query.h @@ -154,11 +154,13 @@ class WriteQuery { Result CqlRePrepareExecuteIfNecessary(); Result CqlPrepareExecute(); Result PgsqlPrepareExecute(); + Result PgsqlPrepareLock(); void SimpleExecuteDone(const Status& status); void RedisExecuteDone(const Status& status); void CqlExecuteDone(const Status& status); void PgsqlExecuteDone(const Status& status); + void PgsqlLockExecuteDone(const Status& status); using IndexOps = std::vector, docdb::QLWriteOperation*>>; diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 12733d857f8d..b48a9f7e5d61 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -2271,6 +2271,7 @@ Status TabletServiceImpl::PerformWrite( bool has_operations = req->ql_write_batch_size() != 0 || req->redis_write_batch_size() != 0 || req->pgsql_write_batch_size() != 0 || + req->pgsql_lock_batch_size() != 0 || (req->has_external_hybrid_time() && !EmptyWriteBatch(req->write_batch())); if (!has_operations && tablet.tablet->table_type() != TableType::REDIS_TABLE_TYPE) { // An empty request. This is fine, can just exit early with ok status instead of working hard. diff --git a/src/yb/tserver/tserver.proto b/src/yb/tserver/tserver.proto index 6a36ffee6f47..dec5cfb340fa 100644 --- a/src/yb/tserver/tserver.proto +++ b/src/yb/tserver/tserver.proto @@ -72,6 +72,9 @@ message WriteRequestPB { // A batch of Pgsql operations. repeated PgsqlWriteRequestPB pgsql_write_batch = 13; + // A batch of Pgsql locks. + repeated PgsqlLockRequestPB pgsql_lock_batch = 25; + // A hybrid_time obtained by the client from a previous request. // TODO crypto sign this and propagate the signature along with // the hybrid_time. 
diff --git a/src/yb/tserver/ysql_advisory_lock_table.cc b/src/yb/tserver/ysql_advisory_lock_table.cc index 07b575a62e8d..a7c3a39c78c3 100644 --- a/src/yb/tserver/ysql_advisory_lock_table.cc +++ b/src/yb/tserver/ysql_advisory_lock_table.cc @@ -16,12 +16,25 @@ #include #include "yb/client/yb_table_name.h" #include "yb/client/client.h" +#include "yb/client/yb_op.h" #include "yb/master/master_defaults.h" DECLARE_bool(yb_enable_advisory_lock); namespace yb { +namespace { + +void SetLockId(PgsqlAdvisoryLockPB& lock, uint32_t db_oid, uint32_t class_oid, + uint32_t objid, uint32_t objsubid) { + lock.add_lock_partition_column_values()->mutable_value()->set_uint32_value(db_oid); + lock.add_lock_range_column_values()->mutable_value()->set_uint32_value(class_oid); + lock.add_lock_range_column_values()->mutable_value()->set_uint32_value(objid); + lock.add_lock_range_column_values()->mutable_value()->set_uint32_value(objsubid); +} + +} // namespace + YsqlAdvisoryLocksTable::YsqlAdvisoryLocksTable(client::YBClient& client) : client_(client) {} @@ -39,4 +52,14 @@ Result YsqlAdvisoryLocksTable::GetTable() { return table_; } +Result YsqlAdvisoryLocksTable::CreateLockOp( + uint32_t db_oid, uint32_t class_oid, uint32_t objid, uint32_t objsubid, + PgsqlLockRequestPB::PgsqlAdvisoryLockMode mode, bool wait, rpc::Sidecars* sidecars) { + auto lock = client::YBPgsqlLockOp::NewLock(VERIFY_RESULT(GetTable()), sidecars); + SetLockId(*lock->mutable_request()->mutable_lock_id(), db_oid, class_oid, objid, objsubid); + lock->mutable_request()->set_lock_mode(mode); + lock->mutable_request()->set_wait(wait); + return lock; +} + } // namespace yb diff --git a/src/yb/tserver/ysql_advisory_lock_table.h b/src/yb/tserver/ysql_advisory_lock_table.h index 87e69192174f..455c51f7a5ac 100644 --- a/src/yb/tserver/ysql_advisory_lock_table.h +++ b/src/yb/tserver/ysql_advisory_lock_table.h @@ -16,6 +16,8 @@ #pragma once #include "yb/client/client_fwd.h" +#include "yb/rpc/rpc_fwd.h" +#include "yb/common/pgsql_protocol.pb.h" namespace yb { @@ -27,9 +29,16 @@ class YsqlAdvisoryLocksTable { explicit YsqlAdvisoryLocksTable(client::YBClient& client); ~YsqlAdvisoryLocksTable(); - Result GetTable() EXCLUDES(mutex_); + Result CreateLockOp( + uint32_t db_oid, uint32_t class_oid, uint32_t objid, uint32_t objsubid, + PgsqlLockRequestPB::PgsqlAdvisoryLockMode mode, bool wait, + rpc::Sidecars* sidecars) EXCLUDES(mutex_); private: + friend class AdvisoryLockTest; + + Result GetTable() EXCLUDES(mutex_); + std::mutex mutex_; client::YBTablePtr table_ GUARDED_BY(mutex_);; client::YBClient& client_; diff --git a/src/yb/yql/cql/ql/util/statement_result.cc b/src/yb/yql/cql/ql/util/statement_result.cc index 8b58714b7859..4d4d972f18b6 100644 --- a/src/yb/yql/cql/ql/util/statement_result.cc +++ b/src/yb/yql/cql/ql/util/statement_result.cc @@ -94,7 +94,8 @@ shared_ptr> GetColumnSchemasFromOp(const YBqlOp& op, const case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED; case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; - case YBOperation::Type::REDIS_WRITE: + case YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED; + case YBOperation::Type::PGSQL_LOCK: break; // default: fallthrough } @@ -112,7 +113,8 @@ QLClient GetClientFromOp(const YBqlOp& op) { case YBOperation::Type::PGSQL_READ: FALLTHROUGH_INTENDED; case YBOperation::Type::PGSQL_WRITE: FALLTHROUGH_INTENDED; case YBOperation::Type::REDIS_READ: FALLTHROUGH_INTENDED; - case YBOperation::Type::REDIS_WRITE: + case 
YBOperation::Type::REDIS_WRITE: FALLTHROUGH_INTENDED; + case YBOperation::Type::PGSQL_LOCK: break; // default: fallthrough }
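
For reference, below is a minimal client-side sketch of the new lock path, assembled from the
advisory_lock-test.cc changes above. The function name AcquireXactExclusiveAdvisoryLock and the
way the client, transaction pool, session, and sidecars are obtained are illustrative assumptions,
not part of this diff; the CreateLockOp, SetTransaction, TEST_ApplyAndFlush, and CommitFuture
calls mirror the test code.

```
// Illustrative sketch only (not part of this diff): acquire a transaction-level
// exclusive advisory lock (dbid=10000, classid=0, objid=0, objsubid=1) through the
// new YBPgsqlLockOp path. The caller supplies the client, transaction pool, session,
// and sidecars, as the test fixture does.
#include "yb/client/client.h"
#include "yb/client/session.h"
#include "yb/client/transaction.h"
#include "yb/client/transaction_pool.h"
#include "yb/client/yb_op.h"
#include "yb/rpc/sidecars.h"
#include "yb/tserver/ysql_advisory_lock_table.h"
#include "yb/util/result.h"

namespace yb {

Status AcquireXactExclusiveAdvisoryLock(
    client::YBClient& client, client::TransactionPool& pool,
    const client::YBSessionPtr& session, rpc::Sidecars* sidecars) {
  // The pg_advisory_locks table is only created when yb_enable_advisory_lock is true.
  YsqlAdvisoryLocksTable locks_table(client);

  // Advisory locks are transactional: they are released automatically when the
  // owning transaction commits or aborts.
  auto txn = VERIFY_RESULT(pool.TakeAndInit(SNAPSHOT_ISOLATION, TransactionRpcDeadline()));
  session->SetTransaction(txn);

  // wait=true requests a blocking acquire; wait=false maps to WAIT_SKIP, so a
  // conflicting request fails with TransactionErrorCode::kSkipLocking instead of waiting.
  auto lock_op = VERIFY_RESULT(locks_table.CreateLockOp(
      /* db_oid= */ 10000, /* class_oid= */ 0, /* objid= */ 0, /* objsubid= */ 1,
      PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, /* wait= */ true, sidecars));
  RETURN_NOT_OK(session->TEST_ApplyAndFlush(lock_op));

  // Committing (or aborting) the transaction releases the lock.
  return txn->CommitFuture(TransactionRpcDeadline()).get();
}

}  // namespace yb
```

A PG_LOCK_SHARE op built the same way takes only the kStrongRead intent (see
GetIntentTypesForLock), so concurrent share locks on the same lock id do not conflict, while an
exclusive lock takes kStrongRead and kStrongWrite and conflicts with both modes.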