Skip to content

Commit

Permalink
feat(controller.py): for serde added decode_feature_command (feature_…
Browse files Browse the repository at this point in the history
…update_command) and decode_node_management_command
  • Loading branch information
andijcr committed Aug 31, 2022
1 parent da3897d commit 885238d
Showing 1 changed file with 76 additions and 13 deletions.
89 changes: 76 additions & 13 deletions tools/offline_log_viewer/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,32 +496,91 @@ def decode_config_command_adl(k_rdr: Reader, rdr: Reader):
return cmd


def decode_feature_command(record):
def decode_action_t(v):
if 1 <= v <= 3:
return ['', 'complete_preparing', 'activate', 'deactivate'][v]
return v


def decode_feature_command_serde(k_rdr: Reader, rdr: Reader):
cmd = {'type': rdr.read_int8()}
if cmd['type'] == 0:
cmd['type_name'] = 'feature_update'
cmd |= k_rdr.read_envelope(
lambda k_rdr, _: {
'cluster_version':
k_rdr.read_int64(),
'actions':
k_rdr.read_serde_vector(lambda k_rdr: k_rdr.read_envelope(
lambda k_rdr, _: {
'feature_name': k_rdr.read_string(),
'action': decode_action_t(k_rdr.read_serde_enum()),
}))
})
elif cmd['type'] == 1:
cmd['type_name'] = 'license_update'
cmd |= k_rdr.read_envelope(
lambda k_rdr, _: {
'redpanda_license':
k_rdr.read_envelope(
lambda k_rdr, _: {
'format_version': k_rdr.read_uint8(),
'type': k_rdr.read_serde_enum(),
'organization': k_rdr.read_string(),
'expiry': k_rdr.read_int64(),
})
})
return cmd


def decode_feature_command_adl(k_rdr: Reader, rdr: Reader):
def decode_feature_update_action(r):
action = {}
action['v'] = r.read_int8()
action['feature_name'] = r.read_string()
action['action'] = r.read_int16()
action['action'] = decode_action_t(r.read_int16())
return action

rdr = Reader(BytesIO(record.value))
k_rdr = Reader(BytesIO(record.key))
cmd = {}
cmd['type'] = rdr.read_int8()
if cmd['type'] == 0:
cmd['type_name'] = 'feature_update'
cmd['v'] = k_rdr.read_int8()
cmd['cluster_version'] = k_rdr.read_int64()
cmd['actions'] = k_rdr.read_vector(decode_feature_update_action)
return cmd


def decode_node_management_command(record):
rdr = Reader(BufferedReader(BytesIO(record.value)))
k_rdr = Reader(BytesIO(record.key))
either_adl_or_serde = rdr.peek_int8()
assert either_adl_or_serde >= -1, "unsupported serialization format"
if either_adl_or_serde == -1:
# serde encoding flag, consume it and proceed
rdr.skip(1)
cmd = {'type': rdr.read_int8()}
if cmd['type'] == 0:
cmd |= {
'type_string': 'decommission_node',
'node_id': k_rdr.read_int32()
}
elif cmd['type'] == 1:
cmd['type_name'] = 'license_update'
k_rdr.read_envelope()
cmd['format_v'] = k_rdr.read_uint8()
cmd['license_type'] = k_rdr.read_uint8()
cmd['org'] = k_rdr.read_string()
cmd['expiry'] = k_rdr.read_string()
else:
cmd['type_name'] = 'unknown'
cmd |= {
'type_string': 'recommission_node',
'node_id': k_rdr.read_int32()
}
elif cmd['type'] == 2:
cmd |= {
'type_string': 'finish_reallocations',
'node_id': k_rdr.read_int32()
}
elif cmd['type'] == 3:
cmd |= {
'type_string': 'maintenance_mode',
'node_id': k_rdr.read_int32(),
'enabled': rdr.read_bool()
}
return cmd


Expand Down Expand Up @@ -563,7 +622,11 @@ def decode_record(batch, record):
ret['data'] = decode_adl_or_serde(record, decode_config_command_adl,
decode_config_command_serde)
if batch.type == BatchType.feature_update:
ret['data'] = decode_feature_command(record)
ret['data'] = decode_adl_or_serde(record, decode_feature_command_adl,
decode_feature_command_serde)
if batch.type == BatchType.node_management_cmd:
ret['data'] = decode_node_management_command(record)

return ret


Expand Down

0 comments on commit 885238d

Please sign in to comment.