Skip to content

Commit

Permalink
kafka/txn_reader: proper cleanup in finally
Browse files Browse the repository at this point in the history
call finally on the underlyig reader for a proper cleanup, else bad
things can happen on log reader's closure.

(cherry picked from commit 349bda8)
  • Loading branch information
bharathv committed Nov 26, 2024
1 parent 8e3f8f2 commit c9e35e3
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 0 deletions.
55 changes: 55 additions & 0 deletions src/v/transform/tests/txn_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "model/timeout_clock.h"
#include "random/generators.h"
#include "storage/record_batch_builder.h"
#include "test_utils/test.h"
#include "transform/txn_reader.h"

#include <seastar/core/chunked_fifo.hh>
Expand Down Expand Up @@ -367,6 +368,60 @@ TEST_F(TransactionReaderTest, OutsideRange) {
}
}

TEST_F_CORO(seastar_test, UncleanDestroy) {
// A simple reader that ensures resources are cleaned up before
// destroying
class asserting_reader final : public model::record_batch_reader::impl {
public:
asserting_reader()
: _resources_initalized(true) {}

~asserting_reader() {
vassert(
!_resources_initalized,
"Closing reader while resources are still initialized");
}

bool is_end_of_stream() const final { return false; };

ss::future<model::record_batch_reader::storage_t>
do_load_slice(model::timeout_clock::time_point) final {
auto result = model::record_batch_reader::data_t{};
result.push_back(
model::test::make_random_batch(model::test::record_batch_spec{}));
return ss::make_ready_future<model::record_batch_reader::storage_t>(
std::move(result));
}

void print(std::ostream&) final {}

ss::future<> finally() noexcept final {
_resources_initalized = false;
return ss::now();
}

private:
bool _resources_initalized = false;
};

struct throwing_consumer {
ss::future<ss::stop_iteration> operator()(model::record_batch) {
return ss::make_exception_future<ss::stop_iteration>(
std::runtime_error("Injected failure"));
}
void end_of_stream() {}
};

auto reader = model::make_record_batch_reader<read_committed_reader>(
nullptr,
model::record_batch_reader(std::make_unique<asserting_reader>()));
using namespace std::chrono_literals;
EXPECT_THROW(
co_await std::move(reader).consume(
throwing_consumer(), model::timeout_clock::now() + 5s),
std::runtime_error);
}

class RandomizedTransactionReaderTest
: public TransactionReaderTest
, public testing::WithParamInterface<test_case> {};
Expand Down
4 changes: 4 additions & 0 deletions src/v/transform/txn_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ bool read_committed_reader::is_end_of_stream() const {
return _underlying->is_end_of_stream();
}

ss::future<> read_committed_reader::finally() noexcept {
return _underlying->finally();
}

ss::future<model::record_batch_reader::storage_t>
read_committed_reader::do_load_slice(
model::timeout_clock::time_point deadline) {
Expand Down
2 changes: 2 additions & 0 deletions src/v/transform/txn_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class read_committed_reader : public model::record_batch_reader::impl {

void print(std::ostream& os) override;

ss::future<> finally() noexcept final;

private:
std::unique_ptr<aborted_transaction_tracker> _tracker;
std::unique_ptr<model::record_batch_reader::impl> _underlying;
Expand Down

0 comments on commit c9e35e3

Please sign in to comment.