Skip to content

Commit

Permalink
Resolve conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: hi-rustin <rustin.liu@gmail.com>
  • Loading branch information
Rustin170506 committed May 17, 2023
1 parent 26c5d52 commit 7333210
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,19 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s
return "", err
}

if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
// For compatibility with KOP, we checked all return values.
// 1. Kafka only returns requested configs.
// 2. Kop returns all configs.
for _, entry := range configEntries {
if entry.Name == brokerConfigName {
return entry.Value, nil
}
}

return configEntries[0].Value, nil
log.Warn("Kafka config item not found",
zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
}

// getTopicConfig gets topic config by name.
Expand Down

0 comments on commit 7333210

Please sign in to comment.