[v24.1.x] configuration to enable delete retention for consumer offsets #18194

Merged
10 changes: 9 additions & 1 deletion src/v/config/configuration.cc
@@ -3138,7 +3138,15 @@ configuration::configuration()
"Minimum number of active producers per virtual cluster",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
std::numeric_limits<uint64_t>::max(),
{.min = 1}) {}
{.min = 1})
, unsafe_enable_consumer_offsets_delete_retention(
*this,
"unsafe_enable_consumer_offsets_delete_retention",
"Enables delete retention of consumer offsets topic. This is an "
"internal-only configuration and should be enabled only after consulting "
"with Redpanda Support or engineers.",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
false) {}

configuration::error_map_t configuration::load(const YAML::Node& root_node) {
if (!root_node["redpanda"]) {
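
The property is declared with needs_restart::yes, so changing it on a running cluster only takes effect after the brokers are restarted. As a rough, hypothetical illustration (not part of this PR), an operator could set the flag with rpk cluster config set and then roll the cluster; the sketch below assumes rpk is installed and already pointed at the target cluster.

# Hypothetical sketch (not from this PR): set the unsafe consumer-offsets
# delete retention flag via rpk. The property uses needs_restart::yes, so the
# brokers must be restarted afterwards for it to take effect.
import subprocess

def enable_co_delete_retention() -> None:
    subprocess.run(
        [
            "rpk", "cluster", "config", "set",
            "unsafe_enable_consumer_offsets_delete_retention", "true",
        ],
        check=True,
    )
    print("Property set; restart the brokers for the change to take effect.")

if __name__ == "__main__":
    enable_co_delete_retention()
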
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
@@ -583,6 +583,9 @@ struct configuration final : public config_store {
property<bool> enable_mpx_extensions;
bounded_property<uint64_t> virtual_cluster_min_producer_ids;

// temporary - to be deprecated
property<bool> unsafe_enable_consumer_offsets_delete_retention;

configuration();

error_map_t load(const YAML::Node& root_node);
70 changes: 70 additions & 0 deletions src/v/kafka/server/tests/consumer_groups_test.cc
@@ -14,13 +14,15 @@
#include "kafka/protocol/errors.h"
#include "kafka/protocol/find_coordinator.h"
#include "kafka/protocol/join_group.h"
#include "kafka/protocol/offset_commit.h"
#include "kafka/protocol/schemata/join_group_request.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/timeout_clock.h"
#include "redpanda/tests/fixture.h"
#include "test_utils/async.h"
#include "test_utils/scoped_config.h"
#include "utils/base64.h"

#include <seastar/core/smp.hh>
@@ -101,6 +103,74 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
}).get();
}

FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
// setting to true to begin with, so log_eviction_stm is attached to
// the partition.
cfg.get("unsafe_enable_consumer_offsets_delete_retention").set_value(true);
add_topic(
model::topic_namespace_view{model::kafka_namespace, model::topic{"foo"}})
.get();
kafka::group_instance_id gr("instance-1");
wait_for_consumer_offsets_topic(gr);
// load some data into the topic via offset_commit requests.
auto client = make_kafka_client().get0();
auto deferred = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
});
client.connect().get();
auto offset = 0;
auto rand_offset_commit = [&] {
auto req_part = offset_commit_request_partition{
.partition_index = model::partition_id{0},
.committed_offset = model::offset{offset++}};
auto topic = offset_commit_request_topic{
.name = model::topic{"foo"}, .partitions = {std::move(req_part)}};

return offset_commit_request{.data{
.group_id = kafka::group_id{fmt::format("foo-{}", offset)},
.topics = {std::move(topic)}}};
};
for (int i = 0; i < 10; i++) {
auto req = rand_offset_commit();
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
auto part = app.partition_manager.local().get(model::ntp{
model::kafka_namespace,
model::kafka_consumer_offsets_topic,
model::partition_id{0}});
BOOST_REQUIRE(part);
auto log = part->log();
storage::ntp_config::default_overrides ov;
ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion
| model::cleanup_policy_bitflags::compaction;
log->update_configuration(ov).get();
log->flush().get();
log->force_roll(ss::default_priority_class()).get();
for (auto retention_enabled : {false, true}) {
// toggle delete retention for the CO topic.
cfg.get("unsafe_enable_consumer_offsets_delete_retention")
.set_value(retention_enabled);
// attempt a GC on the partition log.
// evict the first segment.
storage::gc_config gc_cfg{model::timestamp::max(), 1};
log->gc(gc_cfg).get();
// Check if retention works
try {
tests::cooperative_spin_wait_with_timeout(5s, [&] {
return log.get()->offsets().start_offset > model::offset{0};
}).get();
} catch (const ss::timed_out_error& e) {
if (retention_enabled) {
std::rethrow_exception(std::make_exception_ptr(e));
}
}
}
}

SEASTAR_THREAD_TEST_CASE(consumer_group_decode) {
{
// snatched from a log message after a franz-go client joined
8 changes: 7 additions & 1 deletion src/v/storage/disk_log_impl.cc
@@ -90,6 +90,12 @@ namespace storage {
* from normal cluster operation (leadership movement) and this cost is not
* driven / constrained by historical reads. Similarly for transactions and
* idempotence. Controller topic space can be managed by snapshots.
*
* Note on unsafe_enable_consumer_offsets_delete_retention: This is a special
* configuration that select users can enable to turn on delete retention for
* the CO topic, because the current compaction logic is ineffective and they
* would like to use retention as a stopgap until that is fixed. This
* configuration will be deprecated once the compaction gaps are fixed.
*/
bool deletion_exempt(const model::ntp& ntp) {
bool is_internal_namespace = ntp.ns() == model::redpanda_ns
@@ -101,7 +107,7 @@ bool deletion_exempt(const model::ntp& ntp) {
&& ntp.tp.topic
== model::kafka_consumer_offsets_nt.tp;
return (!is_tx_manager_ntp && is_internal_namespace)
|| is_consumer_offsets_ntp;
|| (is_consumer_offsets_ntp && !config::shard_local_cfg().unsafe_enable_consumer_offsets_delete_retention());
}

disk_log_impl::disk_log_impl(
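
To make the new gating easier to follow, here is a small standalone sketch of deletion_exempt's behavior after this change. It is a Python rendering for illustration only, not the real C++ code, and the namespace and topic names are simplified stand-ins: internal-namespace partitions (other than the tx manager) remain exempt from delete retention unconditionally, while the consumer offsets partition is exempt only when the unsafe flag is left off.

# Illustrative sketch only (not the real implementation): mirrors the updated
# deletion_exempt() logic, with simplified namespace/topic names.
from dataclasses import dataclass

@dataclass
class Ntp:
    ns: str
    topic: str

def deletion_exempt(ntp: Ntp, unsafe_co_delete_retention: bool) -> bool:
    is_internal_namespace = ntp.ns in ("redpanda", "kafka_internal")
    is_tx_manager_ntp = ntp.ns == "kafka_internal" and ntp.topic == "tx"
    is_consumer_offsets_ntp = (ntp.ns == "kafka"
                               and ntp.topic == "__consumer_offsets")
    # Internal topics (except the tx manager) are never delete-retained; the
    # consumer offsets topic stays exempt unless the unsafe flag is enabled.
    return (not is_tx_manager_ntp and is_internal_namespace) or (
        is_consumer_offsets_ntp and not unsafe_co_delete_retention)

co = Ntp("kafka", "__consumer_offsets")
assert deletion_exempt(co, unsafe_co_delete_retention=False)    # GC skips it
assert not deletion_exempt(co, unsafe_co_delete_retention=True)  # retention may run
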
100 changes: 100 additions & 0 deletions tests/rptest/tests/transactions_test.py
@@ -857,6 +857,106 @@ def _produce_one(producer, idx):
assert num_consumed == expected_records


class TransactionsStreamsTest(RedpandaTest, TransactionsMixin):
topics = (TopicSpec(partition_count=1, replication_factor=3),
TopicSpec(partition_count=1, replication_factor=3))

def __init__(self, test_context):
extra_rp_conf = {
'unsafe_enable_consumer_offsets_delete_retention': True,
'group_topic_partitions': 1, # to reduce log noise
'log_segment_size_min': 99,
# to be able to make changes to CO
'kafka_nodelete_topics': [],
'kafka_noproduce_topics': [],
}
super(TransactionsStreamsTest,
self).__init__(test_context=test_context,
extra_rp_conf=extra_rp_conf)
self.input_t = self.topics[0]
self.output_t = self.topics[1]

def setup_consumer_offsets(self, rpk: RpkTool):
# initialize consumer groups topic
rpk.consume(topic=self.input_t.name, n=1, group="test-group")
topic = "__consumer_offsets"
# Aggressive roll settings to clear multiple small segments
rpk.alter_topic_config(topic, TopicSpec.PROPERTY_CLEANUP_POLICY,
TopicSpec.CLEANUP_DELETE)
rpk.alter_topic_config(topic, TopicSpec.PROPERTY_SEGMENT_SIZE, 100)

@cluster(num_nodes=3)
def consumer_offsets_retention_test(self):
"""Ensure consumer offsets replays correctly after transactional offset commits"""
input_records = 10000
self.generate_data(self.input_t, input_records)
rpk = RpkTool(self.redpanda)
self.setup_consumer_offsets(rpk)
# Populate consumer offsets with transactional offset commits/aborts
producer_conf = {
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': 'streams',
'transaction.timeout.ms': 10000,
}
producer = ck.Producer(producer_conf)
consumer_conf = {
'bootstrap.servers': self.redpanda.brokers(),
'group.id': "test",
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer = ck.Consumer(consumer_conf)
consumer.subscribe([self.input_t])

producer.init_transactions()
consumed = 0
while consumed != input_records:
records = self.consume(consumer)
producer.begin_transaction()
for record in records:
producer.produce(self.output_t.name,
record.value(),
record.key(),
on_delivery=self.on_delivery)

producer.send_offsets_to_transaction(
consumer.position(consumer.assignment()),
consumer.consumer_group_metadata())

producer.flush()

if random.randint(0, 9) < 5:
producer.commit_transaction()
else:
producer.abort_transaction()
consumed += len(records)

admin = Admin(self.redpanda)
co_topic = "__consumer_offsets"

def get_offsets():
topic_info = list(rpk.describe_topic(co_topic))[0]
assert topic_info
return (topic_info.start_offset, topic_info.high_watermark)

# trim prefix, change leadership and validate the log is replayed successfully on
# the new leader.
attempts = 30
truncate_offset = 100
while attempts > 0:
(start, end) = get_offsets()
self.redpanda.logger.debug(f"Current offsets: {start} - {end}")
if truncate_offset > end:
break
rpk.trim_prefix(co_topic, truncate_offset, partitions=[0])
admin.partition_transfer_leadership("kafka", co_topic, partition=0)
admin.await_stable_leader(topic=co_topic,
replication=3,
timeout_s=30)
truncate_offset += 200
attempts = attempts - 1


@contextmanager
def expect_kafka_error(err: Optional[ck.KafkaError] = None):
try: