[#3520] DocDB: Packed row configuration flags

Summary:
This diff adds the following flags to configure packed row behaviour, and adds the logic to enforce a size-based packed row limit.
* ysql_enable_packed_row - Whether packed rows are enabled for YSQL.
* ysql_packed_row_size_limit - Packed row size limit for YSQL, in bytes. 0 to pick this value from the SSTable block size.
* ycql_enable_packed_row - Whether packed rows are enabled for YCQL.
* ycql_packed_row_size_limit - Packed row size limit for YCQL, in bytes. 0 to pick this value from the SSTable block size.

When a packed row exceeds the size limit, the remaining columns are stored as separate key values.
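
For illustration only (not part of this diff): RowPacker::AddValue now returns Result<bool>, where false means the value would push the packed row past the limit. A minimal sketch of the calling contract, with StoreAsSeparateKeyValue as a hypothetical stand-in for the existing per-column write path:

Status WriteColumn(RowPacker* row_packer, ColumnId column_id, const Slice& value) {
  // AddValue returns false when packing this value would exceed the
  // configured limit (ycql_packed_row_size_limit / ysql_packed_row_size_limit).
  if (row_packer && VERIFY_RESULT(row_packer->AddValue(column_id, value, /* tail_size= */ 0))) {
    return Status::OK();  // The value fits into the packed row.
  }
  // Over the limit (or no packer): store the column as a separate key value.
  return StoreAsSeparateKeyValue(column_id, value);
}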

Test Plan: PgPackedRowTest.BigValue
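
The following is a hypothetical sketch (not the actual test) of the behaviour PgPackedRowTest.BigValue covers: shrink the packed row size limit, insert a value larger than the limit, and verify it still reads back intact. Helper names such as Connect() and FetchValue() are assumptions modelled on the pg_mini test utilities.

TEST_F(PgPackedRowTest, BigValueSketch) {
  FLAGS_ysql_packed_row_size_limit = 128;  // Force the size-based fallback path.
  auto conn = ASSERT_RESULT(Connect());
  ASSERT_OK(conn.Execute("CREATE TABLE t (key INT PRIMARY KEY, value TEXT)"));
  const std::string big_value(1024, 'x');  // Bigger than the packed row limit.
  ASSERT_OK(conn.ExecuteFormat("INSERT INTO t VALUES (1, '$0')", big_value));
  // The oversized column is written as a separate key value; reads are unchanged.
  auto fetched = ASSERT_RESULT(conn.FetchValue<std::string>("SELECT value FROM t WHERE key = 1"));
  ASSERT_EQ(fetched, big_value);
}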

Reviewers: mbautin

Reviewed By: mbautin

Subscribers: kannan, rthallam, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D17409
spolitov committed Jun 8, 2022
1 parent 74618ff commit 5900613
Showing 13 changed files with 249 additions and 95 deletions.
16 changes: 11 additions & 5 deletions src/yb/docdb/cql_operation.cc
@@ -66,7 +66,11 @@ DEFINE_bool(ycql_disable_index_updating_optimization, false,
"the index data.");
TAG_FLAG(ycql_disable_index_updating_optimization, advanced);

-DECLARE_int32(max_packed_row_columns);
+DEFINE_bool(ycql_enable_packed_row, false, "Whether packed row is enabled for YCQL.");
+
+DEFINE_uint64(
+    ycql_packed_row_size_limit, 0,
+    "Packed row size limit for YCQL in bytes. 0 to make this equal to SSTable block size.");

namespace yb {
namespace docdb {
@@ -740,8 +744,9 @@ Status QLWriteOperation::InsertScalar(
bfql::TSOpcode op_code,
RowPacker* row_packer) {
ValueRef value_ref(value, column_schema.sorting_type(), op_code);
-  if (row_packer && value_ref.IsTombstoneOrPrimitive()) {
-    return row_packer->AddValue(column_id, value);
+  if (row_packer && value_ref.IsTombstoneOrPrimitive() &&
+      VERIFY_RESULT(row_packer->AddValue(column_id, value))) {
+    return Status::OK();
}

return data.doc_write_batch->InsertSubDocument(
@@ -965,10 +970,11 @@ Status QLWriteOperation::ApplyUpsert(
IntraTxnWriteId packed_row_write_id = 0;

if (is_insert && encoded_pk_doc_key_) {
-    if (request_.column_values().size() <= FLAGS_max_packed_row_columns) {
+    if (FLAGS_ycql_enable_packed_row) {
const SchemaPacking& schema_packing = VERIFY_RESULT(
doc_read_context_->schema_packing_storage.GetPacking(request_.schema_version()));
-      row_packer.emplace(request_.schema_version(), schema_packing);
+      row_packer.emplace(
+          request_.schema_version(), schema_packing, FLAGS_ycql_packed_row_size_limit);
packed_row_write_id = data.doc_write_batch->ReserveWriteId();
} else {
const DocPath sub_path(encoded_pk_doc_key_.as_slice(), KeyEntryValue::kLivenessColumn);
57 changes: 49 additions & 8 deletions src/yb/docdb/docdb_compaction_context.cc
@@ -39,7 +39,11 @@ using std::unique_ptr;
using std::unordered_set;
using rocksdb::VectorToString;

-DECLARE_int32(max_packed_row_columns);
+DECLARE_bool(ycql_enable_packed_row);
+DECLARE_bool(ysql_enable_packed_row);
+
+DECLARE_uint64(ycql_packed_row_size_limit);
+DECLARE_uint64(ysql_packed_row_size_limit);

namespace yb {
namespace docdb {
@@ -182,10 +186,19 @@ class PackedRowData {
// Column was deleted.
return true;
}

+    size_t tail_size = 0; // As usual, when not specified size is in bytes.
+    if (!old_value_slice_.empty()) {
+      auto old_value = old_packing_.schema_packing->GetValue(column_id, old_value_slice_);
+      if (old_value) {
+        tail_size = old_value_slice_.end() - old_value->end();
+      }
+    }
+
    // TODO(packed_row) update control fields
-    VLOG(4) << "Update value: " << column_id << ", " << value.ToDebugHexString();
-    RETURN_NOT_OK(packer_->AddValue(column_id, value));
-    return true;
+    VLOG(4) << "Update value: " << column_id << ", " << value.ToDebugHexString() << ", tail size: "
+            << tail_size;
+    return packer_->AddValue(column_id, value, tail_size);
}

Status StartRepacking() {
@@ -200,7 +213,8 @@
void InitPacker() {
packing_started_ = true;
if (!packer_) {
-      packer_.emplace(new_packing_.schema_version, *new_packing_.schema_packing);
+      packer_.emplace(
+          new_packing_.schema_version, *new_packing_.schema_packing, new_packing_.pack_limit());
} else {
packer_->Restart();
}
@@ -247,7 +261,9 @@ class PackedRowData {
column_value = Slice();
}
VLOG(4) << "Keep value for column " << column_id << ": " << column_value->ToDebugHexString();
-    return packer_->AddValue(column_id, *column_value);
+    auto result = VERIFY_RESULT(packer_->AddValue(column_id, *column_value, /* tail_size= */ 0));
+    RSTATUS_DCHECK(result, Corruption, "Unable to pack old value for $0", column_id);
+    return Status::OK();
}

void UsedSchemaVersion(SchemaVersion version) {
@@ -287,8 +303,7 @@ class PackedRowData {
active_coprefix_ = coprefix;
active_coprefix_dropped_ = false;
new_packing_ = *packing;
-    can_start_packing_ =
-        make_signed(new_packing_.schema_packing->columns()) <= FLAGS_max_packed_row_columns;
+    can_start_packing_ = packing->enabled();
used_schema_versions_it_ = used_schema_versions_.find(new_packing_.cotable_id);
return Status::OK();
}
@@ -971,6 +986,32 @@ HybridTime MinHybridTime(const std::vector<rocksdb::FileMetaData*>& inputs) {

} // namespace

+bool CompactionSchemaInfo::enabled() const {
+  switch (table_type) {
+    case TableType::YQL_TABLE_TYPE:
+      return FLAGS_ycql_enable_packed_row;
+    case TableType::PGSQL_TABLE_TYPE:
+      return FLAGS_ysql_enable_packed_row;
+    case TableType::REDIS_TABLE_TYPE: [[fallthrough]];
+    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
+      return false;
+  }
+  FATAL_INVALID_ENUM_VALUE(TableType, table_type);
+}
+
+size_t CompactionSchemaInfo::pack_limit() const {
+  switch (table_type) {
+    case TableType::YQL_TABLE_TYPE:
+      return FLAGS_ycql_packed_row_size_limit;
+    case TableType::PGSQL_TABLE_TYPE:
+      return FLAGS_ysql_packed_row_size_limit;
+    case TableType::REDIS_TABLE_TYPE: [[fallthrough]];
+    case TableType::TRANSACTION_STATUS_TABLE_TYPE:
+      return 0;  // Packed rows are disabled for these table types; the limit is unused.
+  }
+  FATAL_INVALID_ENUM_VALUE(TableType, table_type);
+}

// ------------------------------------------------------------------------------------------------

std::shared_ptr<rocksdb::CompactionContextFactory> CreateCompactionContextFactory(
5 changes: 5 additions & 0 deletions src/yb/docdb/docdb_compaction_context.h
@@ -22,6 +22,7 @@
#include <boost/functional/hash.hpp>

#include "yb/common/column_id.h"
#include "yb/common/common_types.pb.h"
#include "yb/common/hybrid_time.h"

#include "yb/docdb/expiration.h"
@@ -61,10 +62,14 @@ struct HistoryRetentionDirective {
};

struct CompactionSchemaInfo {
+  TableType table_type;
uint32_t schema_version = std::numeric_limits<uint32_t>::max();
std::shared_ptr<const docdb::SchemaPacking> schema_packing;
Uuid cotable_id;
ColumnIds deleted_cols;

+  bool enabled() const;
+  size_t pack_limit() const; // As usual, when not specified size is in bytes.
};

// Used to query latest possible schema version.
2 changes: 1 addition & 1 deletion src/yb/docdb/packed_row-test.cc
@@ -44,7 +44,7 @@ void TestRowPacking(const Schema& schema, const std::vector<QLValuePB>& values)
ASSERT_EQ(schema.num_columns() - schema.num_key_columns(), values.size());
constexpr int kVersion = 1;
SchemaPacking schema_packing(schema);
-  RowPacker packer(kVersion, schema_packing);
+  RowPacker packer(kVersion, schema_packing, std::numeric_limits<uint64_t>::max());
size_t idx = schema.num_key_columns();
for (const auto& value : values) {
auto column_id = schema.column_id(idx);
123 changes: 64 additions & 59 deletions src/yb/docdb/packed_row.cc
@@ -27,6 +27,8 @@
#include "yb/util/status.h"
#include "yb/util/tostring.h"

+DECLARE_int64(db_block_size_bytes);

namespace yb {
namespace docdb {

@@ -39,14 +41,34 @@ bool IsVarlenColumn(const ColumnSchema& column_schema) {
return column_schema.is_nullable() || column_schema.type_info()->var_length();
}

-size_t EncodedValueSize(const ColumnSchema& column_schema) {
+size_t EncodedColumnSize(const ColumnSchema& column_schema) {
if (column_schema.type_info()->type == DataType::BOOL) {
// Boolean values are encoded as value type only.
return 1;
}
return 1 + column_schema.type_info()->size;
}

+bool IsNull(const Slice& slice) {
+  return slice.empty();
+}
+
+void PackValue(const QLValuePB& value, ValueBuffer* result) {
+  AppendEncodedValue(value, result);
+}
+
+size_t PackedValueSize(const QLValuePB& value) {
+  return EncodedValueSize(value);
+}
+
+void PackValue(const Slice& value, ValueBuffer* result) {
+  result->Append(value);
+}
+
+size_t PackedValueSize(const Slice& value) {
+  return value.size();
+}

} // namespace

ColumnPackingData ColumnPackingData::FromPB(const ColumnPackingPB& pb) {
@@ -89,15 +111,15 @@ SchemaPacking::SchemaPacking(const Schema& schema) {
.id = schema.column_id(i),
.num_varlen_columns_before = varlen_columns_count_,
.offset_after_prev_varlen_column = offset_after_prev_varlen_column,
-        .size = varlen ? 0 : EncodedValueSize(column_schema),
+        .size = varlen ? 0 : EncodedColumnSize(column_schema),
.nullable = column_schema.is_nullable(),
});

if (varlen) {
++varlen_columns_count_;
offset_after_prev_varlen_column = 0;
} else {
-      offset_after_prev_varlen_column += EncodedValueSize(column_schema);
+      offset_after_prev_varlen_column += EncodedColumnSize(column_schema);
}
}
}
@@ -228,8 +250,11 @@ bool SchemaPackingStorage::HasVersionBelow(SchemaVersion version) const {
return false;
}

-RowPacker::RowPacker(SchemaVersion version, std::reference_wrapper<const SchemaPacking> packing)
-    : packing_(packing) {
+RowPacker::RowPacker(
+    SchemaVersion version, std::reference_wrapper<const SchemaPacking> packing,
+    size_t packed_size_limit)
+    : packing_(packing),
+      packed_size_limit_(packed_size_limit ? packed_size_limit : FLAGS_db_block_size_bytes) {
size_t prefix_len = packing_.prefix_len();
result_.Reserve(1 + kMaxVarint32Length + prefix_len);
result_.PushBack(ValueEntryTypeAsChar::kPackedRow);
@@ -247,84 +272,65 @@ void RowPacker::Restart() {
result_.Truncate(prefix_end_);
}

-Status RowPacker::AddValue(ColumnId column_id, const QLValuePB& value) {
-  return DoAddValue(column_id, value);
+Result<bool> RowPacker::AddValue(ColumnId column_id, const QLValuePB& value) {
+  return DoAddValue(column_id, value, 0);
}

-Status RowPacker::AddValue(ColumnId column_id, const Slice& value) {
-  return DoAddValue(column_id, value);
+Result<bool> RowPacker::AddValue(ColumnId column_id, const Slice& value, size_t tail_size) {
+  return DoAddValue(column_id, value, tail_size);
}

-namespace {
-
-bool IsNull(const Slice& slice) {
-  return slice.empty();
-}
-
-void PackValue(const QLValuePB& value, ValueBuffer* result) {
-  AppendEncodedValue(value, result);
-}
-
-void PackValue(const Slice& value, ValueBuffer* result) {
-  result->Append(value);
-}
-
-} // namespace

template <class Value>
-Status RowPacker::DoAddValue(ColumnId column_id, const Value& value) {
-  if (idx_ >= packing_.columns()) {
-    CHECK(false);
-    return STATUS_FORMAT(
-        InvalidArgument, "Add extra column $0, while already have $1 of $2 columns",
-        column_id, idx_, packing_.columns());
-  }
+Result<bool> RowPacker::DoAddValue(ColumnId column_id, const Value& value, size_t tail_size) {
+  RSTATUS_DCHECK(
+      idx_ < packing_.columns(),
+      InvalidArgument, "Add extra column $0, while already have $1 of $2 columns",
+      column_id, idx_, packing_.columns());

+  bool result = true;
for (;;) {
const auto& column_data = packing_.column_packing_data(idx_);
-    if (column_data.id > column_id) {
-      return STATUS_FORMAT(InvalidArgument, "Add unexpected column $0, while $1 is expected",
-                           column_id, column_data.id);
-    }
+    RSTATUS_DCHECK(
+        column_data.id <= column_id, InvalidArgument,
+        "Add unexpected column $0, while $1 is expected", column_id, column_data.id);

++idx_;
size_t prev_size = result_.size();
if (column_data.id < column_id) {
-      if (!column_data.nullable) {
-        return STATUS_FORMAT(
-            InvalidArgument, "Missing value for non nullable column $0, while adding $1",
-            column_data.id, column_id);
-      }
+      RSTATUS_DCHECK(
+          column_data.nullable, InvalidArgument,
+          "Missing value for non nullable column $0, while adding $1", column_data.id, column_id);
} else if (!column_data.nullable || !IsNull(value)) {
-      PackValue(value, &result_);
+      if (column_data.varlen() &&
+          prev_size + PackedValueSize(value) + tail_size > packed_size_limit_) {
+        result = false;
+      } else {
+        PackValue(value, &result_);
+      }
}
if (column_data.varlen()) {
LittleEndian::Store32(result_.mutable_data() + varlen_write_pos_,
narrow_cast<uint32_t>(result_.size() - prefix_end_));
varlen_write_pos_ += sizeof(uint32_t);
-    } else if (prev_size + column_data.size != result_.size()) {
-      return STATUS_FORMAT(Corruption, "Wrong encoded size: $0 vs $1",
-                           result_.size() - prev_size, column_data.size);
+    } else {
+      RSTATUS_DCHECK(
+          prev_size + column_data.size == result_.size(), Corruption,
+          "Wrong encoded size: $0 vs $1", result_.size() - prev_size, column_data.size);
}

if (column_data.id == column_id) {
break;
}
}

-  return Status::OK();
+  return result;
}

Result<Slice> RowPacker::Complete() {
-  if (idx_ != packing_.columns()) {
-    return STATUS_FORMAT(
-        InvalidArgument, "Not all columns packed: $0 vs $1",
-        idx_, packing_.columns());
-  }
-  if (varlen_write_pos_ != prefix_end_) {
-    return STATUS_FORMAT(
-        InvalidArgument, "Not all varlen columns packed: $0 vs $1",
-        varlen_write_pos_, prefix_end_);
-  }
+  RSTATUS_DCHECK_EQ(
+      idx_, packing_.columns(), InvalidArgument, "Not all columns packed");
+  RSTATUS_DCHECK_EQ(
+      varlen_write_pos_, prefix_end_, InvalidArgument, "Not all varlen columns packed");
return result_.AsSlice();
}

@@ -333,9 +339,8 @@ ColumnId RowPacker::NextColumnId() const {
}

Result<const ColumnPackingData&> RowPacker::NextColumnData() const {
-  if (idx_ >= packing_.columns()) {
-    return STATUS(IllegalState, "All columns already packed");
-  }
+  RSTATUS_DCHECK(
+      idx_ < packing_.columns(), IllegalState, "All columns already packed");
return packing_.column_packing_data(idx_);
}
