Skip to content

Commit

Permalink
drainer: fix kafka message limit problem (#1039) (#1080)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 12, 2021
1 parent b6be90d commit 75683e3
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 26 deletions.
1 change: 1 addition & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ port = 3306
# kafka-addrs = "127.0.0.1:9092"
# kafka-version = "0.8.2.0"
# kafka-max-messages = 1024
# kafka-max-message-size = 1073741824 # configure max kafka **client** message size
# kafka-client-id = "tidb_binlog"
#
# the topic name drainer will push msg, the default name is <cluster-id>_obinlog
Expand Down
9 changes: 6 additions & 3 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func (cfg *Config) Parse(args []string) error {
return errors.Trace(err)
}

initializeSaramaGlobalConfig()
if cfg.SyncerCfg.DestDBType == "kafka" {
initializeSaramaGlobalConfig(cfg.SyncerCfg.To.KafkaMaxMessageSize)
}
return cfg.validate()
}

Expand Down Expand Up @@ -439,8 +441,9 @@ func (cfg *Config) adjustConfig() error {
}

if cfg.SyncerCfg.DestDBType == "kafka" {
maxMsgSize = maxKafkaMsgSize

if cfg.SyncerCfg.To.KafkaMaxMessageSize <= 0 {
cfg.SyncerCfg.To.KafkaMaxMessageSize = maxKafkaMsgSize
}
// get KafkaAddrs from zookeeper if ZkAddrs is setted
if cfg.SyncerCfg.To.ZKAddrs != "" {
zkClient, err := newZKFromConnectionString(cfg.SyncerCfg.To.ZKAddrs, time.Second*5, time.Second*60)
Expand Down
13 changes: 10 additions & 3 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package drainer
import (
"bytes"
"fmt"
"math"
"os"
"path"
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/Shopify/sarama"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -206,7 +208,6 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(maxMsgSize, Equals, maxGrpcMsgSize)

cfg = NewConfig()
err = cfg.adjustConfig()
Expand Down Expand Up @@ -335,12 +336,15 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, defaultKafkaAddrs)
c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
c.Assert(maxMsgSize, Equals, maxKafkaMsgSize)
c.Assert(sarama.MaxResponseSize, Equals, int32(maxKafkaMsgSize))
c.Assert(sarama.MaxRequestSize, Equals, int32(maxKafkaMsgSize)+1)

// With Zookeeper address
// With Zookeeper address and maxKafkaMsgSize
maxInt32 := math.MaxInt32
cfg = NewConfig()
cfg.SyncerCfg.To = new(dsync.DBConfig)
cfg.SyncerCfg.To.ZKAddrs = "host1:2181"
cfg.SyncerCfg.To.KafkaMaxMessageSize = int32(maxInt32)
err = cfg.Parse(args)
c.Assert(err, IsNil)
c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091")
Expand All @@ -354,4 +358,7 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) {
c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, `(192\.0\.2\.1:9092,192\.0\.2\.2:9092|192\.0\.2\.2:9092,192\.0\.2\.1:9092)`)
c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion)
c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024)
c.Assert(sarama.MaxResponseSize, Equals, int32(maxInt32))
c.Assert(sarama.MaxRequestSize, Equals, int32(maxInt32))
initializeSaramaGlobalConfig(maxKafkaMsgSize)
}
2 changes: 1 addition & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
p.grpcConn.Close()
}

callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)}
callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxGrpcMsgSize)}

if compressor, ok := getCompressorName(ctx); ok {
p.logger.Info("pump grpc compression enabled")
Expand Down
14 changes: 8 additions & 6 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ type DBConfig struct {

Merge bool `toml:"merge" json:"merge"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"`
KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"`
TopicName string `toml:"topic-name" json:"topic-name"`
ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"`
KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"`
KafkaMaxMessageSize int32 `toml:"kafka-max-message-size" json:"kafka-max-message-size"`
TopicName string `toml:"topic-name" json:"topic-name"`

// get it from pd
ClusterID uint64 `toml:"-" json:"-"`
}
Expand Down
18 changes: 9 additions & 9 deletions drainer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ import (
)

const (
maxKafkaMsgSize = 1024 * 1024 * 1024
maxGrpcMsgSize = math.MaxInt32
)

var (
maxMsgSize = maxGrpcMsgSize
maxKafkaMsgSize = 1 << 30
maxGrpcMsgSize = int(^uint(0) >> 1)
)

// taskGroup is a wrapper of `sync.WaitGroup`.
Expand Down Expand Up @@ -132,10 +128,14 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
return checkpointCfg, nil
}

func initializeSaramaGlobalConfig() {
sarama.MaxResponseSize = int32(maxKafkaMsgSize)
func initializeSaramaGlobalConfig(kafkaMsgSize int32) {
sarama.MaxResponseSize = kafkaMsgSize
// add 1 to avoid confused log: Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored
sarama.MaxRequestSize = int32(maxKafkaMsgSize) + 1
if kafkaMsgSize < math.MaxInt32 {
sarama.MaxRequestSize = kafkaMsgSize + 1
} else {
sarama.MaxRequestSize = kafkaMsgSize
}
}

func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) {
Expand Down
5 changes: 2 additions & 3 deletions pump/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"crypto/tls"
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
Expand All @@ -36,7 +35,7 @@ const (
defaultEtcdDialTimeout = 5 * time.Second
defaultEtcdURLs = "http://127.0.0.1:2379"
defaultListenAddr = "127.0.0.1:8250"
defautMaxMsgSize = math.MaxInt32 // max grpc message size
defaultMaxMsgSize = int(^uint(0) >> 1) // max grpc message size
defaultHeartbeatInterval = 2
defaultGC = "7"
defaultDataDir = "data.pump"
Expand Down Expand Up @@ -111,7 +110,7 @@ func NewConfig() *Config {

// global config
fs.BoolVar(&GlobalConfig.enableDebug, "enable-debug", false, "enable print debug log")
fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxMsgSize, "max message size tidb produce into pump")
fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defaultMaxMsgSize, "max message size tidb produce into pump")
fs.Int64Var(new(int64), "binlog-file-size", 0, "DEPRECATED")
fs.BoolVar(new(bool), "enable-binlog-slice", false, "DEPRECATED")
fs.IntVar(new(int), "binlog-slice-size", 0, "DEPRECATED")
Expand Down
2 changes: 1 addition & 1 deletion pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func init() {
// it must be set before any real grpc operation.
grpc.EnableTracing = false
GlobalConfig = &globalConfig{
maxMsgSize: defautMaxMsgSize,
maxMsgSize: defaultMaxMsgSize,
}
}

Expand Down

0 comments on commit 75683e3

Please sign in to comment.