Skip to content

Commit

Permalink
Merge pull request redpanda-data#17397 from vbotbuildovich/backport-p…
Browse files Browse the repository at this point in the history
…r-17260-v23.2.x-653

[v23.2.x] k/group: recover leader epoch on leader change
  • Loading branch information
piyushredpanda authored Mar 26, 2024
2 parents f1216cd + 232c128 commit 8b8d9ac
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ ss::future<> group_manager::do_recover_group(
.log_offset = meta.log_offset,
.offset = meta.metadata.offset,
.metadata = meta.metadata.metadata,
.committed_leader_epoch = meta.metadata.leader_epoch,
.commit_timestamp = meta.metadata.commit_timestamp,
.expiry_timestamp = expiry_timestamp,
.non_reclaimable = meta.metadata.non_reclaimable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,12 @@ private boolean isFinished() {
if (partitionRecords.isEmpty()) continue;

long minOffset = partitionRecords.get(0).offset();
long maxOffset
= partitionRecords.get(partitionRecords.size() - 1).offset();
var maxRecord = partitionRecords.get(partitionRecords.size() - 1);
long maxOffset = maxRecord.offset();

offsets.put(tp, new OffsetAndMetadata(maxOffset + 1));
offsets.put(
tp,
new OffsetAndMetadata(maxOffset + 1, maxRecord.leaderEpoch(), ""));
summaries.add(new RecordSetSummary(
tp.topic(), tp.partition(), partitionRecords.size(), minOffset,
maxOffset));
Expand Down
141 changes: 137 additions & 4 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,28 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from dataclasses import dataclass
from typing import Dict, List

from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.services.cluster import cluster

from rptest.clients.rpk import RpkException, RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.kafka_cli_consumer import KafkaCliConsumer
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST, RedpandaService
from rptest.services.rpk_producer import RpkProducer
from rptest.services.verifiable_consumer import VerifiableConsumer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.utils.mode_checks import skip_debug_mode

from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from kafka import KafkaConsumer

from rptest.utils.mode_checks import skip_debug_mode
from kafka import KafkaConsumer, TopicPartition
from kafka.admin import KafkaAdminClient
from kafka.protocol.commit import OffsetFetchRequest_v3
from kafka.protocol.api import Request, Response
import kafka.protocol.types as types


class ConsumerGroupTest(RedpandaTest):
Expand Down Expand Up @@ -362,6 +370,62 @@ def test_consumer_is_removed_when_timedout(self, static_members):
c.wait()
c.free()

@cluster(num_nodes=4, log_allow_list=RESTART_LOG_ALLOW_LIST)
def test_group_recovery(self):
"""
Test validating that group state is recovered after broker restart.
"""
self.create_topic(1)

# Produce some messages.
self.start_producer(msg_cnt=1000)
self.producer.wait()
self.producer.free()

group_id = 'test-gr-1'

# Consume all messages and commit offsets.
self.consumer = VerifiableConsumer(self.test_context,
num_nodes=1,
redpanda=self.redpanda,
topic=self.topic_spec.name,
group_id=group_id,
max_messages=1000)
self.consumer.start()
self.consumer.wait()

test_admin = KafkaTestAdminClient(self.redpanda)
offsets = test_admin.list_offsets(
group_id, [TopicPartition(self.topic_spec.name, 0)])

# Test that the consumer committed what we expected.
self.logger.info(f"Got offsets: {offsets}")
assert len(offsets) == 1
assert offsets[TopicPartition(self.topic_spec.name, 0)].offset == 1000
assert offsets[TopicPartition(self.topic_spec.name,
0)].leader_epoch > 0

# Restart the broker.
self.logger.info("Restarting redpanda nodes.")
self.redpanda.restart_nodes(self.redpanda.nodes)
self.redpanda._admin.await_stable_leader("controller",
partition=0,
namespace='redpanda',
timeout_s=60,
backoff_s=2)

prev_offsets = offsets

# Validate that the group state is recovered.
test_admin = KafkaTestAdminClient(self.redpanda)
offsets = test_admin.list_offsets(
group_id, [TopicPartition(self.topic_spec.name, 0)])

self.logger.info(f"Got offsets after restart: {offsets}")
assert len(offsets) == 1
assert offsets == prev_offsets, \
f"Expected {prev_offsets}, got {offsets}."

@cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
@parametrize(static_members=True)
@parametrize(static_members=False)
Expand Down Expand Up @@ -490,3 +554,72 @@ async def create_groups(r):
list = rpk.group_list()

assert len(list) == groups_in_round * rounds


@dataclass
class OffsetAndMetadata():
offset: int
leader_epoch: int
metadata: str


class KafkaTestAdminClient():
"""
A wrapper around KafkaAdminClient with support for newer Kafka versions.
At the time of writing, KafkaAdminClient doesn't support KIP-320
(leader epoch) for consumer groups.
"""
def __init__(self, redpanda: RedpandaService):
self._bootstrap_servers = redpanda.brokers()
self._admin = KafkaAdminClient(
bootstrap_servers=self._bootstrap_servers)

def list_offsets(
self, group_id: str, partitions: List[TopicPartition]
) -> Dict[TopicPartition, OffsetAndMetadata]:
coordinator = self._admin._find_coordinator_ids([group_id])[group_id]
future = self._list_offsets_send_request(group_id, coordinator,
partitions)
self._admin._wait_for_futures([future])
response = future.value
return self._list_offsets_send_process_response(response)

def _list_offsets_send_request(self, group_id: str, coordinator: int,
partitions: List[TopicPartition]):
request = OffsetFetchRequest_v5(consumer_group=group_id,
topics=[(p.topic, [p.partition])
for p in partitions])
return self._admin._send_request_to_node(coordinator, request)

def _list_offsets_send_process_response(self, response):
offsets = {}
for topic, partitions in response.topics:
for partition, offset, leader_epoch, metadata, error_code in partitions:
if error_code != 0:
raise Exception(f"Error code: {error_code}")
offsets[(topic, partition)] = OffsetAndMetadata(
offset, leader_epoch, metadata)
return offsets


class OffsetFetchResponse_v5(Response):
API_KEY = 9
API_VERSION = 5
SCHEMA = types.Schema(
('throttle_time_ms', types.Int32),
('topics',
types.Array(
('topic', types.String('utf-8')),
('partitions',
types.Array(('partition', types.Int32), ('offset', types.Int64),
('leader_epoch', types.Int32),
('metadata', types.String('utf-8')),
('error_code', types.Int16))))),
('error_code', types.Int16))


class OffsetFetchRequest_v5(Request):
API_KEY = 9
API_VERSION = 5
RESPONSE_TYPE = OffsetFetchResponse_v5
SCHEMA = OffsetFetchRequest_v3.SCHEMA

0 comments on commit 8b8d9ac

Please sign in to comment.