Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
  • Loading branch information
Rustin170506 committed Jun 8, 2023
1 parent adeb006 commit dd26dd3
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,13 +569,18 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s
return "", err
}

if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
// For compatibility with KOP, we checked all return values.
// 1. Kafka only returns requested configs.
// 2. Kop returns all configs.
for _, entry := range configEntries {
if entry.Name == brokerConfigName {
return entry.Value, nil
}
}

return configEntries[0].Value, nil
log.Warn("Kafka config item not found",
zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
}

// getTopicConfig gets topic config by name.
Expand Down

0 comments on commit dd26dd3

Please sign in to comment.