offline_log_viewer: support transactional offset commit parsing #17937

Merged · 1 commit · May 9, 2024
9 changes: 6 additions & 3 deletions tests/rptest/tests/consumer_group_test.py
@@ -179,16 +179,19 @@ def test_basic_group_join(self, static_members):
                 node=node)
         offsets = {}
         groups = set()
-        for partition in consumer_offsets_partitions:
-            for r in partition['records']:
+        for partition, records in consumer_offsets_partitions.items():
+            self.logger.debug(
+                f"processing partition: {partition}, records: {len(records)}"
+            )
+            for r in records:
                 self.logger.info(f"{r}")
                 if r['key']['type'] == 'group_metadata':
                     groups.add(r['key']['group_id'])
                 elif r['key']['type'] == 'offset_commit':
                     tp = f"{r['key']['topic']}/{r['key']['partition']}"
                     if tp not in offsets:
                         offsets[tp] = -1
-                    offsets[tp] = max(r['value']['committed_offset'],
+                    offsets[tp] = max(r['val']['committed_offset'],
                                       offsets[tp])

         assert len(groups) == 1 and group in groups
6 changes: 6 additions & 0 deletions tests/rptest/tests/transactions_test.py
@@ -23,6 +23,7 @@
 from ducktape.utils.util import wait_until
 from ducktape.errors import TimeoutError

+from rptest.clients.offline_log_viewer import OfflineLogViewer
 from rptest.tests.redpanda_test import RedpandaTest
 from rptest.services.admin import Admin
 from rptest.services.redpanda import RedpandaService, SecurityConfig, SaslCredentials
@@ -975,6 +976,11 @@ def consumer_offsets_retention_test(self):
             producer.abort_transaction()
             consumed += len(records)

+        log_viewer = OfflineLogViewer(self.redpanda)
+        for node in self.redpanda.started_nodes():
+            co_records = log_viewer.read_consumer_offsets(node=node)
+            self.logger.info(f"Read {len(co_records)} from node {node.name}")
+
         admin = Admin(self.redpanda)
         co_topic = "__consumer_offsets"
258 changes: 167 additions & 91 deletions tools/offline_log_viewer/consumer_offsets.py
@@ -6,76 +6,161 @@
import datetime


-def decode_key_type(v):
-    if v == 0 or v == 1:
-        return "offset_commit"
-    elif v == 2:
-        return "group_metadata"
-
-    return "unknown"
-
-
-def decode_member_proto(rdr):
-    ret = {}
-    ret['name'] = rdr.read_string()
-    ret['metadata'] = rdr.read_iobuf().hex()
-    return ret
-
-
-def decode_member(rdr):
-    ret = {}
-    ret['v'] = rdr.read_int16()
-    ret['member_id'] = rdr.read_kafka_string()
-    ret['instance_id'] = rdr.read_kafka_optional_string()
-    ret['client_id'] = rdr.read_kafka_string()
-    ret['client_host'] = rdr.read_kafka_string()
-    ret['rebalance_timeout'] = rdr.read_int32()
-    ret['session_timeout'] = rdr.read_int32()
-    ret['subscription'] = base64.b64encode(
-        rdr.read_kafka_bytes()).decode('utf-8')
-    ret['assignment'] = base64.b64encode(
-        rdr.read_kafka_bytes()).decode('utf-8')
-
-    return ret
-
-
-def decode_metadata(rdr):
-    ret = {}
-    ret['version'] = rdr.read_int16()
-    ret['protocol_type'] = rdr.read_kafka_string()
-    ret['generation_id'] = rdr.read_int32()
-    ret['protocol_name'] = rdr.read_kafka_optional_string()
-    ret['leader'] = rdr.read_kafka_optional_string()
-    ret['state_timestamp'] = rdr.read_int64()
-    ret['member_state'] = rdr.read_vector(decode_member)
-    return ret
-
-
-def decode_key(key_rdr):
-    ret = {}
-    v = key_rdr.read_int16()
-    ret['type'] = decode_key_type(v)
-    ret['group_id'] = key_rdr.read_kafka_string()
-    if ret['type'] == 'offset_commit':
-        ret['topic'] = key_rdr.read_kafka_string()
-        ret['partition'] = key_rdr.read_int32()
-
-    return ret
-
-
-def decode_offset_commit(v_rdr):
-    ret = {}
-    ret['version'] = v_rdr.read_int16()
-    ret['committed_offset'] = v_rdr.read_int64()
-    if ret['version'] >= 3:
-        ret['leader_epoch'] = v_rdr.read_int32()
-
-    ret['committed_metadata'] = v_rdr.read_kafka_string()
-    ret['commit_timestamp'] = v_rdr.read_int64()
-    if ret['version'] == 1:
-        ret['expiry_timestamp'] = v_rdr.read_int64()
-
-    return ret
+class TxRecordParser:
+    def __init__(self, hdr, record) -> None:
+        self.r = record
+        self.hdr = hdr
+
+    def parse(self):
+        key = self.decode_key()
+        val = self.decode_value(key['type'])
+        val['producer_id'] = self.hdr.producer_id
+        val['producer_epoch'] = self.hdr.producer_epoch
+        return (key, val)
+
+    def decode_key(self):
+        key_rdr = Reader(BytesIO(self.r.key))
+        ret = {}
+        v = key_rdr.read_int8()
+        ret['type'] = self.decode_key_type(v)
+        ret['id'] = key_rdr.read_int64()
+        return ret
+
+    def decode_fence(self, rdr):
+        # Only supports latest fence batch
+        ret = {}
+        rdr.skip(1)
+        ret['group'] = rdr.read_string()
+        ret['tx_seq'] = rdr.read_int64()
+        ret['tx_timeout'] = rdr.read_int64()
+        ret['partition'] = rdr.read_int32()
+        return ret
+
+    def decode_commit(self, rdr):
+        ret = {}
+        rdr.skip(1)
+        ret['group'] = rdr.read_string()
+        return ret
+
+    def decode_abort(self, rdr):
+        ret = {}
+        rdr.skip(1)
+        ret['group'] = rdr.read_string()
+        ret['tx_seq'] = rdr.read_int64()
+        return ret
+
+    def decode_value(self, key_type):
+        if not self.r.value:
+            return 'tombstone'
+        val_rdr = Reader(BytesIO(self.r.value))
+        if key_type == 'tx_fence':
+            return self.decode_fence(val_rdr)
+        elif key_type == 'tx_commit':
+            return self.decode_commit(val_rdr)
+        return self.decode_abort(val_rdr)
+
+    def decode_key_type(self, v):
+        if v == 10:
+            return 'tx_fence'
+        elif v == 15:
+            return 'tx_commit'
+        elif v == 16:
+            return 'tx_abort'
+        return 'unknown'
+
+
+class NonTxRecordParser:
+    def __init__(self, record) -> None:
+        self.r = record
+
+    def parse(self):
+        key = self.decode_key()
+        if key['type'] == "group_metadata":
+            if self.r.value:
+                v_rdr = Reader(BytesIO(self.r.value),
+                               endianness=Endianness.BIG_ENDIAN)
+                val = self.decode_metadata(v_rdr)
+            else:
+                val = 'tombstone'
+        elif key['type'] == "offset_commit":
+            if self.r.value:
+                v_rdr = Reader(BytesIO(self.r.value),
+                               endianness=Endianness.BIG_ENDIAN)
+                val = self.decode_offset_commit(v_rdr)
+            else:
+                val = 'tombstone'
+        return (key, val)
+
+    def decode_key_type(self, v):
+        if v == 0 or v == 1:
+            return "offset_commit"
+        elif v == 2:
+            return "group_metadata"
+
+        return "unknown"
+
+    def decode_member_proto(self, rdr):
+        ret = {}
+        ret['name'] = rdr.read_string()
+        ret['metadata'] = rdr.read_iobuf().hex()
+        return ret
+
+    def decode_member(self, rdr):
+        ret = {}
+        ret['v'] = rdr.read_int16()
+        ret['member_id'] = rdr.read_kafka_string()
+        ret['instance_id'] = rdr.read_kafka_optional_string()
+        ret['client_id'] = rdr.read_kafka_string()
+        ret['client_host'] = rdr.read_kafka_string()
+        ret['rebalance_timeout'] = rdr.read_int32()
+        ret['session_timeout'] = rdr.read_int32()
+        ret['subscription'] = base64.b64encode(
+            rdr.read_kafka_bytes()).decode('utf-8')
+        ret['assignment'] = base64.b64encode(
+            rdr.read_kafka_bytes()).decode('utf-8')
+
+        return ret
+
+    def decode_metadata(self, rdr):
+        ret = {}
+        ret['version'] = rdr.read_int16()
+        ret['protocol_type'] = rdr.read_kafka_string()
+        ret['generation_id'] = rdr.read_int32()
+        ret['protocol_name'] = rdr.read_kafka_optional_string()
+        ret['leader'] = rdr.read_kafka_optional_string()
+        ret['state_timestamp'] = rdr.read_int64()
+        ret['member_state'] = rdr.read_vector(self.decode_member)
+        return ret
+
+    def decode_key(self):
+        key_rdr = Reader(BytesIO(self.r.key), endianness=Endianness.BIG_ENDIAN)
+        ret = {}
+        v = key_rdr.read_int16()
+        ret['type'] = self.decode_key_type(v)
+        ret['group_id'] = key_rdr.read_kafka_string()
+        if ret['type'] == 'offset_commit':
+            ret['topic'] = key_rdr.read_kafka_string()
+            ret['partition'] = key_rdr.read_int32()
+
+        return ret
+
+    def decode_offset_commit(self, v_rdr):
+        ret = {}
+        ret['version'] = v_rdr.read_int16()
+        ret['committed_offset'] = v_rdr.read_int64()
+        if ret['version'] >= 3:
+            ret['leader_epoch'] = v_rdr.read_int32()
+
+        ret['committed_metadata'] = v_rdr.read_kafka_string()
+        ret['commit_timestamp'] = v_rdr.read_int64()
+        if ret['version'] == 1:
+            ret['expiry_timestamp'] = v_rdr.read_int64()
+
+        return ret
+
+
+def is_transactional_type(hdr):
+    return hdr.type != 1


def decode_record(hdr, r):
@@ -84,40 +169,31 @@ def decode_record(hdr, r):
     v['offset'] = hdr.base_offset + r.offset_delta
     v['ts'] = datetime.datetime.utcfromtimestamp(
         hdr.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
-    k_rdr = Reader(BytesIO(r.key), endianness=Endianness.BIG_ENDIAN)
-
-    v['key'] = decode_key(k_rdr)
-
-    if v['key']['type'] == "group_metadata":
-        if r.value:
-            rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
-            v['value'] = decode_metadata(rdr)
-        else:
-            v['value'] = 'tombstone'
-    elif v['key']['type'] == "offset_commit":
-        if r.value:
-            rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
-            v['value'] = decode_offset_commit(rdr)
-        else:
-            v['value'] = 'tombstone'
-
+    if is_transactional_type(hdr):
+        v['key'], v['val'] = TxRecordParser(hdr, r).parse()
+    else:
+        v['key'], v['val'] = NonTxRecordParser(r).parse()
     return v


 class OffsetsLog:
     def __init__(self, ntp):
         self.ntp = ntp
-        self.records = []

-    def decode(self):
+    def __iter__(self):
         paths = []
         for path in self.ntp.segments:
             paths.append(path)
         paths.sort()
+        # 1 - raft_data - regular offset commits
+        # 10 - tx_fence - tx offset commits
+        # 15 - group_commit_tx
+        # 16 - group_abort_tx
+        accepted_batch_types = set([1, 10, 15, 16])
         for path in paths:
             s = Segment(path)
             for b in s:
-                if b.header.type != 1:
+                if b.header.type not in accepted_batch_types:
                     continue
                 for r in b:
-                    self.records.append(decode_record(b.header, r))
+                    yield decode_record(b.header, r)
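With this change OffsetsLog no longer materializes a records list: it is an iterable that walks the sorted segment files, keeps only the accepted batch types (raft_data plus the tx fence/commit/abort batches), and decodes records lazily as the caller consumes them. A minimal usage sketch of the new interface (the ntp handle and the printed fields are assumptions drawn from the code above, not part of the PR):

    # Iterate lazily; nothing is decoded until the loop pulls the next record.
    for record in OffsetsLog(ntp):
        key = record['key']
        if key['type'] == 'offset_commit' and record['val'] != 'tombstone':
            # 'val' carries the decoded offset-commit payload.
            print(key['topic'], key['partition'],
                  record['val']['committed_offset'])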
16 changes: 5 additions & 11 deletions tools/offline_log_viewer/viewer.py
@@ -94,19 +94,13 @@ def print_groups(store):


 def print_consumer_offsets(store):
-    records = []
+    logs = dict()
     for ntp in store.ntps:
         if ntp.nspace == "kafka" and ntp.topic == "__consumer_offsets":
-            l = OffsetsLog(ntp)
-            l.decode()
-            records.append({
-                "partition_id": ntp.partition,
-                "records": l.records
-            })
-
-    # Send JSON output to stdout in case caller wants to parse it, other
-    # CLI output goes to stderr via logger
-    print(json.dumps(records, indent=2))
+            logs[str(ntp)] = SerializableGenerator(OffsetsLog(ntp))
+    json_records = json.JSONEncoder(indent=2).iterencode(logs)
+    for record in json_records:
+        print(record, end='')


def print_tx_coordinator(store):
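print_consumer_offsets now streams its JSON output: each partition's OffsetsLog is wrapped in a SerializableGenerator and fed to json.JSONEncoder(indent=2).iterencode, so records are encoded as they are read rather than buffered in a list. SerializableGenerator itself is not shown in this diff; a common recipe for such a wrapper (a sketch under that assumption, not necessarily the PR's implementation) is a list subclass that re-yields a generator so the encoder's iteration pulls elements lazily:

    import itertools

    class SerializableGenerator(list):
        """List subclass that streams a generator through json's encoder."""

        def __init__(self, iterable):
            it = iter(iterable)
            try:
                # Seed with the first element so the encoder sees a
                # non-empty "list" and keeps iterating.
                self._head = [next(it)]
            except StopIteration:
                self._head = []
            super().__init__(self._head)
            self._tail = it

        def __iter__(self):
            # The encoder walks the object with a plain for loop, which
            # calls this override and drains the generator lazily.
            return itertools.chain(self._head, self._tail)

This relies on the encoder treating list subclasses as JSON arrays and iterating them, which holds for the pure-Python iterencode path used here.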