Skip to content

Commit

Permalink
offline_log_viewer: support transactional offset commit parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed May 8, 2024
1 parent 60e6c3d commit c60b39e
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 105 deletions.
9 changes: 6 additions & 3 deletions tests/rptest/tests/consumer_group_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,19 @@ def test_basic_group_join(self, static_members):
node=node)
offsets = {}
groups = set()
for partition in consumer_offsets_partitions:
for r in partition['records']:
for partition, records in consumer_offsets_partitions.items():
self.logger.debug(
f"processing partition: {partition}, records: {len(records)}"
)
for r in records:
self.logger.info(f"{r}")
if r['key']['type'] == 'group_metadata':
groups.add(r['key']['group_id'])
elif r['key']['type'] == 'offset_commit':
tp = f"{r['key']['topic']}/{r['key']['partition']}"
if tp not in offsets:
offsets[tp] = -1
offsets[tp] = max(r['value']['committed_offset'],
offsets[tp] = max(r['val']['committed_offset'],
offsets[tp])

assert len(groups) == 1 and group in groups
Expand Down
6 changes: 6 additions & 0 deletions tests/rptest/tests/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from ducktape.utils.util import wait_until
from ducktape.errors import TimeoutError

from rptest.clients.offline_log_viewer import OfflineLogViewer
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.admin import Admin
from rptest.services.redpanda import RedpandaService, SecurityConfig, SaslCredentials
Expand Down Expand Up @@ -975,6 +976,11 @@ def consumer_offsets_retention_test(self):
producer.abort_transaction()
consumed += len(records)

log_viewer = OfflineLogViewer(self.redpanda)
for node in self.redpanda.started_nodes():
co_records = log_viewer.read_consumer_offsets(node=node)
self.logger.info(f"Read {len(co_records)} from node {node.name}")

admin = Admin(self.redpanda)
co_topic = "__consumer_offsets"

Expand Down
258 changes: 167 additions & 91 deletions tools/offline_log_viewer/consumer_offsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,76 +6,161 @@
import datetime


def decode_key_type(v):
if v == 0 or v == 1:
return "offset_commit"
elif v == 2:
return "group_metadata"

return "unknown"


def decode_member_proto(rdr):
ret = {}
ret['name'] = rdr.read_string()
ret['metadata'] = rdr.read_iobuf().hex()
return ret


def decode_member(rdr):
ret = {}
ret['v'] = rdr.read_int16()
ret['member_id'] = rdr.read_kafka_string()
ret['instance_id'] = rdr.read_kafka_optional_string()
ret['client_id'] = rdr.read_kafka_string()
ret['client_host'] = rdr.read_kafka_string()
ret['rebalance_timeout'] = rdr.read_int32()
ret['session_timeout'] = rdr.read_int32()
ret['subscription'] = base64.b64encode(
rdr.read_kafka_bytes()).decode('utf-8')
ret['assignment'] = base64.b64encode(
rdr.read_kafka_bytes()).decode('utf-8')

return ret


def decode_metadata(rdr):
ret = {}
ret['version'] = rdr.read_int16()
ret['protocol_type'] = rdr.read_kafka_string()
ret['generation_id'] = rdr.read_int32()
ret['protocol_name'] = rdr.read_kafka_optional_string()
ret['leader'] = rdr.read_kafka_optional_string()
ret['state_timestamp'] = rdr.read_int64()
ret['member_state'] = rdr.read_vector(decode_member)
return ret


def decode_key(key_rdr):
ret = {}
v = key_rdr.read_int16()
ret['type'] = decode_key_type(v)
ret['group_id'] = key_rdr.read_kafka_string()
if ret['type'] == 'offset_commit':
ret['topic'] = key_rdr.read_kafka_string()
ret['partition'] = key_rdr.read_int32()

return ret


def decode_offset_commit(v_rdr):
ret = {}
ret['version'] = v_rdr.read_int16()
ret['committed_offset'] = v_rdr.read_int64()
if ret['version'] >= 3:
ret['leader_epoch'] = v_rdr.read_int32()

ret['committed_metadata'] = v_rdr.read_kafka_string()
ret['commit_timestamp'] = v_rdr.read_int64()
if ret['version'] == 1:
ret['expiry_timestamp'] = v_rdr.read_int64()

return ret
class TxRecordParser:
def __init__(self, hdr, record) -> None:
self.r = record
self.hdr = hdr

def parse(self):
key = self.decode_key()
val = self.decode_value(key['type'])
val['producer_id'] = self.hdr.producer_id
val['producer_epoch'] = self.hdr.producer_epoch
return (key, val)

def decode_key(self):
key_rdr = Reader(BytesIO(self.r.key))
ret = {}
v = key_rdr.read_int8()
ret['type'] = self.decode_key_type(v)
ret['id'] = key_rdr.read_int64()
return ret

def decode_fence(self, rdr):
# Only supports latest fence batch
ret = {}
rdr.skip(1)
ret['group'] = rdr.read_string()
ret['tx_seq'] = rdr.read_int64()
ret['tx_timeout'] = rdr.read_int64()
ret['partition'] = rdr.read_int32()
return ret

def decode_commit(self, rdr):
ret = {}
rdr.skip(1)
ret['group'] = rdr.read_string()
return ret

def decode_abort(self, rdr):
ret = {}
rdr.skip(1)
ret['group'] = rdr.read_string()
ret['tx_seq'] = rdr.read_int64()
return ret

def decode_value(self, key_type):
if not self.r.value:
return 'tombstone'
val_rdr = Reader(BytesIO(self.r.value))
if key_type == 'tx_fence':
return self.decode_fence(val_rdr)
elif key_type == 'tx_commit':
return self.decode_commit(val_rdr)
return self.decode_abort(val_rdr)

def decode_key_type(self, v):
if v == 10:
return 'tx_fence'
elif v == 15:
return 'tx_commit'
elif v == 16:
return 'tx_abort'
return 'unknown'


class NonTxRecordParser:
def __init__(self, record) -> None:
self.r = record

def parse(self):
key = self.decode_key()
if key['type'] == "group_metadata":
if self.r.value:
v_rdr = Reader(BytesIO(self.r.value),
endianness=Endianness.BIG_ENDIAN)
val = self.decode_metadata(v_rdr)
else:
val = 'tombstone'
elif key['type'] == "offset_commit":
if self.r.value:
v_rdr = Reader(BytesIO(self.r.value),
endianness=Endianness.BIG_ENDIAN)
val = self.decode_offset_commit(v_rdr)
else:
val = 'tombstone'
return (key, val)

def decode_key_type(self, v):
if v == 0 or v == 1:
return "offset_commit"
elif v == 2:
return "group_metadata"

return "unknown"

def decode_member_proto(self, rdr):
ret = {}
ret['name'] = rdr.read_string()
ret['metadata'] = rdr.read_iobuf().hex()
return ret

def decode_member(self, rdr):
ret = {}
ret['v'] = rdr.read_int16()
ret['member_id'] = rdr.read_kafka_string()
ret['instance_id'] = rdr.read_kafka_optional_string()
ret['client_id'] = rdr.read_kafka_string()
ret['client_host'] = rdr.read_kafka_string()
ret['rebalance_timeout'] = rdr.read_int32()
ret['session_timeout'] = rdr.read_int32()
ret['subscription'] = base64.b64encode(
rdr.read_kafka_bytes()).decode('utf-8')
ret['assignment'] = base64.b64encode(
rdr.read_kafka_bytes()).decode('utf-8')

return ret

def decode_metadata(self, rdr):
ret = {}
ret['version'] = rdr.read_int16()
ret['protocol_type'] = rdr.read_kafka_string()
ret['generation_id'] = rdr.read_int32()
ret['protocol_name'] = rdr.read_kafka_optional_string()
ret['leader'] = rdr.read_kafka_optional_string()
ret['state_timestamp'] = rdr.read_int64()
ret['member_state'] = rdr.read_vector(self.decode_member)
return ret

def decode_key(self):
key_rdr = Reader(BytesIO(self.r.key), endianness=Endianness.BIG_ENDIAN)
ret = {}
v = key_rdr.read_int16()
ret['type'] = self.decode_key_type(v)
ret['group_id'] = key_rdr.read_kafka_string()
if ret['type'] == 'offset_commit':
ret['topic'] = key_rdr.read_kafka_string()
ret['partition'] = key_rdr.read_int32()

return ret

def decode_offset_commit(self, v_rdr):
ret = {}
ret['version'] = v_rdr.read_int16()
ret['committed_offset'] = v_rdr.read_int64()
if ret['version'] >= 3:
ret['leader_epoch'] = v_rdr.read_int32()

ret['committed_metadata'] = v_rdr.read_kafka_string()
ret['commit_timestamp'] = v_rdr.read_int64()
if ret['version'] == 1:
ret['expiry_timestamp'] = v_rdr.read_int64()

return ret


def is_transactional_type(hdr):
return hdr.type != 1


def decode_record(hdr, r):
Expand All @@ -84,40 +169,31 @@ def decode_record(hdr, r):
v['offset'] = hdr.base_offset + r.offset_delta
v['ts'] = datetime.datetime.utcfromtimestamp(
hdr.first_ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
k_rdr = Reader(BytesIO(r.key), endianness=Endianness.BIG_ENDIAN)

v['key'] = decode_key(k_rdr)

if v['key']['type'] == "group_metadata":
if r.value:
rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
v['value'] = decode_metadata(rdr)
else:
v['value'] = 'tombstone'
elif v['key']['type'] == "offset_commit":
if r.value:
rdr = Reader(BytesIO(r.value), endianness=Endianness.BIG_ENDIAN)
v['value'] = decode_offset_commit(rdr)
else:
v['value'] = 'tombstone'

if is_transactional_type(hdr):
v['key'], v['val'] = TxRecordParser(hdr, r).parse()
else:
v['key'], v['val'] = NonTxRecordParser(r).parse()
return v


class OffsetsLog:
def __init__(self, ntp):
self.ntp = ntp
self.records = []

def decode(self):
def __iter__(self):
paths = []
for path in self.ntp.segments:
paths.append(path)
paths.sort()
# 1 - raft_data - regular offset commits
# 10 - tx_fence - tx offset commits
# 15 - group_commit_tx
# 16 - group_abort_tx
accepted_batch_types = set([1, 10, 15, 16])
for path in paths:
s = Segment(path)
for b in s:
if b.header.type != 1:
if b.header.type not in accepted_batch_types:
continue
for r in b:
self.records.append(decode_record(b.header, r))
yield decode_record(b.header, r)
16 changes: 5 additions & 11 deletions tools/offline_log_viewer/viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,13 @@ def print_groups(store):


def print_consumer_offsets(store):
records = []
logs = dict()
for ntp in store.ntps:
if ntp.nspace == "kafka" and ntp.topic == "__consumer_offsets":
l = OffsetsLog(ntp)
l.decode()
records.append({
"partition_id": ntp.partition,
"records": l.records
})

# Send JSON output to stdout in case caller wants to parse it, other
# CLI output goes to stderr via logger
print(json.dumps(records, indent=2))
logs[str(ntp)] = SerializableGenerator(OffsetsLog(ntp))
json_records = json.JSONEncoder(indent=2).iterencode(logs)
for record in json_records:
print(record, end='')


def print_tx_coordinator(store):
Expand Down

0 comments on commit c60b39e

Please sign in to comment.