Skip to content

Commit

Permalink
[#21580]: docdb: Revert "[#21877] docdb: Revert "[#21580] docdb: Filt…
Browse files Browse the repository at this point in the history
…er intent SST files only retained for CDC""

Summary:
This reverts commit D34245 / 89316bd, which reverted
D33131 / fb7c86c due to a segmentation fault introduced due to
`min_running_ht` being initialized too early; this issue is now fixed with
D34389 / 138b81a.
Jira: DB-10466, DB-10780

Test Plan: Jenkins

Reviewers: yyan, sergei

Reviewed By: yyan

Subscribers: rthallam, ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34745
  • Loading branch information
es1024 committed May 6, 2024
1 parent bf17bb5 commit dbf690f
Show file tree
Hide file tree
Showing 16 changed files with 114 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/yb/ash/wait_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ DEFINE_RUNTIME_PG_PREVIEW_FLAG(bool, yb_enable_ash, false,
"and various background activities. This does nothing if "
"ysql_yb_enable_ash_infra is disabled.");

DEFINE_test_flag(bool, export_wait_state_names, yb::IsDebug(),
DEFINE_test_flag(bool, export_wait_state_names, yb::kIsDebug,
"Exports wait-state name as a human understandable string.");
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::IsDebug(),
DEFINE_test_flag(bool, trace_ash_wait_code_updates, yb::kIsDebug,
"Add a trace line whenever the wait state code is updated.");
DEFINE_test_flag(uint32, yb_ash_sleep_at_wait_state_ms, 0,
"How long to sleep/delay when entering a particular wait state.");
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/ql-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include "yb/tserver/ts_tablet_manager.h"

#include "yb/util/backoff_waiter.h"
#include "yb/util/debug-util.h"
#include "yb/util/debug.h"
#include "yb/util/format.h"
#include "yb/util/metrics.h"
#include "yb/util/random_util.h"
Expand Down Expand Up @@ -830,7 +830,7 @@ void QLStressTest::AddWriter(
}

void QLStressTest::TestWriteRejection() {
constexpr int kWriters = IsDebug() ? 10 : 20;
constexpr int kWriters = kIsDebug ? 10 : 20;
constexpr int kKeyBase = 10000;

std::array<std::atomic<int>, kWriters> keys;
Expand Down
21 changes: 10 additions & 11 deletions src/yb/docdb/conflict_resolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {

// Reads conflicts for specified intent from DB.
Status ReadIntentConflicts(IntentTypeSet type, KeyBytes* intent_key_prefix) {
EnsureIntentIteratorCreated();
if (!CreateIntentIteratorIfNecessary()) {
return Status::OK();
}

const auto conflicting_intent_types = kIntentTypeSetConflicts[type.ToUIntPtr()];

Expand Down Expand Up @@ -303,17 +305,12 @@ class ConflictResolver : public std::enable_shared_from_this<ConflictResolver> {
return intent_iter_.status();
}

void EnsureIntentIteratorCreated() {
bool CreateIntentIteratorIfNecessary() {
if (!intent_iter_.Initialized()) {
intent_iter_ = CreateRocksDBIterator(
doc_db_.intents,
doc_db_.key_bounds,
BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none /* user_key_for_filter */,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&intent_key_upperbound_);
intent_iter_ = CreateIntentsIteratorWithHybridTimeFilter(
doc_db_.intents, &status_manager(), doc_db_.key_bounds, &intent_key_upperbound_);
}
return intent_iter_.Initialized();
}

Result<IntentTypesContainer> GetLockStatusInfo() {
Expand Down Expand Up @@ -1147,7 +1144,9 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase {
// This is to prevent the case when we create an iterator on the regular DB where a
// provisional record has not yet been applied, and then create an iterator the intents
// DB where the provisional record has already been removed.
resolver->EnsureIntentIteratorCreated();
// Even in the case where there are no intents to iterate over, the following loop must be
// run, so we cannot return early if the following call returns false.
resolver->CreateIntentIteratorIfNecessary();

for (const auto& i : container) {
const Slice intent_key = i.first.AsSlice();
Expand Down
12 changes: 12 additions & 0 deletions src/yb/docdb/doc_ql_filefilter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@

#include "yb/rocksdb/db/compaction.h"

#include "yb/util/debug.h"

DEFINE_RUNTIME_bool(docdb_ht_filter_intents, yb::kIsDebug,
"Use hybrid time SST filter when scanning intents.");

namespace yb::docdb {

rocksdb::UserBoundaryTag TagForRangeComponent(size_t index);
Expand Down Expand Up @@ -147,4 +152,11 @@ std::shared_ptr<rocksdb::ReadFileFilter> CreateHybridTimeFileFilter(HybridTime m
return std::make_shared<HybridTimeFileFilter>(min_hybrid_time);
}

std::shared_ptr<rocksdb::ReadFileFilter> CreateIntentHybridTimeFileFilter(
HybridTime min_running_ht) {
return GetAtomicFlag(&FLAGS_docdb_ht_filter_intents) && min_running_ht != HybridTime::kMin
? std::make_shared<HybridTimeFileFilter>(min_running_ht)
: nullptr;
}

} // namespace yb::docdb
5 changes: 5 additions & 0 deletions src/yb/docdb/doc_ql_filefilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "yb/common/hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/qlexpr/qlexpr_fwd.h"

Expand All @@ -25,5 +26,9 @@ namespace yb::docdb {

std::shared_ptr<rocksdb::ReadFileFilter> CreateFileFilter(const qlexpr::YQLScanSpec& scan_spec);
std::shared_ptr<rocksdb::ReadFileFilter> CreateHybridTimeFileFilter(HybridTime min_hybrid_Time);
// Create a file filter for intentsdb using the given min running hybrid time. Filtering is done
// based on intent hybrid time stored in the intent key, not commit time of the transaction.
std::shared_ptr<rocksdb::ReadFileFilter> CreateIntentHybridTimeFileFilter(
HybridTime min_running_ht);

} // namespace yb::docdb
22 changes: 22 additions & 0 deletions src/yb/docdb/docdb_rocksdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,28 @@ unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
statistics ? statistics->IntentsDBStatistics() : nullptr);
}

BoundedRocksDbIterator CreateIntentsIteratorWithHybridTimeFilter(
rocksdb::DB* intentsdb,
const TransactionStatusManager* status_manager,
const KeyBounds* docdb_key_bounds,
const Slice* iterate_upper_bound,
rocksdb::Statistics* statistics) {
auto min_running_ht = status_manager->MinRunningHybridTime();
if (min_running_ht == HybridTime::kMax) {
VLOG(4) << "No transactions running";
return {};
}
return CreateRocksDBIterator(
intentsdb,
docdb_key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none /* user_key_for_filter */,
rocksdb::kDefaultQueryId,
CreateIntentHybridTimeFileFilter(min_running_ht),
iterate_upper_bound,
statistics);
}

namespace {

std::mutex rocksdb_flags_mutex;
Expand Down
7 changes: 7 additions & 0 deletions src/yb/docdb/docdb_rocksdb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ std::unique_ptr<IntentAwareIterator> CreateIntentAwareIterator(
const Slice* iterate_upper_bound = nullptr,
const DocDBStatistics* statistics = nullptr);

BoundedRocksDbIterator CreateIntentsIteratorWithHybridTimeFilter(
rocksdb::DB* intentsdb,
const TransactionStatusManager* status_manager,
const KeyBounds* docdb_key_bounds,
const Slice* iterate_upper_bound = nullptr,
rocksdb::Statistics* statistics = nullptr);

std::shared_ptr<rocksdb::RocksDBPriorityThreadPoolMetrics> CreateRocksDBPriorityThreadPoolMetrics(
scoped_refptr<yb::MetricEntity> entity);

Expand Down
16 changes: 4 additions & 12 deletions src/yb/docdb/intent_aware_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "yb/common/hybrid_time.h"
#include "yb/common/transaction.h"

#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb_fwd.h"
#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/docdb-internal.h"
Expand Down Expand Up @@ -139,18 +140,9 @@ IntentAwareIterator::IntentAwareIterator(
<< ", txn_op_context: " << txn_op_context_;

if (txn_op_context) {
if (txn_op_context.txn_status_manager->MinRunningHybridTime() != HybridTime::kMax) {
intent_iter_ = docdb::CreateRocksDBIterator(doc_db.intents,
doc_db.key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&intent_upperbound_,
intentsdb_statistics);
} else {
VLOG(4) << "No transactions running";
}
intent_iter_ = docdb::CreateIntentsIteratorWithHybridTimeFilter(
doc_db.intents, txn_op_context.txn_status_manager, doc_db.key_bounds, &intent_upperbound_,
intentsdb_statistics);
}
// WARNING: Is is important for regular DB iterator to be created after intents DB iterator,
// otherwise consistency could break, for example in following scenario:
Expand Down
15 changes: 3 additions & 12 deletions src/yb/docdb/intent_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "yb/docdb/intent_iterator.h"

#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb-internal.h"
#include "yb/docdb/docdb_rocksdb_util.h"
#include "yb/docdb/iter_util.h"
Expand Down Expand Up @@ -74,18 +75,8 @@ IntentIterator::IntentIterator(
VLOG(4) << "IntentIterator, read_time: " << read_time << ", txn_op_context: " << txn_op_context_;

if (txn_op_context) {
if (txn_op_context.txn_status_manager->MinRunningHybridTime() != HybridTime::kMax) {
intent_iter_ = docdb::CreateRocksDBIterator(
intents_db,
docdb_key_bounds,
docdb::BloomFilterMode::DONT_USE_BLOOM_FILTER,
boost::none,
rocksdb::kDefaultQueryId,
nullptr /* file_filter */,
&upperbound_);
} else {
VLOG(4) << "No transactions running";
}
intent_iter_ = docdb::CreateIntentsIteratorWithHybridTimeFilter(
intents_db, txn_op_context.txn_status_manager, docdb_key_bounds, &upperbound_);
}
VTRACE(2, "Created intent iterator - initialized? - $0", intent_iter_.Initialized());
}
Expand Down
10 changes: 8 additions & 2 deletions src/yb/docdb/rocksdb_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "yb/common/row_mark.h"

#include "yb/docdb/conflict_resolution.h"
#include "yb/docdb/doc_ql_filefilter.h"
#include "yb/docdb/docdb.messages.h"
#include "yb/docdb/docdb_compaction_context.h"
#include "yb/docdb/docdb_rocksdb_util.h"
Expand Down Expand Up @@ -437,15 +438,18 @@ IntentsWriterContext::IntentsWriterContext(const TransactionId& transaction_id)
}

IntentsWriter::IntentsWriter(const Slice& start_key,
HybridTime file_filter_ht,
rocksdb::DB* intents_db,
IntentsWriterContext* context)
: start_key_(start_key), intents_db_(intents_db), context_(*context) {
AppendTransactionKeyPrefix(context_.transaction_id(), &txn_reverse_index_prefix_);
txn_reverse_index_prefix_.AppendKeyEntryType(dockv::KeyEntryType::kMaxByte);
reverse_index_upperbound_ = txn_reverse_index_prefix_.AsSlice();

reverse_index_iter_ = CreateRocksDBIterator(
intents_db_, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound_);
rocksdb::kDefaultQueryId, CreateIntentHybridTimeFileFilter(file_filter_ht),
&reverse_index_upperbound_);
}

Status IntentsWriter::Apply(rocksdb::DirectWriteHandler* handler) {
Expand Down Expand Up @@ -502,6 +506,7 @@ ApplyIntentsContext::ApplyIntentsContext(
const SubtxnSet& aborted,
HybridTime commit_ht,
HybridTime log_ht,
HybridTime file_filter_ht,
const KeyBounds* key_bounds,
SchemaPackingProvider* schema_packing_provider,
rocksdb::DB* intents_db)
Expand All @@ -519,7 +524,8 @@ ApplyIntentsContext::ApplyIntentsContext(
key_bounds_(key_bounds),
intent_iter_(CreateRocksDBIterator(
intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId)) {
rocksdb::kDefaultQueryId,
CreateIntentHybridTimeFileFilter(file_filter_ht))) {
}

Result<bool> ApplyIntentsContext::StoreApplyState(
Expand Down
2 changes: 2 additions & 0 deletions src/yb/docdb/rocksdb_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ class IntentsWriterContext {
class IntentsWriter : public rocksdb::DirectWriter {
public:
IntentsWriter(const Slice& start_key,
HybridTime file_filter_ht,
rocksdb::DB* intents_db,
IntentsWriterContext* context);

Expand Down Expand Up @@ -219,6 +220,7 @@ class ApplyIntentsContext : public IntentsWriterContext, public FrontierSchemaVe
const SubtxnSet& aborted,
HybridTime commit_ht,
HybridTime log_ht,
HybridTime file_filter_ht,
const KeyBounds* key_bounds,
SchemaPackingProvider* schema_packing_provider,
rocksdb::DB* intents_db);
Expand Down
11 changes: 8 additions & 3 deletions src/yb/tablet/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2067,6 +2067,8 @@ Status Tablet::ImportData(const std::string& source_dir) {
Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApplyData& data) {
VLOG_WITH_PREFIX(4) << __func__ << ": " << data.transaction_id;

HybridTime min_running_ht = transaction_participant_->MinRunningHybridTime();

// This flag enables tests to induce a situation where a transaction has committed but its intents
// haven't yet moved to regular db for a sufficiently long period. For example, it can help a test
// to reliably assert that conflict resolution/ concurrency control with a conflicting committed
Expand All @@ -2075,9 +2077,10 @@ Result<docdb::ApplyTransactionState> Tablet::ApplyIntents(const TransactionApply
AtomicFlagSleepMs(&FLAGS_TEST_inject_sleep_before_applying_intents_ms);
docdb::ApplyIntentsContext context(
data.transaction_id, data.apply_state, data.aborted, data.commit_ht, data.log_ht,
&key_bounds_, metadata_.get(), intents_db_.get());
min_running_ht, &key_bounds_, metadata_.get(), intents_db_.get());
docdb::IntentsWriter intents_writer(
data.apply_state ? data.apply_state->key : Slice(), intents_db_.get(), &context);
data.apply_state ? data.apply_state->key : Slice(), min_running_ht,
intents_db_.get(), &context);
rocksdb::WriteBatch regular_write_batch;
regular_write_batch.SetDirectWriter(&intents_writer);
// data.hybrid_time contains transaction commit time.
Expand All @@ -2099,12 +2102,14 @@ Status Tablet::RemoveIntentsImpl(
RETURN_NOT_OK(scoped_read_operation);

rocksdb::WriteBatch intents_write_batch;
HybridTime min_running_ht = CHECK_NOTNULL(transaction_participant_)->MinRunningHybridTime();
for (const auto& id : ids) {
boost::optional<docdb::ApplyTransactionState> apply_state;
for (;;) {
docdb::RemoveIntentsContext context(id, static_cast<uint8_t>(reason));
docdb::IntentsWriter writer(
apply_state ? apply_state->key : Slice(), intents_db_.get(), &context);
apply_state ? apply_state->key : Slice(), min_running_ht,
intents_db_.get(), &context);
intents_write_batch.SetDirectWriter(&writer);
docdb::ConsensusFrontiers frontiers;
auto frontiers_ptr = InitFrontiers(data, &frontiers);
Expand Down
6 changes: 3 additions & 3 deletions src/yb/tablet/transaction_participant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#include "yb/util/async_util.h"
#include "yb/util/callsite_profiling.h"
#include "yb/util/countdown_latch.h"
#include "yb/util/debug-util.h"
#include "yb/util/debug.h"
#include "yb/util/flags.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -105,11 +105,11 @@ DEFINE_NON_RUNTIME_int32(wait_queue_poll_interval_ms, 100,
"active blockers.");

// TODO: this should be turned into an autoflag.
DEFINE_RUNTIME_bool(cdc_write_post_apply_metadata, yb::IsDebug(),
DEFINE_RUNTIME_bool(cdc_write_post_apply_metadata, yb::kIsDebug,
"Write post-apply transaction metadata to intentsdb for transaction that have been applied but "
" have not yet been streamed by CDC.");

DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, yb::IsDebug(),
DEFINE_RUNTIME_bool(cdc_immediate_transaction_cleanup, yb::kIsDebug,
"Clean up transactions from memory after apply, even if its changes have not yet been "
"streamed by CDC.");

Expand Down
4 changes: 2 additions & 2 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
#include "yb/tserver/tserver_service.pb.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/debug-util.h"
#include "yb/util/debug.h"
#include "yb/util/flags.h"
#include "yb/util/flags/flag_tags.h"
#include "yb/util/logging.h"
Expand All @@ -86,7 +86,7 @@ using namespace std::literals;
DEFINE_UNKNOWN_uint64(pg_client_session_expiration_ms, 60000,
"Pg client session expiration time in milliseconds.");

DEFINE_RUNTIME_bool(pg_client_use_shared_memory, yb::IsDebug(),
DEFINE_RUNTIME_bool(pg_client_use_shared_memory, yb::kIsDebug,
"Use shared memory for executing read and write pg client queries");

DEFINE_RUNTIME_int32(get_locks_status_max_retry_attempts, 2,
Expand Down
9 changes: 1 addition & 8 deletions src/yb/util/debug-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "yb/gutil/strings/fastmem.h"

#include "yb/util/debug.h"
#include "yb/util/enums.h"
#include "yb/util/slice.h"
#include "yb/util/stack_trace.h"
Expand Down Expand Up @@ -103,14 +104,6 @@ std::string GetLogFormatStackTraceHex();
// may invoke the dynamic loader.
void HexStackTraceToString(char* buf, size_t size);

constexpr bool IsDebug() {
#ifdef NDEBUG
return false;
#else
return true;
#endif
}

class NODISCARD_CLASS ScopeLogger {
public:
ScopeLogger(const std::string& msg, std::function<void()> on_scope_bounds);
Expand Down
Loading

0 comments on commit dbf690f

Please sign in to comment.