Skip to content

Commit

Permalink
[#3520] DocDB: GC old schema packing
Browse files Browse the repository at this point in the history
Summary:
To read a packed row, we need its schema packing information.
So we store every schema packing that is still in use in the tablet metadata.

However, rows can be repacked to a new schema version during compaction.
Old, unused schema packings can then be removed from the tablet metadata after the compaction.

This diff implements the logic necessary for such GC.
The schema versions used by an SST file are stored in its user frontiers.
These versions are recalculated after compaction.
When a compaction finishes, old schema versions are removed from the tablet metadata.

Test Plan: PgPackedRowTest.SchemaGC

Reviewers: timur, mbautin

Reviewed By: timur, mbautin

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D16626
  • Loading branch information
spolitov committed Apr 28, 2022
1 parent cbf6b66 commit fbd3bac
Show file tree
Hide file tree
Showing 51 changed files with 655 additions and 255 deletions.
3 changes: 1 addition & 2 deletions ent/src/yb/master/restore_sys_catalog_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,7 @@ struct PgCatalogTableData {
uint32_t pg_table_oid;

CHECKED_STATUS SetTableId(const TableId& table_id) {
Uuid cotable_id;
RETURN_NOT_OK(cotable_id.FromHexString(table_id));
Uuid cotable_id = VERIFY_RESULT(Uuid::FromHexString(table_id));
prefix[0] = docdb::KeyEntryTypeAsChar::kTableId;
cotable_id.EncodeToComparable(&prefix[1]);
pg_table_oid = VERIFY_RESULT(GetPgsqlTableOid(table_id));
Expand Down
3 changes: 1 addition & 2 deletions ent/src/yb/tserver/twodc_write_implementations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ CHECKED_STATUS CombineExternalIntents(

auto txn_id = VERIFY_RESULT(FullyDecodeTransactionId(transaction_state.transaction_id()));
SCHECK_EQ(transaction_state.tablets().size(), 1, InvalidArgument, "Wrong tablets number");
Uuid status_tablet;
RETURN_NOT_OK(status_tablet.FromHexString(transaction_state.tablets()[0]));
auto status_tablet = VERIFY_RESULT(Uuid::FromHexString(transaction_state.tablets()[0]));

Provider provider(status_tablet, &pairs, out->Add());
docdb::CombineExternalIntents(txn_id, &provider);
Expand Down
8 changes: 2 additions & 6 deletions src/yb/bfql/bfunc_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -984,9 +984,7 @@ CHECKED_STATUS ConvertBlobToUuid(PTypePtr source, RTypePtr target) {
if (blob.size() != kSizeUuid) {
return STATUS(QLError, "The blob string is not valid for UUID type.");
}
Uuid target_val;
RETURN_NOT_OK(target_val.FromBytes(blob));
QLValue::set_uuid_value(target_val, &*target);
QLValue::set_uuid_value(VERIFY_RESULT(Uuid::FromSlice(blob)), &*target);
}
return Status::OK();
}
Expand All @@ -1000,9 +998,7 @@ CHECKED_STATUS ConvertBlobToTimeuuid(PTypePtr source, RTypePtr target) {
if (blob.size() != kSizeUuid) {
return STATUS(QLError, "The blob string is not valid for UUID type.");
}
Uuid target_val;
RETURN_NOT_OK(target_val.FromBytes(blob));
QLValue::set_timeuuid_value(target_val, &*target);
QLValue::set_timeuuid_value(VERIFY_RESULT(Uuid::FromSlice(blob)), &*target);
}
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/common/common_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct TransactionOperationContext;
struct TransactionStatusResult;

using ColocationId = uint32_t;
using SchemaVersion = uint32_t;

using QLTypePtr = std::shared_ptr<QLType>;

Expand Down
3 changes: 3 additions & 0 deletions src/yb/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ enum SortingType : uint8_t {
kDescendingNullsLast // DESC, NULLS LAST
};

static const char* const kObsoleteShortPrimaryTableId = "sys.catalog.uuid";


} // namespace yb

#endif // YB_COMMON_CONSTANTS_H
21 changes: 6 additions & 15 deletions src/yb/common/ql_value.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,16 +374,13 @@ Status QLValue::Deserialize(
case UUID: {
string bytes;
RETURN_NOT_OK(CQLDecodeBytes(len, data, &bytes));
Uuid uuid;
RETURN_NOT_OK(uuid.FromBytes(bytes));
set_uuid_value(uuid);
set_uuid_value(VERIFY_RESULT(Uuid::FromSlice(bytes)));
return Status::OK();
}
case TIMEUUID: {
string bytes;
RETURN_NOT_OK(CQLDecodeBytes(len, data, &bytes));
Uuid uuid;
RETURN_NOT_OK(uuid.FromBytes(bytes));
Uuid uuid = VERIFY_RESULT(Uuid::FromSlice(bytes));
RETURN_NOT_OK(uuid.IsTimeUuid());
set_timeuuid_value(uuid);
return Status::OK();
Expand Down Expand Up @@ -704,29 +701,23 @@ InetAddress QLValue::inetaddress_value(const LWQLValuePB& pb) {
}

Uuid QLValue::timeuuid_value(const QLValuePB& pb) {
Uuid timeuuid;
CHECK_OK(timeuuid.FromBytes(timeuuid_value_pb(pb)));
Uuid timeuuid = CHECK_RESULT(Uuid::FromSlice(timeuuid_value_pb(pb)));
CHECK_OK(timeuuid.IsTimeUuid());
return timeuuid;
}

Uuid QLValue::timeuuid_value(const LWQLValuePB& pb) {
Uuid timeuuid;
CHECK_OK(timeuuid.FromSlice(timeuuid_value_pb(pb)));
Uuid timeuuid = CHECK_RESULT(Uuid::FromSlice(timeuuid_value_pb(pb)));
CHECK_OK(timeuuid.IsTimeUuid());
return timeuuid;
}

Uuid QLValue::uuid_value(const QLValuePB& pb) {
Uuid uuid;
CHECK_OK(uuid.FromBytes(uuid_value_pb(pb)));
return uuid;
return CHECK_RESULT(Uuid::FromSlice(uuid_value_pb(pb)));
}

Uuid QLValue::uuid_value(const LWQLValuePB& pb) {
Uuid uuid;
CHECK_OK(uuid.FromSlice(uuid_value_pb(pb)));
return uuid;
return CHECK_RESULT(Uuid::FromSlice(uuid_value_pb(pb)));
}

util::VarInt QLValue::varint_value(const QLValuePB& pb) {
Expand Down
9 changes: 3 additions & 6 deletions src/yb/common/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "yb/gutil/singleton.h"

#include "yb/util/net/inetaddress.h"
#include "yb/util/result.h"
#include "yb/util/status.h"
#include "yb/util/uuid.h"

Expand Down Expand Up @@ -119,16 +120,12 @@ void DataTypeTraits<INET>::AppendDebugStringForValue(const void *val, std::strin

void DataTypeTraits<UUID>::AppendDebugStringForValue(const void *val, std::string *str) {
const Slice *s = reinterpret_cast<const Slice *>(val);
Uuid uuid;
DCHECK(uuid.FromSlice(*s).ok());
str->append(uuid.ToString());
str->append(CHECK_RESULT(Uuid::FromSlice(*s)).ToString());
}

void DataTypeTraits<TIMEUUID>::AppendDebugStringForValue(const void *val, std::string *str) {
const Slice *s = reinterpret_cast<const Slice *>(val);
Uuid uuid;
DCHECK(uuid.FromSlice(*s).ok());
str->append(uuid.ToString());
str->append(CHECK_RESULT(Uuid::FromSlice(*s)).ToString());
}

} // namespace yb
20 changes: 13 additions & 7 deletions src/yb/docdb/consensus_frontier-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ std::string PbToString(const ConsensusFrontierPB& pb) {
google::protobuf::Any any;
any.PackFrom(pb);
ConsensusFrontier frontier;
frontier.FromPB(any);
CHECK_OK(frontier.FromPB(any));
return frontier.ToString();
}

Expand All @@ -44,7 +44,8 @@ TEST_F(ConsensusFrontierTest, TestUpdates) {
EXPECT_TRUE(frontier.Equals(frontier));
EXPECT_EQ(
"{ op_id: 0.0 hybrid_time: <invalid> history_cutoff: <invalid> "
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> }",
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> "
"primary_schema_version: <NULL> cotable_schema_versions: [] }",
frontier.ToString());
EXPECT_TRUE(frontier.IsUpdateValid(frontier, UpdateUserValueType::kLargest));
EXPECT_TRUE(frontier.IsUpdateValid(frontier, UpdateUserValueType::kSmallest));
Expand All @@ -57,7 +58,8 @@ TEST_F(ConsensusFrontierTest, TestUpdates) {
ConsensusFrontier frontier{{1, 1}, 1000_usec_ht, 500_usec_ht};
EXPECT_EQ(
"{ op_id: 1.1 hybrid_time: { physical: 1000 } history_cutoff: { physical: 500 } "
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> }",
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> "
"primary_schema_version: <NULL> cotable_schema_versions: [] }",
frontier.ToString());
ConsensusFrontier higher_idx{{1, 2}, 1000_usec_ht, 500_usec_ht};
ConsensusFrontier higher_ht{{1, 1}, 1001_usec_ht, 500_usec_ht};
Expand Down Expand Up @@ -115,27 +117,31 @@ TEST_F(ConsensusFrontierTest, TestUpdates) {
EXPECT_EQ(
PbToString(pb),
"{ op_id: 0.0 hybrid_time: <min> history_cutoff: <invalid> "
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> }");
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> "
"primary_schema_version: <NULL> cotable_schema_versions: [] }");

pb.mutable_op_id()->set_term(2);
pb.mutable_op_id()->set_index(3);
EXPECT_EQ(
PbToString(pb),
"{ op_id: 2.3 hybrid_time: <min> history_cutoff: <invalid> "
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> }");
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> "
"primary_schema_version: <NULL> cotable_schema_versions: [] }");

pb.set_hybrid_time(100000);
EXPECT_EQ(
PbToString(pb),
"{ op_id: 2.3 hybrid_time: { physical: 24 logical: 1696 } history_cutoff: <invalid> "
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> }");
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> "
"primary_schema_version: <NULL> cotable_schema_versions: [] }");

pb.set_history_cutoff(200000);
EXPECT_EQ(
PbToString(pb),
"{ op_id: 2.3 hybrid_time: { physical: 24 logical: 1696 } "
"history_cutoff: { physical: 48 logical: 3392 } "
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> }");
"hybrid_time_filter: <invalid> max_value_level_ttl_expiration_time: <invalid> "
"primary_schema_version: <NULL> cotable_schema_versions: [] }");
}

TEST_F(ConsensusFrontierTest, TestUpdateExpirationTime) {
Expand Down
106 changes: 98 additions & 8 deletions src/yb/docdb/consensus_frontier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,62 @@
#include <google/protobuf/any.pb.h>

#include "yb/docdb/docdb.pb.h"

#include "yb/gutil/casts.h"
#include "yb/gutil/stl_util.h"

#include "yb/util/tostring.h"

namespace yb {
namespace docdb {

ConsensusFrontier::~ConsensusFrontier() {
namespace {

// Generic fallback: delegates to the value type's own MakeAtLeast() member, so any type
// exposing that member can be updated through this free-function interface.
template <class T>
void MakeAtLeast(const T& other_value, T* this_value) {
  T& target = *this_value;
  target.MakeAtLeast(other_value);
}

// Generic fallback: delegates to the value type's own MakeAtMost() member, so any type
// exposing that member can be updated through this free-function interface.
template <class T>
void MakeAtMost(const T& other_value, T* this_value) {
  T& target = *this_value;
  target.MakeAtMost(other_value);
}

// Optional overload: raises *this_value to other_value when other_value is set and
// *this_value is either unset or strictly smaller. An unset other_value is a no-op.
template <class T>
void MakeAtLeast(const std::optional<T>& other_value, std::optional<T>* this_value) {
  if (!other_value) {
    return;
  }
  // Keep the current value unless it is strictly less than the incoming one.
  if (*this_value && !(**this_value < *other_value)) {
    return;
  }
  *this_value = *other_value;
}

// Optional overload: lowers *this_value to other_value when other_value is set and
// *this_value is either unset or strictly greater. An unset other_value is a no-op.
template <class T>
void MakeAtMost(const std::optional<T>& other_value, std::optional<T>* this_value) {
  if (!other_value) {
    return;
  }
  // Keep the current value unless it is strictly greater than the incoming one.
  if (*this_value && !(**this_value > *other_value)) {
    return;
  }
  *this_value = *other_value;
}

// Plain integer overload: raises *this_value to other_value if it is currently smaller.
void MakeAtLeast(uint32_t other_value, uint32_t* this_value) {
  if (*this_value < other_value) {
    *this_value = other_value;
  }
}

// Plain integer overload: lowers *this_value to other_value if it is currently larger.
void MakeAtMost(uint32_t other_value, uint32_t* this_value) {
  if (other_value < *this_value) {
    *this_value = other_value;
  }
}

} // namespace

ConsensusFrontier::~ConsensusFrontier() = default;

bool ConsensusFrontier::Equals(const UserFrontier& pre_rhs) const {
const ConsensusFrontier& rhs = down_cast<const ConsensusFrontier&>(pre_rhs);
return op_id_ == rhs.op_id_ &&
hybrid_time_ == rhs.hybrid_time_ &&
history_cutoff_ == rhs.history_cutoff_ &&
hybrid_time_filter_ == rhs.hybrid_time_filter_ &&
max_value_level_ttl_expiration_time_ == rhs.max_value_level_ttl_expiration_time_;
max_value_level_ttl_expiration_time_ == rhs.max_value_level_ttl_expiration_time_ &&
primary_schema_version_ == rhs.primary_schema_version_ &&
cotable_schema_versions_ == rhs.cotable_schema_versions_;
}

void ConsensusFrontier::ToPB(google::protobuf::Any* any) const {
Expand All @@ -43,12 +83,20 @@ void ConsensusFrontier::ToPB(google::protobuf::Any* any) const {
pb.set_hybrid_time_filter(hybrid_time_filter_.ToUint64());
}
pb.set_max_value_level_ttl_expiration_time(max_value_level_ttl_expiration_time_.ToUint64());
if (primary_schema_version_) {
AddTableSchemaVersion(Uuid::Nil(), *primary_schema_version_, &pb);
}
for (const auto& p : cotable_schema_versions_) {
AddTableSchemaVersion(p.first, p.second, &pb);
}
any->PackFrom(pb);
}

void ConsensusFrontier::FromPB(const google::protobuf::Any& any) {
Status ConsensusFrontier::FromPB(const google::protobuf::Any& any) {
ConsensusFrontierPB pb;
any.UnpackTo(&pb);
if (!any.UnpackTo(&pb)) {
return STATUS(Corruption, "Unable to unpack consensus frontier");
}
op_id_ = OpId::FromPB(pb.op_id());
hybrid_time_ = HybridTime(pb.hybrid_time());
history_cutoff_ = NormalizeHistoryCutoff(HybridTime(pb.history_cutoff()));
Expand All @@ -59,6 +107,15 @@ void ConsensusFrontier::FromPB(const google::protobuf::Any& any) {
}
max_value_level_ttl_expiration_time_ =
HybridTime::FromPB(pb.max_value_level_ttl_expiration_time());
for (const auto& p : pb.table_schema_version()) {
if (p.table_id().empty()) {
primary_schema_version_ = p.schema_version();
} else {
cotable_schema_versions_.emplace(
VERIFY_RESULT(Uuid::FromSlice(p.table_id())), p.schema_version());
}
}
return Status::OK();
}

void ConsensusFrontier::FromOpIdPBDeprecated(const OpIdPB& pb) {
Expand All @@ -67,7 +124,8 @@ void ConsensusFrontier::FromOpIdPBDeprecated(const OpIdPB& pb) {

std::string ConsensusFrontier::ToString() const {
return YB_CLASS_TO_STRING(
op_id, hybrid_time, history_cutoff, hybrid_time_filter, max_value_level_ttl_expiration_time);
op_id, hybrid_time, history_cutoff, hybrid_time_filter, max_value_level_ttl_expiration_time,
primary_schema_version, cotable_schema_versions);
}

namespace {
Expand Down Expand Up @@ -96,10 +154,10 @@ void UpdateField(
T* this_value, const T& new_value, rocksdb::UpdateUserValueType update_type) {
switch (update_type) {
case rocksdb::UpdateUserValueType::kLargest:
this_value->MakeAtLeast(new_value);
MakeAtLeast(new_value, this_value);
return;
case rocksdb::UpdateUserValueType::kSmallest:
this_value->MakeAtMost(new_value);
MakeAtMost(new_value, this_value);
return;
}
FATAL_INVALID_ENUM_VALUE(rocksdb::UpdateUserValueType, update_type);
Expand All @@ -116,7 +174,16 @@ void ConsensusFrontier::Update(
// Reset filter after compaction.
hybrid_time_filter_ = HybridTime();
UpdateField(&max_value_level_ttl_expiration_time_,
rhs.max_value_level_ttl_expiration_time_, update_type);
rhs.max_value_level_ttl_expiration_time_, update_type);
UpdateField(&primary_schema_version_, rhs.primary_schema_version_, update_type);
for (const auto& p : rhs.cotable_schema_versions_) {
auto it = cotable_schema_versions_.find(p.first);
if (it == cotable_schema_versions_.end()) {
cotable_schema_versions_.emplace(p);
} else {
UpdateField(&it->second, p.second, update_type);
}
}
}

Slice ConsensusFrontier::Filter() const {
Expand All @@ -136,5 +203,28 @@ bool ConsensusFrontier::IsUpdateValid(
IsUpdateValidForField(hybrid_time_, rhs.hybrid_time_, update_type);
}

// Records the schema version for a table in this frontier. A nil table id is stored as the
// primary schema version; any other id is stored (overwriting a previous entry) in the
// per-cotable map.
void ConsensusFrontier::AddSchemaVersion(const Uuid& table_id, SchemaVersion version) {
  if (!table_id.IsNil()) {
    cotable_schema_versions_[table_id] = version;
    return;
  }
  primary_schema_version_ = version;
}

// Drops every schema version tracked by this frontier: all cotable entries and the
// primary schema version.
void ConsensusFrontier::ResetSchemaVersion() {
  cotable_schema_versions_.clear();
  primary_schema_version_.reset();
}

// Feeds every schema version held by this frontier into the caller-supplied map via
// yb::MakeAtMost. The primary schema version, when present, is keyed by Uuid::Nil();
// cotable versions are keyed by their own table id.
// NOTE(review): yb::MakeAtMost(key, value, map) is defined elsewhere; presumably it lowers
// the map entry for `key` to at most `value` -- confirm against its definition.
void ConsensusFrontier::MakeExternalSchemaVersionsAtMost(
    std::unordered_map<Uuid, SchemaVersion, UuidHash>* min_schema_versions) const {
  if (primary_schema_version_) {
    const auto& primary_version = *primary_schema_version_;
    yb::MakeAtMost(Uuid::Nil(), primary_version, min_schema_versions);
  }
  for (const auto& [cotable_id, cotable_version] : cotable_schema_versions_) {
    yb::MakeAtMost(cotable_id, cotable_version, min_schema_versions);
  }
}

} // namespace docdb
} // namespace yb
Loading

0 comments on commit fbd3bac

Please sign in to comment.