cdc/sink: adjust kafka initialization logic (#3192) (#4162)
ti-chi-bot authored Jan 13, 2022
1 parent 181bb6c commit 7d0a048
Showing 4 changed files with 250 additions and 112 deletions.
10 changes: 7 additions & 3 deletions cdc/sink/mq.go
@@ -381,7 +381,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh)
if err != nil {
return nil, errors.Trace(err)
}
@@ -411,8 +415,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. The Avro format has to connect to the Schema Registry,
// and it may need credentials.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
12 changes: 12 additions & 0 deletions cdc/sink/mq_test.go
@@ -62,6 +62,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

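// enable the failpoint so that topicPreProcess is skipped; otherwise the producer
// would try to reach a real Kafka cluster admin during the unit test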
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

@@ -163,6 +169,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

239 changes: 177 additions & 62 deletions cdc/sink/producer/kafka/kafka.go
@@ -37,11 +37,14 @@ import (
"go.uber.org/zap"
)

const defaultPartitionNum = 4
const defaultPartitionNum = 3

// Config stores the Kafka configuration
// Config stores the user-specified Kafka producer configuration
type Config struct {
PartitionNum int32
BrokerEndpoints []string
PartitionNum int32

// The user should make sure that `replication-factor` is not greater than the number of Kafka brokers.
ReplicationFactor int16

Version string
@@ -50,8 +53,8 @@ type Config struct {
ClientID string
Credential *security.Credential
SaslScram *security.SaslScram
// control whether to create topic and verify partition number
TopicPreProcess bool
// controls whether to create the topic automatically
AutoCreate bool
}

// NewConfig returns a default Kafka configuration
@@ -64,19 +67,24 @@ func NewConfig() *Config {
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
TopicPreProcess: true,
AutoCreate: true,
}
}

// Initialize the kafka configuration
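// An illustrative sink-uri (example values only, all parameters optional):
//   kafka://127.0.0.1:9092/topic-name?partition-num=3&replication-factor=1&auto-create-topic=true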
func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
s := sinkURI.Query().Get("partition-num")
c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
params := sinkURI.Query()
s := params.Get("partition-num")
if s != "" {
a, err := strconv.Atoi(s)
if err != nil {
return err
}
c.PartitionNum = int32(a)
if c.PartitionNum <= 0 {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(c.PartitionNum)
}
}

s = sinkURI.Query().Get("replication-factor")
@@ -156,7 +164,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi
if err != nil {
return err
}
c.TopicPreProcess = autoCreate
c.AutoCreate = autoCreate
}

return nil
@@ -379,85 +387,122 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}
}

// kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't
// exit, creates it automatically.
func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
// FIXME: find a way to remove this failpoint, which exists only to work around the unit tests
failpoint.Inject("SkipTopicAutoCreate", func() {
failpoint.Return(nil)
})
admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
err := admin.Close()
if err != nil {
log.Warn("close admin client failed", zap.Error(err))
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

topics, err := admin.ListTopics()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
partitionNum := config.PartitionNum
topicDetail, exist := topics[topic]
if exist {
log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions))
if partitionNum == 0 {
partitionNum = topicDetail.NumPartitions
} else if partitionNum < topicDetail.NumPartitions {
log.Warn("partition number assigned in sink-uri is less than that of topic", zap.Int32("topic partition num", topicDetail.NumPartitions))
} else if partitionNum > topicDetail.NumPartitions {
return 0, cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"partition number(%d) assigned in sink-uri is more than that of topic(%d)", partitionNum, topicDetail.NumPartitions)

info, created := topics[topic]
// once the topic is found, make sure the user-specified parameters are valid, regardless of the `auto-create-topic` setting.
if created {
// make sure that the topic's `max.message.bytes` is not less than the given `max-message-bytes`,
// otherwise the producer would send messages too large for the topic to accept, and the changefeed would error.
// only the default `open protocol` and `craft protocol` use `max-message-bytes`, so this check applies to them.
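// e.g. (illustrative numbers): if the topic's max.message.bytes is 1048576 (1MiB) while
// max-message-bytes is 4194304 (4MiB), a 2MiB message would pass the producer-side check but
// be rejected by the broker; the check below turns this into an error at sink creation instead.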
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
} else {
if partitionNum == 0 {
partitionNum = defaultPartitionNum
log.Warn("topic not found and partition number is not specified, using default partition number", zap.String("topic", topic), zap.Int32("partition_num", partitionNum))
if topicMaxMessageBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"topic already exists, and the topic's max.message.bytes(%d) is less than max-message-bytes(%d). "+
"Please make sure that `max-message-bytes` is not greater than the topic's `max.message.bytes`",
topicMaxMessageBytes, config.MaxMessageBytes)
}
log.Info("create a topic", zap.String("topic", topic),
zap.Int32("partition_num", partitionNum),
zap.Int16("replication_factor", config.ReplicationFactor))
err := admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: partitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
// TODO idenfity the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)

// no need to create the topic, but log a warning so that the user can notice later if a wrong topic name was entered
if config.AutoCreate {
log.Warn("topic already exist, TiCDC will not create the topic",
zap.String("topic", topic), zap.Any("detail", info))
}

if err := config.adjustPartitionNum(info.NumPartitions); err != nil {
return errors.Trace(err)
}

return nil
}

if !config.AutoCreate {
return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and the topic is not found")
}

// when creating the topic, we don't know how to set its `max.message.bytes`.
// Kafka would create the topic with the broker's `message.max.bytes` as the effective limit,
// so we have to make sure `max-message-bytes` is not greater than that value.
brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}

if brokerMessageMaxBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"broker's message.max.bytes(%d) is less than max-message-bytes(%d). "+
"Please make sure that `max-message-bytes` is not greater than the broker's `message.max.bytes`",
brokerMessageMaxBytes, config.MaxMessageBytes)
}

// the topic does not exist yet, and the user did not specify `partition-num` in the sink-uri.
if config.PartitionNum == 0 {
config.PartitionNum = defaultPartitionNum
log.Warn("partition-num is not set, use the default partition count",
zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum))
}

err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
// TODO identify the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return partitionNum, nil
log.Info("TiCDC create the topic",
zap.Int32("partition-num", config.PartitionNum),
zap.Int16("replication-factor", config.ReplicationFactor))

return nil
}

var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}
if config.PartitionNum < 0 {
return nil, cerror.ErrKafkaInvalidPartitionNum.GenWithStackByArgs(config.PartitionNum)

if err := topicPreProcess(topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
asyncClient, err := sarama.NewAsyncProducer(strings.Split(address, ","), cfg)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(strings.Split(address, ","), cfg)
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

partitionNum := config.PartitionNum
if config.TopicPreProcess {
partitionNum, err = kafkaTopicPreProcess(topic, address, config, cfg)
if err != nil {
return nil, err
}
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
@@ -467,11 +512,11 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: partitionNum,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, partitionNum),
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
@@ -558,6 +603,11 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll

// Time out in five minutes (600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

switch strings.ToLower(strings.TrimSpace(c.Compression)) {
case "none":
config.Producer.Compression = sarama.CompressionNone
@@ -574,10 +624,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

// Time out in five minutes(600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

// Time out in one minute(120 * 500ms).
config.Admin.Retry.Max = 120
config.Admin.Retry.Backoff = 500 * time.Millisecond
@@ -606,3 +652,72 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {

return config, err
}

func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
target := "message.max.bytes"
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controllerID)),
ConfigNames: []string{target},
})
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if len(configEntries) == 0 || configEntries[0].Name != target {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since `message.max.bytes` cannot be found in the broker's configuration, " +
"TiCDC declines to create the topic and the changefeed to prevent potential errors")
}

result, err := strconv.Atoi(configEntries[0].Value)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return result, nil
}

func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
result, err := strconv.Atoi(*a)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
return result, nil
}

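// the topic does not override `max.message.bytes`; Kafka then applies the broker's
// `message.max.bytes` to it, so fall back to reading that broker-level value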
return getBrokerMessageMaxBytes(admin)
}

// adjustPartitionNum adjusts the partition-num according to the topic's real partition count
func (c *Config) adjustPartitionNum(realPartitionCount int32) error {
// the user did not specify `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
return nil
}

if c.PartitionNum < realPartitionCount {
log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+
"Some partitions will not have messages dispatched to",
zap.Int32("sink-uri partitions", c.PartitionNum),
zap.Int32("topic partitions", realPartitionCount))
return nil
}

// Make sure that the user-specified `partition-num` is not greater than
// the real partition count; otherwise messages would be dispatched to
// partitions that do not exist, causing correctness problems.
if c.PartitionNum > realPartitionCount {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"the number of partitions (%d) specified in the sink-uri is more than that of the actual topic (%d)",
c.PartitionNum, realPartitionCount)
}
return nil
}