From 349bda8008940fd6a1c82ad077c9c1b5b00b949e Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Mon, 11 Nov 2024 08:28:53 -0800 Subject: [PATCH] kafka/txn_reader: proper cleanup in finally call finally on the underlyig reader for a proper cleanup, else bad things can happen on log reader's closure. --- src/v/kafka/utils/tests/txn_reader_test.cc | 55 ++++++++++++++++++++++ src/v/kafka/utils/txn_reader.cc | 4 ++ src/v/kafka/utils/txn_reader.h | 2 + 3 files changed, 61 insertions(+) diff --git a/src/v/kafka/utils/tests/txn_reader_test.cc b/src/v/kafka/utils/tests/txn_reader_test.cc index f70baf87cc80..f736e96bcb95 100644 --- a/src/v/kafka/utils/tests/txn_reader_test.cc +++ b/src/v/kafka/utils/tests/txn_reader_test.cc @@ -20,6 +20,7 @@ #include "model/timeout_clock.h" #include "random/generators.h" #include "storage/record_batch_builder.h" +#include "test_utils/test.h" #include #include @@ -367,6 +368,60 @@ TEST_F(TransactionReaderTest, OutsideRange) { } } +TEST_F_CORO(seastar_test, UncleanDestroy) { + // A simple reader that ensures resources are cleaned up before + // destroying + class asserting_reader final : public model::record_batch_reader::impl { + public: + asserting_reader() + : _resources_initalized(true) {} + + ~asserting_reader() { + vassert( + !_resources_initalized, + "Closing reader while resources are still initialized"); + } + + bool is_end_of_stream() const final { return false; }; + + ss::future + do_load_slice(model::timeout_clock::time_point) final { + auto result = model::record_batch_reader::data_t{}; + result.push_back( + model::test::make_random_batch(model::test::record_batch_spec{})); + return ss::make_ready_future( + std::move(result)); + } + + void print(std::ostream&) final {} + + ss::future<> finally() noexcept final { + _resources_initalized = false; + return ss::now(); + } + + private: + bool _resources_initalized = false; + }; + + struct throwing_consumer { + ss::future operator()(model::record_batch) { + return ss::make_exception_future( + std::runtime_error("Injected failure")); + } + void end_of_stream() {} + }; + + auto reader = model::make_record_batch_reader( + nullptr, + model::record_batch_reader(std::make_unique())); + using namespace std::chrono_literals; + EXPECT_THROW( + co_await std::move(reader).consume( + throwing_consumer(), model::timeout_clock::now() + 5s), + std::runtime_error); +} + class RandomizedTransactionReaderTest : public TransactionReaderTest , public testing::WithParamInterface {}; diff --git a/src/v/kafka/utils/txn_reader.cc b/src/v/kafka/utils/txn_reader.cc index 4864bb229d49..6d80c0b8a9e0 100644 --- a/src/v/kafka/utils/txn_reader.cc +++ b/src/v/kafka/utils/txn_reader.cc @@ -195,6 +195,10 @@ bool read_committed_reader::is_end_of_stream() const { return _underlying->is_end_of_stream(); } +ss::future<> read_committed_reader::finally() noexcept { + return _underlying->finally(); +} + ss::future read_committed_reader::do_load_slice( model::timeout_clock::time_point deadline) { diff --git a/src/v/kafka/utils/txn_reader.h b/src/v/kafka/utils/txn_reader.h index e73ecff7d1e3..51925ae656ce 100644 --- a/src/v/kafka/utils/txn_reader.h +++ b/src/v/kafka/utils/txn_reader.h @@ -73,6 +73,8 @@ class read_committed_reader : public model::record_batch_reader::impl { void print(std::ostream& os) override; + ss::future<> finally() noexcept final; + private: std::unique_ptr _tracker; std::unique_ptr _underlying;