From 7333210a47806d3e2f25a4330f21d551f9b29972 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Wed, 17 May 2023 15:54:20 +0800 Subject: [PATCH] Resolve conflicts Signed-off-by: hi-rustin --- cdc/sink/mq/producer/kafka/kafka.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 3cf832c8104..866d7d84172 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -547,13 +547,19 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s return "", err } - if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName { - log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName)) - return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", brokerConfigName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == brokerConfigName { + return entry.Value, nil + } } - return configEntries[0].Value, nil + log.Warn("Kafka config item not found", + zap.String("configName", brokerConfigName)) + return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", brokerConfigName) } // getTopicConfig gets topic config by name.