diff --git a/cdc/sink/mq/producer/kafka/kafka.go b/cdc/sink/mq/producer/kafka/kafka.go index 3cf832c8104..866d7d84172 100644 --- a/cdc/sink/mq/producer/kafka/kafka.go +++ b/cdc/sink/mq/producer/kafka/kafka.go @@ -547,13 +547,19 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s return "", err } - if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName { - log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName)) - return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", brokerConfigName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == brokerConfigName { + return entry.Value, nil + } } - return configEntries[0].Value, nil + log.Warn("Kafka config item not found", + zap.String("configName", brokerConfigName)) + return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", brokerConfigName) } // getTopicConfig gets topic config by name.