cdc/sink: adjust kafka initialization logic (#3192) #4162

Merged
10 changes: 7 additions & 3 deletions cdc/sink/mq.go
@@ -381,7 +381,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh)
if err != nil {
return nil, errors.Trace(err)
}
@@ -411,8 +415,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
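This change validates the topic name before the producer is built. A runnable sketch (hypothetical URL, not code from this PR) showing how the sink-uri path yields the topic and why an empty path now fails fast:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// Hypothetical sink-uri whose path carries no topic name.
	u, err := url.Parse("kafka://127.0.0.1:9092/")
	if err != nil {
		panic(err)
	}
	topic := strings.TrimFunc(u.Path, func(r rune) bool { return r == '/' })
	if topic == "" {
		// With this PR, newKafkaSaramaSink returns ErrKafkaInvalidConfig here
		// instead of passing an empty topic to the producer.
		fmt.Println("error: no topic is specified in sink-uri")
		return
	}
	fmt.Println("topic:", topic)
}
```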
12 changes: 12 additions & 0 deletions cdc/sink/mq_test.go
@@ -62,6 +62,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

@@ -163,6 +169,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

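Both tests guard sink construction with the `SkipTopicAutoCreate` failpoint so that `topicPreProcess` (below) returns early and no real Kafka cluster is needed. A sketch of that enable/defer-disable pattern as a reusable helper — the helper name is hypothetical, not part of this PR:

```go
package kafka

import (
	"testing"

	"github.com/pingcap/failpoint"
)

// withSkippedTopicAutoCreate is a hypothetical helper showing the guard
// pattern the tests use: enable the failpoint so topicPreProcess returns
// early, and always disable it again when the test body finishes.
func withSkippedTopicAutoCreate(t *testing.T, body func()) {
	fp := "github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate"
	if err := failpoint.Enable(fp, "return(true)"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = failpoint.Disable(fp)
	}()
	body()
}
```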
239 changes: 177 additions & 62 deletions cdc/sink/producer/kafka/kafka.go
@@ -37,11 +37,14 @@ import (
"go.uber.org/zap"
)

const defaultPartitionNum = 4
const defaultPartitionNum = 3

// Config stores the Kafka configuration
// Config stores user specified Kafka producer configuration
type Config struct {
PartitionNum int32
BrokerEndpoints []string
PartitionNum int32

// User should make sure that `replication-factor` is not greater than the number of Kafka brokers.
ReplicationFactor int16

Version string
@@ -50,8 +53,8 @@ type Config struct {
ClientID string
Credential *security.Credential
SaslScram *security.SaslScram
// control whether to create topic and verify partition number
TopicPreProcess bool
// control whether to create topic
AutoCreate bool
}

// NewConfig returns a default Kafka configuration
@@ -64,19 +67,24 @@ func NewConfig() *Config {
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
TopicPreProcess: true,
AutoCreate: true,
}
}

// Initialize the kafka configuration
func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
s := sinkURI.Query().Get("partition-num")
c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
params := sinkURI.Query()
s := params.Get("partition-num")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.PartitionNum = int32(a)
if c.PartitionNum <= 0 {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(c.PartitionNum)
}
}

s = sinkURI.Query().Get("replication-factor")
@@ -156,7 +164,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfi
if err != nil {
return err
}
c.TopicPreProcess = autoCreate
c.AutoCreate = autoCreate
}

return nil
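For reference, a runnable sketch (hypothetical brokers and values, not code from this PR) of the sink-uri shape `Initialize` consumes: the host list becomes `BrokerEndpoints`, and the query parameters tune the producer:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// A hypothetical sink-uri carrying the options Initialize reads.
	raw := "kafka://broker1:9092,broker2:9092/my-topic" +
		"?partition-num=3&replication-factor=2&auto-create-topic=false"
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(strings.Split(u.Host, ","))         // [broker1:9092 broker2:9092]
	fmt.Println(u.Query().Get("partition-num"))     // 3
	fmt.Println(u.Query().Get("auto-create-topic")) // false
}
```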
@@ -379,85 +387,122 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}
}

// kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't
// exit, creates it automatically.
func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
// FIXME: find a way to remove this failpoint, which only exists to make the unit tests work
failpoint.Inject("SkipTopicAutoCreate", func() {
failpoint.Return(nil)
})
admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
err := admin.Close()
if err != nil {
log.Warn("close admin client failed", zap.Error(err))
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

topics, err := admin.ListTopics()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
partitionNum := config.PartitionNum
topicDetail, exist := topics[topic]
if exist {
log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions))
if partitionNum == 0 {
partitionNum = topicDetail.NumPartitions
} else if partitionNum < topicDetail.NumPartitions {
log.Warn("partition number assigned in sink-uri is less than that of topic", zap.Int32("topic partition num", topicDetail.NumPartitions))
} else if partitionNum > topicDetail.NumPartitions {
return 0, cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"partition number(%d) assigned in sink-uri is more than that of topic(%d)", partitionNum, topicDetail.NumPartitions)

info, created := topics[topic]
// once we have found the topic, make sure the user-specified parameters are valid, regardless of `auto-create-topic`.
if created {
// make sure the topic's `max.message.bytes` is not less than the given `max-message-bytes`,
// otherwise the producer would send messages too large for the topic to accept, and the changefeed would error.
// only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them.
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
} else {
if partitionNum == 0 {
partitionNum = defaultPartitionNum
log.Warn("topic not found and partition number is not specified, using default partition number", zap.String("topic", topic), zap.Int32("partition_num", partitionNum))
if topicMaxMessageBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+
"Please make sure `max-message-bytes` not greater than topic `max.message.bytes`",
topicMaxMessageBytes, config.MaxMessageBytes)
}
log.Info("create a topic", zap.String("topic", topic),
zap.Int32("partition_num", partitionNum),
zap.Int16("replication_factor", config.ReplicationFactor))
err := admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: partitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
// TODO idenfity the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)

// no need to create the topic; log a warning so the user can tell later if they entered the wrong topic name
if config.AutoCreate {
log.Warn("topic already exist, TiCDC will not create the topic",
zap.String("topic", topic), zap.Any("detail", info))
}

if err := config.adjustPartitionNum(info.NumPartitions); err != nil {
return errors.Trace(err)
}

return nil
}

if !config.AutoCreate {
return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and the topic is not found")
}

// when trying to create the topic, we don't know how to set its `max.message.bytes`.
// Kafka would create the topic with the broker's `message.max.bytes`,
// so we have to make sure `max-message-bytes` is not greater than it.
brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}

if brokerMessageMaxBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"broker's message.max.bytes(%d) less than max-message-bytes(%d)"+
"Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
brokerMessageMaxBytes, config.MaxMessageBytes)
}

// the topic is not created yet, and the user did not specify `partition-num` in the sink-uri.
if config.PartitionNum == 0 {
config.PartitionNum = defaultPartitionNum
log.Warn("partition-num is not set, use the default partition count",
zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum))
}

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
// TODO identify the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return partitionNum, nil
log.Info("TiCDC create the topic",
zap.Int32("partition-num", config.PartitionNum),
zap.Int16("replication-factor", config.ReplicationFactor))

return nil
}
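
For callers, the producer's constructor no longer takes a broker address; it reads `config.BrokerEndpoints` instead. A minimal hypothetical caller (not from this PR) showing the new call shape:

```go
package kafka

import "context"

// newProducerSketch is a hypothetical caller, not part of the PR: the old
// address argument is gone, and the broker list travels inside the Config.
func newProducerSketch(ctx context.Context) (*kafkaSaramaProducer, error) {
	config := NewConfig()
	config.BrokerEndpoints = []string{"127.0.0.1:9092"} // hypothetical broker
	errCh := make(chan error, 1)
	return NewKafkaSaramaProducer(ctx, "my-topic", config, errCh)
}
```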

var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}
if config.PartitionNum < 0 {
return nil, cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(config.PartitionNum)

if err := topicPreProcess(topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
asyncClient, err := sarama.NewAsyncProducer(strings.Split(address, ","), cfg)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(strings.Split(address, ","), cfg)
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

partitionNum := config.PartitionNum
if config.TopicPreProcess {
partitionNum, err = kafkaTopicPreProcess(topic, address, config, cfg)
if err != nil {
return nil, err
}
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
@@ -467,11 +512,11 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: partitionNum,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, partitionNum),
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
@@ -558,6 +603,11 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll

// Time out in five minutes(600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

switch strings.ToLower(strings.TrimSpace(c.Compression)) {
case "none":
config.Producer.Compression = sarama.CompressionNone
@@ -574,10 +624,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

// Time out in five minutes(600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

// Time out in one minute(120 * 500ms).
config.Admin.Retry.Max = 120
config.Admin.Retry.Backoff = 500 * time.Millisecond
@@ -606,3 +652,72 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {

return config, err
}

func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
target := "message.max.bytes"
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controllerID)),
ConfigNames: []string{target},
})
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if len(configEntries) == 0 || configEntries[0].Name != target {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since cannot find the `message.max.bytes` from the broker's configuration, " +
"ticdc decline to create the topic and changefeed to prevent potential error")
}

result, err := strconv.Atoi(configEntries[0].Value)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return result, nil
}

func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
result, err := strconv.Atoi(*a)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
return result, nil
}

return getBrokerMessageMaxBytes(admin)
}

// adjust the partition-num by the topic's partition count
func (c *Config) adjustPartitionNum(realPartitionCount int32) error {
// user does not specify the `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
return nil
}

if c.PartitionNum < realPartitionCount {
log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+
"Some partitions will not have messages dispatched to",
zap.Int32("sink-uri partitions", c.PartitionNum),
zap.Int32("topic partitions", realPartitionCount))
return nil
}

// Make sure the user-specified `partition-num` is not greater than
// the real partition count, otherwise messages would be dispatched to
// partitions that do not exist, which could cause correctness problems.
if c.PartitionNum > realPartitionCount {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)",
c.PartitionNum, realPartitionCount)
}
return nil
}
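
To make the three `adjustPartitionNum` outcomes concrete, here is a hypothetical test sketch (not part of this PR), assuming a topic that actually has 3 partitions:

```go
package kafka

import "testing"

// TestAdjustPartitionNumSketch is a hypothetical test, not part of the PR,
// covering the three outcomes against a topic with 3 real partitions.
func TestAdjustPartitionNumSketch(t *testing.T) {
	c := &Config{PartitionNum: 0}
	if err := c.adjustPartitionNum(3); err != nil || c.PartitionNum != 3 {
		t.Fatal("unset partition-num should inherit the topic's count")
	}

	c = &Config{PartitionNum: 2}
	if err := c.adjustPartitionNum(3); err != nil {
		t.Fatal("fewer partitions than the topic is allowed (with a warning)")
	}

	c = &Config{PartitionNum: 4}
	if err := c.adjustPartitionNum(3); err == nil {
		t.Fatal("more partitions than the topic must be rejected")
	}
}
```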