[#3520] DocDB: Fix PITR in conjunction with packed rows
Summary:
This diff fixes the following issues related to PITR and packed rows.
1) Support reading the pg catalog version from a packed row.
2) Ignore tombstoned column updates during reads and compactions of pgsql tables.
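
The first fix amounts to decoding a single column from a packed row value instead of expecting a separate per-column DocDB entry. A minimal sketch of that decode path, condensed from ProcessCatalogVersionEntry in restore_sys_catalog_state.cc below (error messages shortened; not the exact helper in the tree):

```cpp
// Sketch: extract one column's value from a packed row entry.
Result<Slice> ReadPackedColumn(
    const docdb::SchemaPackingStorage& storage, ColumnId column_id, Slice value) {
  // Skip the control fields (TTL, user timestamp, ...) preceding the value itself.
  RETURN_NOT_OK(docdb::ValueControlFields::Decode(&value));
  SCHECK(value.TryConsumeByte(docdb::ValueEntryTypeAsChar::kPackedRow),
         Corruption, "Packed row expected");
  // GetPacking consumes the packing version prefix and selects the column layout.
  const docdb::SchemaPacking& packing = VERIFY_RESULT(storage.GetPacking(&value));
  auto column_value = packing.GetValue(column_id, value);
  SCHECK(column_value, Corruption, "Column missing from packed row");
  return *column_value;
}
```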

When patching the pg catalog version, we pick the max version from all entries present in the existing state, add 1 to it, and add the result to the write batch after all other entries. The following cases are possible (a condensed code sketch follows the three cases):

(1)
Restoring state:
`doc_key => (restoring_version1, last_breaking_version1)`
Existing state:
`doc_key => (existing_version1, last_breaking_version2)`
Here we just pick existing_version1 as the max found version.

(2)
Restoring state:
`doc_key => (restoring_version1, last_breaking_version1)`
`doc_key, column_id("current_version") => restoring_version2`
Existing state:
`doc_key => (existing_version1, last_breaking_version2)`
The same as above, except that an entry for restoring_version2 is also added, but with a lower write id, since we put the patched pg catalog version after the iteration.

(3)
Restoring state:
`doc_key => (restoring_version1, last_breaking_version1)`
Existing state:
`doc_key => (existing_version1, last_breaking_version2)`
`doc_key, column_id("current_version") => existing_version2`
The max of existing_version1 and existing_version2 will be used.
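
All three cases reduce to one rule: every current_version value visible in the existing state, whether inside a packed row or in an explicit column entry, feeds a running maximum, and max + 1 is written after everything else so it carries the highest write id. Condensed from HandleSchemaVersionValue and Finish in PgCatalogRestorePatch below (DocPath bookkeeping elided):

```cpp
// Every current_version value found in the existing state routes through here.
Status HandleSchemaVersionValue(int64_t existing_version) {
  catalog_version_ = std::max(catalog_version_, existing_version + 1);
  return Status::OK();
}

// Called after iteration, so the patched version lands after all other entries.
Status Finish() {
  if (!catalog_version_doc_path_) {
    return Status::OK();  // No current_version entry was seen.
  }
  QLValuePB value_pb;
  value_pb.set_int64_value(catalog_version_);
  return DocBatch()->SetPrimitive(
      *catalog_version_doc_path_, docdb::ValueRef(value_pb, SortingType::kNotSpecified));
}
```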

Also updated FetchState to handle packed rows by maintaining a key-value stack.
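
FetchState lives in a file not shown below; the key idea is that a packed row entry covers every column of its row, so the iteration must remember which enclosing entries cover the current key. A hypothetical sketch of such a stack (names invented for illustration, not the actual FetchState members):

```cpp
// Hypothetical sketch: pop entries whose key is not a prefix of the current key,
// so the top of the stack is always the closest enclosing entry (e.g. the packed
// row that covers a column update seen later in the iteration).
struct StackEntry {
  docdb::KeyBytes key;       // Encoded key prefix of the entry.
  DocHybridTime write_time;  // Write time of that entry.
};

void UpdateStack(std::vector<StackEntry>* stack, Slice key, DocHybridTime write_time) {
  while (!stack->empty() && !key.starts_with(stack->back().key.AsSlice())) {
    stack->pop_back();  // Left the subtree this entry covered.
  }
  stack->push_back(StackEntry{docdb::KeyBytes(key), write_time});
}
```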

Test Plan: YbAdminSnapshotScheduleTest.PgsqlDropDefaultWithPackedRow

Reviewers: mbautin, skedia

Reviewed By: mbautin, skedia

Subscribers: zdrudi, timur, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D17834
spolitov committed Jul 15, 2022
1 parent 49e70b4 commit 79b1c13
Showing 14 changed files with 356 additions and 206 deletions.
186 changes: 127 additions & 59 deletions ent/src/yb/master/restore_sys_catalog_state.cc
@@ -149,6 +149,120 @@ Status ValidateSysCatalogTables(
  return Status::OK();
}

struct PgCatalogTableData {
  std::array<uint8_t, kUuidSize + 1> prefix;
  const TableName* name;
  uint32_t pg_table_oid;

  Status SetTableId(const TableId& table_id) {
    Uuid cotable_id = VERIFY_RESULT(Uuid::FromHexString(table_id));
    prefix[0] = docdb::KeyEntryTypeAsChar::kTableId;
    cotable_id.EncodeToComparable(&prefix[1]);
    pg_table_oid = VERIFY_RESULT(GetPgsqlTableOid(table_id));
    return Status::OK();
  }

  bool IsPgYbCatalogMeta() const {
    // We reset name to nullptr for pg_yb_catalog_meta table.
    return name == nullptr;
  }
};

class PgCatalogRestorePatch : public RestorePatch {
 public:
  PgCatalogRestorePatch(
      FetchState* existing_state, FetchState* restoring_state,
      docdb::DocWriteBatch* doc_batch, const PgCatalogTableData& table,
      tablet::TableInfo* pg_yb_catalog_meta)
      : RestorePatch(existing_state, restoring_state, doc_batch),
        table_(table), pg_yb_catalog_meta_(pg_yb_catalog_meta) {}

  Status Finish() {
    if (!catalog_version_doc_path_) {
      return Status::OK();
    }
    QLValuePB value_pb;
    value_pb.set_int64_value(catalog_version_);
    LOG(INFO) << "PITR: Incrementing pg_yb_catalog version to " << catalog_version_;
    return DocBatch()->SetPrimitive(
        *catalog_version_doc_path_, docdb::ValueRef(value_pb, SortingType::kNotSpecified));
  }

 private:
  Status ProcessCommonEntry(
      const Slice& key, const Slice& existing_value, const Slice& restoring_value) override {
    if (!table_.IsPgYbCatalogMeta()) {
      return RestorePatch::ProcessCommonEntry(key, existing_value, restoring_value);
    }
    return ProcessCatalogVersionEntry(key, existing_value);
  }

  Status ProcessExistingOnlyEntry(
      const Slice& existing_key, const Slice& existing_value) override {
    if (!table_.IsPgYbCatalogMeta()) {
      return RestorePatch::ProcessExistingOnlyEntry(existing_key, existing_value);
    }
    return ProcessCatalogVersionEntry(existing_key, existing_value);
  }

  Result<bool> ShouldSkipEntry(const Slice& key, const Slice& value) override {
    return false;
  }

  Status HandleSchemaVersionValue(
      const docdb::DocKey& doc_key, ColumnId column_id, const Slice& existing_value) {
    docdb::Value value;
    RETURN_NOT_OK(value.Decode(existing_value));
    auto new_version = value.primitive_value().GetInt64() + 1;
    if (!catalog_version_doc_path_ || catalog_version_ < new_version) {
      catalog_version_doc_path_.emplace(
          doc_key.Encode(), docdb::KeyEntryValue::MakeColumnId(column_id));
      catalog_version_ = new_version;
    }
    return Status::OK();
  }

  Status ProcessCatalogVersionEntry(const Slice& key, const Slice& full_value) {
    docdb::SubDocKey sub_doc_key;
    RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom(key, docdb::HybridTimeRequired::kFalse));
    if (sub_doc_key.subkeys().empty()) {
      auto value_slice = full_value;
      RETURN_NOT_OK(docdb::ValueControlFields::Decode(&value_slice));
      SCHECK(value_slice.TryConsumeByte(docdb::ValueEntryTypeAsChar::kPackedRow),
             Corruption, "Packed row expected: $0", full_value.ToDebugHexString());
      const docdb::SchemaPacking& packing = VERIFY_RESULT(
          pg_yb_catalog_meta_->doc_read_context->schema_packing_storage.GetPacking(&value_slice));
      auto column_id = VERIFY_RESULT(
          pg_yb_catalog_meta_->schema().ColumnIdByName(kCurrentVersionColumnName));
      auto value = packing.GetValue(column_id, value_slice);
      if (!value) {
        return STATUS_FORMAT(
            Corruption, "$0 missing in $1", kCurrentVersionColumnName,
            full_value.ToDebugHexString());
      }
      return HandleSchemaVersionValue(sub_doc_key.doc_key(), column_id, *value);
    }
    SCHECK_EQ(sub_doc_key.subkeys().size(), 1U, Corruption, "Wrong number of subdoc keys");
    const auto& first_subkey = sub_doc_key.subkeys()[0];
    if (first_subkey.type() == docdb::KeyEntryType::kColumnId) {
      auto column_id = first_subkey.GetColumnId();
      const ColumnSchema& column = VERIFY_RESULT(pg_yb_catalog_meta_->schema().column_by_id(
          column_id));
      if (column.name() == kCurrentVersionColumnName) {
        return HandleSchemaVersionValue(sub_doc_key.doc_key(), column_id, full_value);
      }
    }
    return Status::OK();
  }

  // Should be alive while this object is alive.
  const PgCatalogTableData& table_;
  // Should be alive while this object is alive.
  tablet::TableInfo* pg_yb_catalog_meta_;
  std::optional<docdb::DocPath> catalog_version_doc_path_;
  int64_t catalog_version_;
};

} // namespace

RestoreSysCatalogState::RestoreSysCatalogState(SnapshotScheduleRestoration* restoration)
@@ -625,20 +739,6 @@ void RestoreSysCatalogState::WriteToRocksDB(
  yb::WriteToRocksDB(write_batch, write_time, op_id, tablet, restore_kv);
}

struct PgCatalogTableData {
  std::array<uint8_t, kUuidSize + 1> prefix;
  const TableName* name;
  uint32_t pg_table_oid;

  Status SetTableId(const TableId& table_id) {
    Uuid cotable_id = VERIFY_RESULT(Uuid::FromHexString(table_id));
    prefix[0] = docdb::KeyEntryTypeAsChar::kTableId;
    cotable_id.EncodeToComparable(&prefix[1]);
    pg_table_oid = VERIFY_RESULT(GetPgsqlTableOid(table_id));
    return Status::OK();
  }
};

Status RestoreSysCatalogState::IncrementLegacyCatalogVersion(
    const docdb::DocReadContext& doc_read_context, const docdb::DocDB& doc_db,
    docdb::DocWriteBatch* write_batch) {
@@ -728,17 +828,22 @@ Status RestoreSysCatalogState::ProcessPgCatalogRestores(

  RETURN_NOT_OK(restore_patch.PatchCurrentStateFromRestoringState());

  RETURN_NOT_OK(restore_patch.Finish());

  size_t total_changes = restore_patch.TotalTickerCount();

  if (total_changes != 0 || VLOG_IS_ON(3)) {
    LOG(INFO) << "PITR: Pg system table: " << AsString(table.name) << ", "
              << restore_patch.TickersToString();
  }
  if (table.pg_table_oid == kPgYbMigrationTableOid && total_changes != 0) {
    LOG(INFO) << "PITR: YSQL upgrade was performed since the restore time"
              << ", total changes: " << total_changes;
  LOG_IF(INFO, total_changes || VLOG_IS_ON(3))
      << "PITR: Pg system table " << AsString(table.name) << ": "
      << restore_patch.TickersToString();

  // During migration we insert a new row to migration table, so it is enough to check number of
  // rows in this table to understand whether it was modified or not.
  if (table.pg_table_oid == kPgYbMigrationTableOid &&
      restoring_state.num_rows() != existing_state.num_rows()) {
    LOG(INFO) << "PITR: YSQL upgrade was performed since the restore time, restoring rows: "
              << restoring_state.num_rows() << ", existing rows: " << existing_state.num_rows();
    return STATUS(
        NotSupported, "Unable to restore as YSQL upgrade was performed since the restore time.");
        NotSupported, "Unable to restore as YSQL upgrade was performed since the restore time");
  }
}

@@ -756,42 +861,5 @@ bool RestoreSysCatalogState::IsYsqlRestoration() {
  return false;
}

Status PgCatalogRestorePatch::ProcessEqualEntries(
    const Slice& existing_key, const Slice& existing_value,
    const Slice& restoring_key, const Slice& restoring_value) {
  if (table_.name != nullptr) {
    if (restoring_value.compare(existing_value)) {
      IncrementTicker(RestoreTicker::kUpdates);
      AddKeyValue(restoring_key, restoring_value, DocBatch());
    }
  } else {
    docdb::SubDocKey sub_doc_key;
    RETURN_NOT_OK(sub_doc_key.FullyDecodeFrom(
        restoring_key, docdb::HybridTimeRequired::kFalse));
    SCHECK_EQ(sub_doc_key.subkeys().size(), 1U, Corruption, "Wrong number of subdoc keys");
    if (sub_doc_key.subkeys()[0].type() == docdb::KeyEntryType::kColumnId) {
      auto column_id = sub_doc_key.subkeys()[0].GetColumnId();
      const ColumnSchema& column = VERIFY_RESULT(pg_yb_catalog_meta_->schema().column_by_id(
          column_id));
      if (column.name() == "current_version") {
        docdb::Value value;
        RETURN_NOT_OK(value.Decode(existing_value));
        docdb::DocPath path(sub_doc_key.doc_key().Encode(), sub_doc_key.subkeys());
        QLValuePB value_pb;
        value_pb.set_int64_value(value.primitive_value().GetInt64() + 1);
        LOG(INFO) << "PITR: Incrementing pg_yb_catalog version to "
                  << value.primitive_value().GetInt64() + 1;
        RETURN_NOT_OK(DocBatch()->SetPrimitive(
            path, docdb::ValueRef(value_pb, SortingType::kNotSpecified)));
      }
    }
  }
  return Status::OK();
}

Result<bool> PgCatalogRestorePatch::ShouldSkipEntry(const Slice& key, const Slice& value) {
  return false;
}

} // namespace master
} // namespace yb
22 changes: 0 additions & 22 deletions ent/src/yb/master/restore_sys_catalog_state.h
@@ -32,8 +32,6 @@
namespace yb {
namespace master {

struct PgCatalogTableData;

// Utility class to restore sys catalog.
// Initially we load tables and tablets into it, then match schedule filter.
class RestoreSysCatalogState {
@@ -163,26 +161,6 @@ class RestoreSysCatalogState {
  RetainedExistingTables retained_existing_tables_;
};

class PgCatalogRestorePatch : public RestorePatch {
 public:
  PgCatalogRestorePatch(
      FetchState* existing_state, FetchState* restoring_state,
      docdb::DocWriteBatch* doc_batch, const PgCatalogTableData& table,
      tablet::TableInfo* pg_yb_catalog_meta)
      : RestorePatch(existing_state, restoring_state, doc_batch),
        table_(table), pg_yb_catalog_meta_(pg_yb_catalog_meta) {}

 private:
  Status ProcessEqualEntries(
      const Slice& existing_key, const Slice& existing_value,
      const Slice& restoring_key, const Slice& restoring_value) override;

  Result<bool> ShouldSkipEntry(const Slice& key, const Slice& value) override;

  const PgCatalogTableData& table_;
  tablet::TableInfo* pg_yb_catalog_meta_;
};

} // namespace master
} // namespace yb

45 changes: 29 additions & 16 deletions src/yb/docdb/doc_reader.cc
@@ -144,7 +144,8 @@ Result<boost::optional<SubDocument>> TEST_GetSubDocument(
      iter->read_time().ToString());
  iter->SeekToLastDocKey();
  SchemaPackingStorage schema_packing_storage;
  DocDBTableReader doc_reader(iter.get(), deadline, projection, schema_packing_storage);
  DocDBTableReader doc_reader(
      iter.get(), deadline, projection, TableType::YQL_TABLE_TYPE, schema_packing_storage);
  RETURN_NOT_OK(doc_reader.UpdateTableTombstoneTime(sub_doc_key));

  iter->Seek(sub_doc_key);
@@ -158,10 +159,12 @@
DocDBTableReader::DocDBTableReader(
    IntentAwareIterator* iter, CoarseTimePoint deadline,
    const std::vector<KeyEntryValue>* projection,
    TableType table_type,
    std::reference_wrapper<const SchemaPackingStorage> schema_packing_storage)
    : iter_(iter),
      deadline_info_(deadline),
      projection_(projection),
      table_type_(table_type),
      schema_packing_storage_(schema_packing_storage) {
  if (projection_) {
    auto projection_size = projection_->size();
@@ -355,9 +358,8 @@ class DocDBTableReader::GetHelper {
      }
    }

    if (!packed_column_data_ || packed_column_data_.row->doc_ht < key_result.write_time) {
      RETURN_NOT_OK(ProcessEntry(
          subkeys, reader_.iter_->value(), key_result.write_time, check_exist_only));
    if (VERIFY_RESULT(ProcessEntry(
            subkeys, reader_.iter_->value(), key_result.write_time, check_exist_only))) {
      packed_column_data_.row = nullptr;
    }
    if (check_exist_only && Found()) {
@@ -382,14 +384,15 @@
  }

  // Process DB entry.
  Status ProcessEntry(
  // Return true if entry value was accepted.
  Result<bool> ProcessEntry(
      Slice subkeys, Slice value_slice, const DocHybridTime& write_time,
      CheckExistOnly check_exist_only) {
    subkeys = CleanupState(subkeys);
    if (state_.back().write_time >= write_time) {
      VLOG_WITH_PREFIX_AND_FUNC(4)
          << "State: " << AsString(state_) << ", write_time: " << write_time;
      return Status::OK();
      return false;
    }
    auto control_fields = VERIFY_RESULT(ValueControlFields::Decode(&value_slice));
    RETURN_NOT_OK(AllocateNewStateEntries(
@@ -427,21 +430,31 @@
    return Status::OK();
  }

  Status ApplyEntryValue(
  // Return true if entry value was accepted.
  Result<bool> ApplyEntryValue(
      const Slice& value_slice, const ValueControlFields& control_fields,
      CheckExistOnly check_exist_only) {
    VLOG_WITH_FUNC(4)
    VLOG_WITH_PREFIX_AND_FUNC(4)
        << "State: " << AsString(state_) << ", value: " << value_slice.ToDebugHexString();
    auto& current = state_.back();
    if (!IsObsolete(current.expiration, reader_.iter_->read_time().read) &&
        VERIFY_RESULT(TryDecodeValue(
            reader_.iter_->read_time().read, control_fields, current.write_time.hybrid_time(),
            current.expiration, value_slice, current.out))) {
      last_found_ = column_index_;
    } else if (!check_exist_only && state_.size() > (reader_.projection_ ? 2 : 1)) {
    if (!IsObsolete(current.expiration, reader_.iter_->read_time().read)) {
      if (VERIFY_RESULT(TryDecodeValue(
              reader_.iter_->read_time().read, control_fields, current.write_time.hybrid_time(),
              current.expiration, value_slice, current.out))) {
        last_found_ = column_index_;
        return true;
      }
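      // For PGSQL tables a column update that fails to decode here (e.g. a tombstone)
      // is rejected, so the caller falls back to the packed-row value instead of
      // treating the column as deleted (issue 2 in the summary).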
      if (reader_.table_type_ == TableType::PGSQL_TABLE_TYPE) {
        return false;
      }
    }

    // When projection is specified we should always report projection columns, even when they are
    // nulls.
    if (!check_exist_only && state_.size() > (reader_.projection_ ? 2 : 1)) {
      state_[state_.size() - 2].out->DeleteChild(current.key_value);
    }
    return Status::OK();
    return true;
  }

  Result<bool> NextColumn() {
@@ -541,7 +554,7 @@ class DocDBTableReader::GetHelper {
      return PackedColumnData();
    }

    VLOG_WITH_PREFIX_AND_FUNC(4) << "Packed row: " << slice->ToDebugHexString();
    VLOG_WITH_PREFIX_AND_FUNC(4) << "Packed row " << column_id << ": " << slice->ToDebugHexString();
    return PackedColumnData {
        .row = &packed_row_data_,
        .encoded_value = slice->empty() ? NullSlice() : *slice,
3 changes: 3 additions & 0 deletions src/yb/docdb/doc_reader.h
@@ -20,6 +20,7 @@
#include "yb/docdb/docdb_fwd.h"
#include "yb/rocksdb/cache.h"

#include "yb/common/common_types.pb.h"
#include "yb/common/doc_hybrid_time.h"
#include "yb/common/read_hybrid_time.h"
#include "yb/common/transaction.h"
@@ -76,6 +77,7 @@ class DocDBTableReader {
  DocDBTableReader(
      IntentAwareIterator* iter, CoarseTimePoint deadline,
      const std::vector<KeyEntryValue>* projection,
      TableType table_type,
      std::reference_wrapper<const SchemaPackingStorage> schema_packing_storage);

  // Updates expiration/overwrite data based on table tombstone time. If the table is not a
Expand All @@ -102,6 +104,7 @@ class DocDBTableReader {
  IntentAwareIterator* iter_;
  DeadlineInfo deadline_info_;
  const std::vector<KeyEntryValue>* projection_;
  const TableType table_type_;
  const SchemaPackingStorage& schema_packing_storage_;

  std::vector<KeyBytes> encoded_projection_;