[#19341] docdb: GetTableKeyRanges API
Summary:
For YSQL parallel scans we need a way to break table tablets' keyspaces into ranges of approximately the same target data size, so that scanning these ranges in parallel takes about the same time per range.

This revision implements the following changes to support this:
- Tablet::GetTabletKeyRanges function that breaks a tablet's keyspace into ranges
- TabletServerService::GetTabletKeyRanges API that uses Tablet::GetTabletKeyRanges
- PGGate YBCGetTableKeyRanges function that breaks the table keyspace into ranges
- Intermediate layers from PGGate YBCGetTableKeyRanges down to Tablet::GetTabletKeyRanges, including PgClientSession::GetTableKeyRanges, which combines subsequent tablets' responses when a request crosses tablet boundaries.

The scope of this revision is limited to range-sharded tables and forward scans.
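
A minimal usage sketch, distilled from the tablet-level test added in this revision (argument roles are inferred from the test's call sites; kRangeSizeBytes is an illustrative constant):

// Break the whole tablet keyspace (empty bounds) into forward ranges covering
// approximately kRangeSizeBytes of data each; the callback receives the end key
// of each range, with an empty final key marking the end of the keyspace.
std::vector<std::string> range_end_keys;
ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
    /* lower_bound = */ Slice(), /* upper_bound = */ Slice(),
    /* max_num_ranges = */ std::numeric_limits<uint64_t>::max(),
    kRangeSizeBytes, tablet::IsForward::kTrue, /* max_key_length = */ 1024,
    [&range_end_keys](Slice key) { range_end_keys.push_back(key.ToBuffer()); }));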

Added the QLTabletRf1TestToggleEnablePackedRow.GetTabletKeyRanges tablet-level unit test for Tablet::GetTabletKeyRanges, plus PggateTestSelect.GetTableKeyRanges and PggateTestSelect.GetColocatedTableKeyRanges for the pggate-level functionality.
Jira: DB-8143

Test Plan: QLTabletRf1TestToggleEnablePackedRow.GetTabletKeyRanges, PggateTestSelect.GetTableKeyRanges, PggateTestSelect.GetColocatedTableKeyRanges

Reviewers: amartsinchyk, sergei

Reviewed By: amartsinchyk, sergei

Subscribers: yql, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D26978
ttyusupov committed Sep 29, 2023
1 parent b78a268 commit b3aeb64
Showing 62 changed files with 1,685 additions and 126 deletions.
177 changes: 176 additions & 1 deletion src/yb/client/ql-tablet-test.cc
@@ -39,6 +39,7 @@

#include "yb/docdb/consensus_frontier.h"
#include "yb/dockv/doc_key.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/docdb_test_util.h"

#include "yb/gutil/casts.h"
@@ -85,9 +86,10 @@ using std::string;
using namespace std::literals; // NOLINT

DECLARE_string(compression_type);
+DECLARE_int64(db_block_size_bytes);
+DECLARE_int64(db_write_buffer_size);
DECLARE_uint64(initial_seqno);
DECLARE_int32(leader_lease_duration_ms);
-DECLARE_int64(db_write_buffer_size);
DECLARE_string(time_source);
DECLARE_int32(retryable_request_timeout_secs);
DECLARE_bool(enable_lease_revocation);
@@ -1920,5 +1922,178 @@ TEST_F_EX(QLTabletTest, TruncateTableDuringLongRead, QLTabletRf1Test) {
});
}

namespace {

Status CalcKeysDistributionAcrossWorkers(
tablet::Tablet* tablet, std::vector<std::string> range_end_keys, const size_t num_workers,
const double min_max_keys_ratio_limit) {
std::vector<size_t> keys_per_worker(num_workers);

auto iter = CreateRocksDBIterator(
tablet->doc_db().regular, tablet->doc_db().key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none, rocksdb::kDefaultQueryId);
iter.SeekToFirst();
std::string current_range_start_key;

size_t current_range_idx = 0;
size_t entries_in_current_range = 0;
while (iter.Valid() || current_range_idx < range_end_keys.size()) {
while (current_range_idx < range_end_keys.size() &&
(!iter.Valid() || (!range_end_keys[current_range_idx].empty() &&
iter.key() >= range_end_keys[current_range_idx]))) {
LOG(INFO) << "Range #" << current_range_idx << "["
<< Slice(current_range_start_key).ToDebugHexString() << ", "
<< Slice(range_end_keys[current_range_idx]).ToDebugHexString() << ") has "
<< entries_in_current_range << " rocksdb records";
keys_per_worker[current_range_idx % num_workers] += entries_in_current_range;
current_range_start_key = range_end_keys[current_range_idx];
++current_range_idx;
entries_in_current_range = 0;
}
if (iter.Valid()) {
++entries_in_current_range;
iter.Next();
}
}
RETURN_NOT_OK(iter.status());

LOG(INFO) << "Keys per worker: " << AsString(keys_per_worker);

const auto [min_keys_per_worker, max_keys_per_worker] =
std::minmax_element(keys_per_worker.begin(), keys_per_worker.end());
const double ratio = 1.0 * *min_keys_per_worker / *max_keys_per_worker;

SCHECK_GE(
ratio, min_max_keys_ratio_limit, InternalError,
Format(
"Expected min/max keys per worker ratio to be not less than $0 but got $1, "
"min_keys_per_worker: $2, max_keys_per_worker: $3",
min_max_keys_ratio_limit, ratio, *min_keys_per_worker, *max_keys_per_worker));
return Status::OK();
}

} // namespace

TEST_P(QLTabletRf1TestToggleEnablePackedRow, GetTabletKeyRanges) {
constexpr auto kNumSstFiles = 4;
constexpr auto kNumFlushes = 15;
constexpr auto kNumWorkers = 5;
constexpr auto kMinMaxKeysRatioLimit = 0.75;

FLAGS_db_block_size_bytes = 4_KB;
FLAGS_db_write_buffer_size = 200_KB;

TestWorkload workload(cluster_.get());
workload.set_table_name(kTable1Name);
workload.set_write_timeout_millis(30000);
workload.set_num_tablets(1);
workload.set_num_write_threads(2);
workload.set_write_batch_size(1);
workload.set_payload_bytes(16);
workload.Setup();

LOG(INFO) << "Starting workload ...";
Stopwatch s(Stopwatch::ALL_THREADS);
s.start();
workload.Start();

const auto peers = ListTabletPeers(cluster_.get(), ListPeersFilter::kLeaders);
ASSERT_EQ(peers.size(), 1);
const auto tablet = ASSERT_NOTNULL(peers[0]->shared_tablet());
auto* db = tablet->regular_db();

while (std_util::cmp_less(
db->GetCurrentVersionSstFilesUncompressedSize(),
kNumFlushes * FLAGS_db_write_buffer_size) ||
db->GetCurrentVersionNumSSTFiles() < kNumSstFiles) {
std::this_thread::sleep_for(100ms);
}

workload.StopAndJoin();
s.stop();
LOG(INFO) << "Workload stopped, it took: " << AsString(s.elapsed());

LOG(INFO) << "Rows inserted: " << workload.rows_inserted();
LOG(INFO) << "Number of SST files: " << db->GetCurrentVersionNumSSTFiles();
LOG(INFO) << "SST data size: " << db->GetCurrentVersionSstFilesUncompressedSize();

const auto range_size_bytes = db->GetCurrentVersionSstFilesUncompressedSize() / 50;

for (uint32_t max_key_length : {1024, 16, 8, 4, 2}) {
LOG(INFO) << "max_key_length: " << max_key_length;

std::vector<std::string> range_end_keys;
auto add_range_end_key = [&range_end_keys](Slice key) {
LOG(INFO) << "Got range end key: " << key.ToDebugHexString();
range_end_keys.push_back(key.ToBuffer());
};

ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
Slice(), Slice(), std::numeric_limits<uint64_t>::max(), range_size_bytes,
tablet::IsForward::kTrue, max_key_length, add_range_end_key));

LOG(INFO) << "Ranges count: " << range_end_keys.size();

// We should have at least 1 range.
ASSERT_GE(range_end_keys.size(), 1);

ASSERT_EQ(range_end_keys.back(), "");
for (size_t i = 0; i + 2 < range_end_keys.size(); ++i) {
ASSERT_LT(range_end_keys[i], range_end_keys[i + 1]);
}

// TODO(get_table_key_ranges): For now we are returning full DocKeys and skipping ones that
// are longer than max_key_length, because truncated DocKeys are not supported as lower/upper
// bounds for scans. So, if DocKeys are longer than max_key_length we can have very uneven
// distribution across ranges/workers.
const auto min_max_keys_ratio_limit = max_key_length > 8 ? kMinMaxKeysRatioLimit : 0;

ASSERT_OK(CalcKeysDistributionAcrossWorkers(
tablet.get(), range_end_keys, kNumWorkers, min_max_keys_ratio_limit));

// Get ranges in multiple batches, verify it covers the whole space.
constexpr auto kNumRangesPerBatch = 5;
std::string lower_bound = "";

std::vector<string> range_end_keys_from_batches;
for (;;) {
std::vector<string> range_end_keys_batch;
LOG(INFO) << "Getting tablet key ranges starting from: "
<< Slice(lower_bound).ToDebugHexString();

ASSERT_OK(tablet->TEST_GetTabletKeyRanges(
lower_bound, Slice(), kNumRangesPerBatch, range_size_bytes, tablet::IsForward::kTrue,
max_key_length, [&range_end_keys_batch](Slice key) {
LOG(INFO) << "Got range end key: " << key.ToDebugHexString();
range_end_keys_batch.push_back(key.ToBuffer());
}));

ASSERT_LE(range_end_keys_batch.size(), kNumRangesPerBatch);

// We should have at least 1 range.
ASSERT_GE(range_end_keys_batch.size(), 1);

for (const auto& key : range_end_keys_batch) {
range_end_keys_from_batches.push_back(key);
}

if (range_end_keys_batch.back().empty()) {
// We've reached the end.
break;
}

ASSERT_EQ(range_end_keys_batch.size(), kNumRangesPerBatch);

// Use last returned key as lower bound for next batch of ranges.
lower_bound = range_end_keys_batch.back();
}

ASSERT_OK(CalcKeysDistributionAcrossWorkers(
tablet.get(), range_end_keys_from_batches, kNumWorkers, min_max_keys_ratio_limit));
}
// TODO(get_table_key_ranges): test getting ranges in reverse order.
}


} // namespace client
} // namespace yb
14 changes: 14 additions & 0 deletions src/yb/common/pgsql_protocol.proto
@@ -349,6 +349,16 @@ enum PgsqlMetricsCaptureType {
PGSQL_METRICS_CAPTURE_ALL = 1;
}

// TODO(get_table_key_ranges): deprecate after/if a separate GetTabletKeyRanges RPC to the
// tablet leader is used.
message GetTabletKeyRangesEmbeddedRequestPB {
optional bytes lower_bound_key = 1;
optional bytes upper_bound_key = 2;
optional uint64 range_size_bytes = 3;
optional bool is_forward = 4 [ default = true ];
optional uint32 max_key_length = 5;
}

// TODO(neil) The protocol for select needs to be changed accordingly when we introduce and cache
// execution plan in tablet server.
message PgsqlReadRequestPB {
@@ -512,6 +522,10 @@ message PgsqlReadRequestPB {

// What metric changes to return in response.
optional PgsqlMetricsCaptureType metrics_capture = 38;

// TODO(get_table_key_ranges): deprecate this field after a separate GetTabletKeyRanges
// RPC to the tablet leader is used.
optional GetTabletKeyRangesEmbeddedRequestPB get_tablet_key_ranges_request = 41;
}

//--------------------------------------------------------------------------------------------------
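As a hedged illustration of the new protocol fields (standard protobuf-generated setters; the surrounding request plumbing is omitted and the values are arbitrary), a read request carrying the embedded key-ranges request could be populated like this:

PgsqlReadRequestPB read_req;
auto* ranges_req = read_req.mutable_get_tablet_key_ranges_request();
// Empty bounds mean the corresponding end of the tablet keyspace is used.
ranges_req->set_lower_bound_key("");
ranges_req->set_upper_bound_key("");
ranges_req->set_range_size_bytes(1024 * 1024);  // target ~1 MiB of data per range
ranges_req->set_is_forward(true);               // only forward scan is in scope here
ranges_req->set_max_key_length(1024);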
2 changes: 1 addition & 1 deletion src/yb/docdb/doc_read_context.cc
@@ -90,7 +90,7 @@ void DocReadContext::UpdateKeyPrefix() {
BigEndian::Store32(out, schema_.colocation_id());
out += sizeof(ColocationId);
}
-  key_prefix_encoded_len_ = out - shared_key_prefix_buffer_.data();
+  key_prefix_encoded_len_ = table_key_prefix_len_ = out - shared_key_prefix_buffer_.data();
bool use_inplace_increment_for_upperbound = false;
if (schema_.num_hash_key_columns()) {
*out++ = dockv::KeyEntryTypeAsChar::kUInt16Hash;
7 changes: 7 additions & 0 deletions src/yb/docdb/doc_read_context.h
@@ -84,6 +84,10 @@ struct DocReadContext {
return Slice(upperbound_buffer_.data(), upperbound_len_);
}

Slice table_key_prefix() const {
return Slice(shared_key_prefix_buffer_.data(), table_key_prefix_len_);
}

void TEST_SetDefaultTimeToLive(uint64_t ttl_msec) {
schema_.SetDefaultTimeToLive(ttl_msec);
}
@@ -135,6 +139,9 @@ struct DocReadContext {
// While key_prefix_encoded_len_ will have 3 bytes for it, i.e. full encoded hash code.
size_t key_prefix_encoded_len_ = 0;

// Includes cotable_id and colocation_id.
size_t table_key_prefix_len_ = 0;

std::string log_prefix_;
};

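A short sketch of how the new accessor can be consumed (the caller and the key variable are hypothetical): since table_key_prefix() covers the cotable_id or colocation_id, it identifies which colocated table an encoded doc key belongs to.

const Slice table_prefix = doc_read_context.table_key_prefix();
if (!key.starts_with(table_prefix)) {
  // The key belongs to a different table colocated in the same tablet.
}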
52 changes: 33 additions & 19 deletions src/yb/dockv/partition.cc
@@ -457,27 +457,41 @@ uint16_t PartitionSchema::DecodeMultiColumnHashRightBound(Slice partition_key) {
return value - 1;
}

-Result<std::string> PartitionSchema::GetEncodedKeyPrefix(
+namespace {
+
+Result<std::string> GetEncodedHashPartitionKey(const std::string& partition_key) {
+  const auto doc_key_hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
+
+  // Following the standard flow to get the hash part of a key instead of simple `AppendHash` call
+  // in order to be guarded from any possible future update in the flow.
+  KeyBytes prefix_bytes;
+  DocKeyEncoderAfterTableIdStep(&prefix_bytes).Hash(doc_key_hash, KeyEntryValues());
+  const auto prefix_size =
+      VERIFY_RESULT(DocKey::EncodedSize(prefix_bytes, DocKeyPart::kUpToHashCode));
+  if (PREDICT_FALSE((prefix_size == 0))) {
+    // Sanity check, should not happen for normal state.
+    return STATUS(IllegalState,
+        Format("Failed to get encoded size of a hash key, key: $0", prefix_bytes));
+  }
+  prefix_bytes.Truncate(prefix_size);
+  return prefix_bytes.ToStringBuffer();
+}
+
+}  // namespace
+
+Result<std::string> PartitionSchema::GetEncodedPartitionKey(const std::string& partition_key) {
+  if (!IsHashPartitioning() || partition_key.empty()) {
+    return partition_key;
+  }
+  return GetEncodedHashPartitionKey(partition_key);
+}
+
+Result<std::string> PartitionSchema::GetEncodedPartitionKey(
     const std::string& partition_key, const PartitionSchemaPB& partition_schema) {
-  if (partition_schema.has_hash_schema()) {
-    const auto doc_key_hash = PartitionSchema::DecodeMultiColumnHashValue(partition_key);
-
-    // Following the standard flow to get the hash part of a key instead of simple `AppendHash` call
-    // in order to be guarded from any possible future update in the flow.
-    KeyBytes prefix_bytes;
-    DocKeyEncoderAfterTableIdStep(&prefix_bytes)
-        .Hash(doc_key_hash, KeyEntryValues());
-    const auto prefix_size = VERIFY_RESULT(DocKey::EncodedSize(
-        prefix_bytes, DocKeyPart::kUpToHashCode));
-    if (PREDICT_FALSE((prefix_size == 0))) {
-      // Sanity check, should not happen for normal state.
-      return STATUS(IllegalState,
-          Format("Failed to get encoded size of a hash key, key: $0", prefix_bytes));
-    }
-    prefix_bytes.Truncate(prefix_size);
-    return prefix_bytes.ToStringBuffer();
+  if (!IsHashPartitioning(partition_schema) || partition_key.empty()) {
+    return partition_key;
   }
-  return partition_key;
+  return GetEncodedHashPartitionKey(partition_key);
}

Status PartitionSchema::IsValidHashPartitionRange(const string& partition_key_start,
14 changes: 11 additions & 3 deletions src/yb/dockv/partition.h
@@ -306,9 +306,17 @@ class PartitionSchema {
static void ProcessHashKeyEntry(const LWPgsqlExpressionPB& expr, std::string* out);
static void ProcessHashKeyEntry(const PgsqlExpressionPB& expr, std::string* out);

-  // Encoded (sub)doc keys that belong to partition with partition_key lower bound
-  // are starting with this prefix or greater than it.
-  static Result<std::string> GetEncodedKeyPrefix(
+  // For range-based sharded tables, encoded partition key is the same as partition_key.
+  // For hash-based sharded tables, encoded partition key consists of KeyEntryType::kUInt16Hash
+  // prefix followed by partition_key.
+  //
+  // Encoded (sub)doc keys that belong to partition are starting with encoded partition_key_start
+  // or greater than it and lower than encoded partition_key_end.
+  //
+  // If partition_key is empty, encoded partition key is also empty and that means corresponding
+  // partition boundary is absent (tablet is the first/last in table key range).
+  Result<std::string> GetEncodedPartitionKey(const std::string& partition_key);
+  static Result<std::string> GetEncodedPartitionKey(
      const std::string& partition_key, const PartitionSchemaPB& partition_schema);

// Returns inclusive min and max hash code for the partition.
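A hedged usage sketch of the new single-argument overload (local names are illustrative): a tablet's partition bounds are re-encoded before being compared against encoded doc keys, per the header comment above.

// Encode both partition boundaries of a tablet so they can be compared with
// encoded doc keys; empty boundaries stay empty, i.e. the bound is absent.
const auto encoded_start = VERIFY_RESULT(
    partition_schema.GetEncodedPartitionKey(partition.partition_key_start()));
const auto encoded_end = VERIFY_RESULT(
    partition_schema.GetEncodedPartitionKey(partition.partition_key_end()));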
8 changes: 4 additions & 4 deletions src/yb/master/catalog_manager.cc
@@ -3006,10 +3006,10 @@ Status CatalogManager::DoSplitTablet(
split_partition_key = child_partition.partition_key_start();
}
}
-    // Re-compute the encoded key
-    // to ensure we use the same partition boundary for both child tablets
-    split_encoded_key = VERIFY_RESULT(PartitionSchema::GetEncodedKeyPrefix(
-        split_partition_key, source_table_lock->pb.partition_schema()));
+    // Re-compute the encoded key to ensure we use the same partition boundary for both child
+    // tablets.
+    split_encoded_key = VERIFY_RESULT(PartitionSchema::GetEncodedPartitionKey(
+        split_partition_key, source_table_lock->pb.partition_schema()));
}

LOG(INFO) << "Starting tablet split: " << source_tablet_info->ToString()
1 change: 1 addition & 0 deletions src/yb/rocksdb/CMakeLists.txt
@@ -98,6 +98,7 @@ set(ROCKSDB_SRCS
table/full_filter_block.cc
table/get_context.cc
table/index_builder.cc
table/index_iterator.cc
table/index_reader.cc
table/iterator.cc
table/merger.cc
11 changes: 11 additions & 0 deletions src/yb/rocksdb/db.h
@@ -60,6 +60,7 @@ class WriteBatch;
class Env;
class EventListener;

YB_STRONGLY_TYPED_BOOL(SkipLastEntry);

extern const char kDefaultColumnFamilyName[];

@@ -313,6 +314,16 @@ class DB {
virtual Iterator* NewIterator(const ReadOptions& options) {
return NewIterator(options, DefaultColumnFamily());
}

virtual std::unique_ptr<Iterator> NewIndexIterator(
const ReadOptions& options, SkipLastEntry skip_last_index_entry,
ColumnFamilyHandle* column_family) = 0;

std::unique_ptr<Iterator> NewIndexIterator(
const ReadOptions& options, SkipLastEntry skip_last_index_entry) {
return NewIndexIterator(options, skip_last_index_entry, DefaultColumnFamily());
}

// Returns iterators from a consistent database state across multiple
// column families. Iterators are heap allocated and need to be deleted
// before the db is deleted
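A hedged sketch of consuming the new index iterator (this assumes, as in standard RocksDB block-based tables, that each index entry's key is >= every key in the data block it points to, which is what makes index keys usable as range split points):

auto index_iter = db->NewIndexIterator(rocksdb::ReadOptions(), SkipLastEntry::kFalse);
for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
  // Accumulate data-block sizes here and emit index_iter->key() as a range
  // boundary once roughly range_size_bytes of data has been covered.
}
// Check index_iter->status() after the loop in real code.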