From 6943ed0fb823a3a12627a8bc8c3e6ff13159eac1 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Fri, 26 Apr 2024 11:44:31 -0700
Subject: [PATCH 1/3] retention: conditionally enable retention for CO topic

This is a special configuration that some select users can use to enable
retention on the CO topic, because the compaction logic is ineffective and
they would like to use retention as a stop gap until that is fixed. This
configuration will be deprecated once we address the compaction gaps.

It should be left untouched for all practical purposes.

(cherry picked from commit 53f93c66e05c0a939be11b4e8fe53f1ea5d3dccd)
---
 src/v/config/configuration.cc  | 10 +++++++++-
 src/v/config/configuration.h   |  3 +++
 src/v/storage/disk_log_impl.cc |  8 +++++++-
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 4683b848d2a5..847e622d000c 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -3138,7 +3138,15 @@ configuration::configuration()
       "Minimum number of active producers per virtual cluster",
       {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
       std::numeric_limits::max(),
-      {.min = 1}) {}
+      {.min = 1})
+  , unsafe_enable_consumer_offsets_delete_retention(
+      *this,
+      "unsafe_enable_consumer_offsets_delete_retention",
+      "Enables delete retention of consumer offsets topic. This is an "
+      "internal-only configuration and should be enabled only after consulting "
+      "with Redpanda Support or engineers.",
+      {.needs_restart = needs_restart::yes, .visibility = visibility::user},
+      false) {}

 configuration::error_map_t configuration::load(const YAML::Node& root_node) {
     if (!root_node["redpanda"]) {
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index 159004eef8b6..5ef4d6017249 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -583,6 +583,9 @@ struct configuration final : public config_store {
     property enable_mpx_extensions;
     bounded_property virtual_cluster_min_producer_ids;

+    // temporary - to be deprecated
+    property unsafe_enable_consumer_offsets_delete_retention;
+
     configuration();

     error_map_t load(const YAML::Node& root_node);
diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index e5cc5ca78d5d..b6aa1ef56fd1 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -90,6 +90,12 @@ namespace storage {
  * from normal cluster operation (leadershp movement) and this cost is not
  * driven / constrained by historical reads. Similarly for transactions and
  * idempotence. Controller topic should space can be managed by snapshots.
+ *
+ * Note on unsafe_enable_consumer_offsets_delete_retention: This is a special
+ * configuration some select users can use to enable retention on the CO
+ * topic because the compaction logic is ineffective and they would like to
+ * use retention as a stop gap until that is fixed. This configuration will
+ * be deprecated once we fix the compaction gaps.
  */
 bool deletion_exempt(const model::ntp& ntp) {
     bool is_internal_namespace = ntp.ns() == model::redpanda_ns
@@ -101,7 +107,7 @@ bool deletion_exempt(const model::ntp& ntp) {
          && ntp.tp.topic == model::kafka_consumer_offsets_nt.tp;

     return (!is_tx_manager_ntp && is_internal_namespace)
-           || is_consumer_offsets_ntp;
+           || (is_consumer_offsets_ntp && !config::shard_local_cfg().unsafe_enable_consumer_offsets_delete_retention());
 }

 disk_log_impl::disk_log_impl(
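Note on the change to deletion_exempt(): with the new property defaulting to false, the consumer offsets topic stays exempt from delete retention exactly as before, and only an explicit opt-in removes the exemption. The stand-alone sketch below models that boolean decision with simplified, hypothetical types (plain strings stand in for model::ntp, and an explicit flag argument stands in for config::shard_local_cfg()); it illustrates the intent of the patch and is not the actual Redpanda code.

// Stand-alone sketch of the gating behaviour added to deletion_exempt().
// Hypothetical simplified types; namespace and topic strings only approximate
// the real checks in disk_log_impl.cc.
#include <cassert>
#include <string>

struct ntp_view {
    std::string ns;    // namespace, e.g. "kafka" or "redpanda"
    std::string topic; // topic name
};

bool deletion_exempt_model(
  const ntp_view& ntp, bool unsafe_co_delete_retention_enabled) {
    const bool is_internal_namespace = ntp.ns == "redpanda"
                                       || ntp.ns == "kafka_internal";
    const bool is_tx_manager_ntp = ntp.ns == "kafka_internal"
                                   && ntp.topic == "tx";
    const bool is_consumer_offsets_ntp = ntp.ns == "kafka"
                                         && ntp.topic == "__consumer_offsets";
    // Before the patch the consumer offsets topic was unconditionally exempt;
    // now it is exempt only while the unsafe flag remains false (the default).
    return (!is_tx_manager_ntp && is_internal_namespace)
           || (is_consumer_offsets_ntp && !unsafe_co_delete_retention_enabled);
}

int main() {
    const ntp_view co{"kafka", "__consumer_offsets"};
    assert(deletion_exempt_model(co, false));  // default: still exempt
    assert(!deletion_exempt_model(co, true));  // opted in: retention may run
    const ntp_view controller{"redpanda", "controller"};
    assert(deletion_exempt_model(controller, true)); // unaffected by the flag
    return 0;
}

Compiling and running the sketch (e.g. g++ -std=c++17 sketch.cc && ./a.out) exercises the three cases that matter: consumer offsets with the flag off (still exempt), consumer offsets with the flag on (now subject to retention), and other internal topics (unaffected either way).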
From be026b55bc6a8f2edca48f714482fe4fb4e98165 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 29 Apr 2024 09:38:23 -0700
Subject: [PATCH 2/3] gc/retention: test for consumer offsets retention.

(cherry picked from commit 0e708054374a3d4f54177b4141a4b7da4cb79ec3)
---
 .../server/tests/consumer_groups_test.cc | 70 +++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/src/v/kafka/server/tests/consumer_groups_test.cc b/src/v/kafka/server/tests/consumer_groups_test.cc
index df6f42bf14fe..50afa0058104 100644
--- a/src/v/kafka/server/tests/consumer_groups_test.cc
+++ b/src/v/kafka/server/tests/consumer_groups_test.cc
@@ -14,6 +14,7 @@
 #include "kafka/protocol/errors.h"
 #include "kafka/protocol/find_coordinator.h"
 #include "kafka/protocol/join_group.h"
+#include "kafka/protocol/offset_commit.h"
 #include "kafka/protocol/schemata/join_group_request.h"
 #include "kafka/types.h"
 #include "model/fundamental.h"
@@ -21,6 +22,7 @@
 #include "model/timeout_clock.h"
 #include "redpanda/tests/fixture.h"
 #include "test_utils/async.h"
+#include "test_utils/scoped_config.h"
 #include "utils/base64.h"

 #include
@@ -101,6 +103,74 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
     }).get();
 }

+FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
+    scoped_config cfg;
+    cfg.get("group_topic_partitions").set_value(1);
+    // setting to true to begin with, so log_eviction_stm is attached to
+    // the partition.
+    cfg.get("unsafe_enable_consumer_offsets_delete_retention").set_value(true);
+    add_topic(
+      model::topic_namespace_view{model::kafka_namespace, model::topic{"foo"}})
+      .get();
+    kafka::group_instance_id gr("instance-1");
+    wait_for_consumer_offsets_topic(gr);
+    // load some data into the topic via offset_commit requests.
+    auto client = make_kafka_client().get0();
+    auto deferred = ss::defer([&client] {
+        client.stop().then([&client] { client.shutdown(); }).get();
+    });
+    client.connect().get();
+    auto offset = 0;
+    auto rand_offset_commit = [&] {
+        auto req_part = offset_commit_request_partition{
+          .partition_index = model::partition_id{0},
+          .committed_offset = model::offset{offset++}};
+        auto topic = offset_commit_request_topic{
+          .name = model::topic{"foo"}, .partitions = {std::move(req_part)}};
+
+        return offset_commit_request{.data{
+          .group_id = kafka::group_id{fmt::format("foo-{}", offset)},
+          .topics = {std::move(topic)}}};
+    };
+    for (int i = 0; i < 10; i++) {
+        auto req = rand_offset_commit();
+        req.data.group_instance_id = gr;
+        auto resp = client.dispatch(std::move(req)).get();
+        BOOST_REQUIRE(!resp.data.errored());
+    }
+    auto part = app.partition_manager.local().get(model::ntp{
+      model::kafka_namespace,
+      model::kafka_consumer_offsets_topic,
+      model::partition_id{0}});
+    BOOST_REQUIRE(part);
+    auto log = part->log();
+    storage::ntp_config::default_overrides ov;
+    ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion
+                                 | model::cleanup_policy_bitflags::compaction;
+    log->update_configuration(ov).get();
+    log->flush().get();
+    log->force_roll(ss::default_priority_class()).get();
+    for (auto retention_enabled : {false, true}) {
+        // number of partitions of CO topic.
+        cfg.get("unsafe_enable_consumer_offsets_delete_retention")
+          .set_value(retention_enabled);
+        // attempt a GC on the partition log.
+        // evict the first segment.
+        storage::gc_config gc_cfg{model::timestamp::max(), 1};
+        log->gc(gc_cfg).get();
+        // Check if retention works
+        try {
+            tests::cooperative_spin_wait_with_timeout(5s, [&] {
+                return log.get()->offsets().start_offset > model::offset{0};
+            }).get();
+        } catch (const ss::timed_out_error& e) {
+            if (retention_enabled) {
+                std::rethrow_exception(std::make_exception_ptr(e));
+            }
+        }
+    }
+}
+
 SEASTAR_THREAD_TEST_CASE(consumer_group_decode) {
     {
         // snatched from a log message after a franz-go client joined
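A note on the cleanup-policy override in the fixture above: the partition is switched to a combined compact,delete policy by OR-ing two bitflags before the forced roll and GC pass, and the loop then checks that the log start offset only advances while the unsafe property is true. The snippet below is a minimal, self-contained illustration of that flag-combination pattern using a simplified stand-in enum; it is not Redpanda's model::cleanup_policy_bitflags type.

// Minimal illustration of combining cleanup-policy bitflags, mirroring the
// ov.cleanup_policy_bitflags = deletion | compaction override in the fixture.
// cleanup_policy is a simplified stand-in type, not the Redpanda enum.
#include <cstdint>
#include <iostream>

enum class cleanup_policy : std::uint8_t {
    none = 0,
    deletion = 1U << 0,
    compaction = 1U << 1,
};

constexpr cleanup_policy operator|(cleanup_policy a, cleanup_policy b) {
    return static_cast<cleanup_policy>(
      static_cast<std::uint8_t>(a) | static_cast<std::uint8_t>(b));
}

constexpr bool has_flag(cleanup_policy value, cleanup_policy flag) {
    return (static_cast<std::uint8_t>(value) & static_cast<std::uint8_t>(flag))
           != 0;
}

int main() {
    // Equivalent of the "compact,delete" override applied to the partition.
    const auto policy = cleanup_policy::deletion | cleanup_policy::compaction;
    std::cout << "delete retention applies: "
              << has_flag(policy, cleanup_policy::deletion) << '\n';
    std::cout << "compaction applies: "
              << has_flag(policy, cleanup_policy::compaction) << '\n';
    return 0;
}

Using an enum class plus explicit operators keeps the flags type-safe while still allowing the combined policy that the test relies on; both bits stay set, so compaction and delete retention can apply to the same partition.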
From f62e927ab781c40458eb2ebb90a34f525c6f3041 Mon Sep 17 00:00:00 2001
From: Bharath Vissapragada
Date: Mon, 29 Apr 2024 23:18:59 -0700
Subject: [PATCH 3/3] tx/ducktape: add test for consumer offsets delete retention

(cherry picked from commit af09e1141800a905fa8e1f6f9ff6cae698959081)
---
 tests/rptest/tests/transactions_test.py | 100 ++++++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/tests/rptest/tests/transactions_test.py b/tests/rptest/tests/transactions_test.py
index f9156d8df2c0..d780710047a3 100644
--- a/tests/rptest/tests/transactions_test.py
+++ b/tests/rptest/tests/transactions_test.py
@@ -857,6 +857,106 @@ def _produce_one(producer, idx):
         assert num_consumed == expected_records


+class TransactionsStreamsTest(RedpandaTest, TransactionsMixin):
+    topics = (TopicSpec(partition_count=1, replication_factor=3),
+              TopicSpec(partition_count=1, replication_factor=3))
+
+    def __init__(self, test_context):
+        extra_rp_conf = {
+            'unsafe_enable_consumer_offsets_delete_retention': True,
+            'group_topic_partitions': 1,  # to reduce log noise
+            'log_segment_size_min': 99,
+            # to be able to make changes to CO
+            'kafka_nodelete_topics': [],
+            'kafka_noproduce_topics': [],
+        }
+        super(TransactionsStreamsTest,
+              self).__init__(test_context=test_context,
+                             extra_rp_conf=extra_rp_conf)
+        self.input_t = self.topics[0]
+        self.output_t = self.topics[1]
+
+    def setup_consumer_offsets(self, rpk: RpkTool):
+        # initialize consumer groups topic
+        rpk.consume(topic=self.input_t.name, n=1, group="test-group")
+        topic = "__consumer_offsets"
+        # Aggressive roll settings to clear multiple small segments
+        rpk.alter_topic_config(topic, TopicSpec.PROPERTY_CLEANUP_POLICY,
+                               TopicSpec.CLEANUP_DELETE)
+        rpk.alter_topic_config(topic, TopicSpec.PROPERTY_SEGMENT_SIZE, 100)
+
+    @cluster(num_nodes=3)
+    def consumer_offsets_retention_test(self):
+        """Ensure the consumer offsets topic replays correctly after transactional offset commits"""
+        input_records = 10000
+        self.generate_data(self.input_t, input_records)
+        rpk = RpkTool(self.redpanda)
+        self.setup_consumer_offsets(rpk)
+        # Populate consumer offsets with transactional offset commits/aborts
+        producer_conf = {
+            'bootstrap.servers': self.redpanda.brokers(),
+            'transactional.id': 'streams',
+            'transaction.timeout.ms': 10000,
+        }
+        producer = ck.Producer(producer_conf)
+        consumer_conf = {
+            'bootstrap.servers': self.redpanda.brokers(),
+            'group.id': "test",
+            'auto.offset.reset': 'earliest',
+            'enable.auto.commit': False,
+        }
+        consumer = ck.Consumer(consumer_conf)
+        consumer.subscribe([self.input_t])
+
+        producer.init_transactions()
+        consumed = 0
+        while consumed != input_records:
+            records = self.consume(consumer)
+            producer.begin_transaction()
+            for record in records:
+                producer.produce(self.output_t.name,
+                                 record.value(),
+                                 record.key(),
+                                 on_delivery=self.on_delivery)
+
+            producer.send_offsets_to_transaction(
+                consumer.position(consumer.assignment()),
+                consumer.consumer_group_metadata())
+
+            producer.flush()
+
+            if random.randint(0, 9) < 5:
+                producer.commit_transaction()
+            else:
+                producer.abort_transaction()
+            consumed += len(records)
+
+        admin = Admin(self.redpanda)
+        co_topic = "__consumer_offsets"
+
+        def get_offsets():
+            topic_info = list(rpk.describe_topic(co_topic))[0]
+            assert topic_info
+            return (topic_info.start_offset, topic_info.high_watermark)
+
+        # trim prefix, change leadership and validate the log is replayed successfully on
+        # the new leader.
+        attempts = 30
+        truncate_offset = 100
+        while attempts > 0:
+            (start, end) = get_offsets()
+            self.redpanda.logger.debug(f"Current offsets: {start} - {end}")
+            if truncate_offset > end:
+                break
+            rpk.trim_prefix(co_topic, truncate_offset, partitions=[0])
+            admin.partition_transfer_leadership("kafka", co_topic, partition=0)
+            admin.await_stable_leader(topic=co_topic,
+                                      replication=3,
+                                      timeout_s=30)
+            truncate_offset += 200
+            attempts = attempts - 1
+
+
 @contextmanager
 def expect_kafka_error(err: Optional[ck.KafkaError] = None):
     try:
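The ducktape test above exercises the end-to-end path the unsafe flag enables: transactional offset commits and aborts fill __consumer_offsets, rpk repeatedly trims the log prefix, and leadership transfers force each new leader to rebuild group state from the retained suffix only. The toy model below captures the invariant the test leans on, namely that after a prefix trim, replay starts at the new start offset and never touches evicted records. All names are illustrative; this is not the Redpanda storage, rpk, or Admin API.

// Toy model of prefix truncation followed by replay, mirroring the invariant
// the ducktape test exercises on __consumer_offsets. Names are illustrative.
#include <cassert>
#include <cstdint>
#include <vector>

struct toy_log {
    std::int64_t start_offset{0};      // first offset still retained
    std::vector<std::int64_t> records; // record at index i has offset i

    void append(std::int64_t value) { records.push_back(value); }

    // Analogue of the test's rpk.trim_prefix(): evict everything below new_start.
    void trim_prefix(std::int64_t new_start) {
        if (new_start > start_offset) {
            start_offset = new_start;
        }
    }

    // Replay on a "new leader": only offsets >= start_offset are visible.
    std::int64_t replay_count() const {
        return static_cast<std::int64_t>(records.size()) - start_offset;
    }
};

int main() {
    toy_log log;
    for (std::int64_t i = 0; i < 1000; ++i) {
        log.append(i);
    }
    log.trim_prefix(100);
    assert(log.replay_count() == 900); // the evicted prefix is never replayed
    log.trim_prefix(300);
    assert(log.replay_count() == 700);
    return 0;
}

In the real test the check is indirect: the loop keeps trimming and transferring leadership, and a leader that could not come back up cleanly on the trimmed partition would cause await_stable_leader to time out and fail the test.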