From 66abf7d4ab98fe2a611bbf7d2b6cc196632f53e9 Mon Sep 17 00:00:00 2001 From: hi-rustin Date: Sat, 6 May 2023 15:23:11 +0800 Subject: [PATCH] pkg/sink(ticdc): iterate all Kafka configs to support KOP --- pkg/sink/kafka/admin.go | 21 +++++++++++++-------- pkg/sink/kafka/v2/admin.go | 20 ++++++++++++-------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index 2cfe7e33291..c5d7533ca35 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -179,16 +179,21 @@ func (a *saramaAdminClient) GetBrokerConfig( return "", err } - if len(configEntries) == 0 || configEntries[0].Name != configName { - log.Warn("Kafka config item not found", - zap.String("namespace", a.changefeed.Namespace), - zap.String("changefeed", a.changefeed.ID), - zap.String("configName", configName)) - return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", configName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range configEntries { + if entry.Name == configName { + return entry.Value, nil + } } - return configEntries[0].Value, nil + log.Warn("Kafka config item not found", + zap.String("namespace", a.changefeed.Namespace), + zap.String("changefeed", a.changefeed.ID), + zap.String("configName", configName)) + return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", configName) } func (a *saramaAdminClient) GetTopicsPartitions(_ context.Context) (map[string]int32, error) { diff --git a/pkg/sink/kafka/v2/admin.go b/pkg/sink/kafka/v2/admin.go index c1c3a66d3e6..247adeb4f4f 100644 --- a/pkg/sink/kafka/v2/admin.go +++ b/pkg/sink/kafka/v2/admin.go @@ -96,21 +96,25 @@ func (a *admin) GetBrokerConfig(ctx context.Context, configName string) (string, } if len(resp.Resources) == 0 || len(resp.Resources[0].ConfigEntries) == 0 { - log.Warn("kafka config item not found", + log.Warn("Kafka config item not found", zap.String("configName", configName)) return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack( "cannot find the `%s` from the broker's configuration", configName) } - entry := resp.Resources[0].ConfigEntries[0] - if entry.ConfigName != configName { - log.Warn("kafka config item not found", - zap.String("configName", configName)) - return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack( - "cannot find the `%s` from the broker's configuration", configName) + // For compatibility with KOP, we checked all return values. + // 1. Kafka only returns requested configs. + // 2. Kop returns all configs. + for _, entry := range resp.Resources[0].ConfigEntries { + if entry.ConfigName == configName { + return entry.ConfigValue, nil + } } - return entry.ConfigValue, nil + log.Warn("Kafka config item not found", + zap.String("configName", configName)) + return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack( + "cannot find the `%s` from the broker's configuration", configName) } func (a *admin) GetTopicsPartitions(ctx context.Context) (map[string]int32, error) {