From dd26dd381593a94e8f6f28eaa4f33f5e8fbc11c4 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Thu, 8 Jun 2023 15:39:10 +0800 Subject: [PATCH] cp #8893 Signed-off-by: hi-rustin --- cdc/sink/mq/producer/kafka/kafka.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 275dd2ddcb9..b7effca49b4 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -569,13 +569,18 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s return "", err } - if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName { - log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName)) - return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", brokerConfigName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == brokerConfigName { + return entry.Value, nil + } } - - return configEntries[0].Value, nil + log.Warn("Kafka config item not found", + zap.String("configName", brokerConfigName)) + return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", brokerConfigName) } // getTopicConfig gets topic config by name.