diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 667c44031..97d069106 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -164,6 +164,7 @@ port = 3306 # kafka-addrs = "127.0.0.1:9092" # kafka-version = "0.8.2.0" # kafka-max-messages = 1024 +# kafka-max-message-size = 1073741824 # configure max kafka **client** message size # kafka-client-id = "tidb_binlog" # # the topic name drainer will push msg, the default name is _obinlog diff --git a/drainer/config.go b/drainer/config.go index b42b7792a..52c78f224 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -320,7 +320,9 @@ func (cfg *Config) Parse(args []string) error { return errors.Trace(err) } - initializeSaramaGlobalConfig() + if cfg.SyncerCfg.DestDBType == "kafka" { + initializeSaramaGlobalConfig(cfg.SyncerCfg.To.KafkaMaxMessageSize) + } return cfg.validate() } @@ -439,8 +441,9 @@ func (cfg *Config) adjustConfig() error { } if cfg.SyncerCfg.DestDBType == "kafka" { - maxMsgSize = maxKafkaMsgSize - + if cfg.SyncerCfg.To.KafkaMaxMessageSize <= 0 { + cfg.SyncerCfg.To.KafkaMaxMessageSize = maxKafkaMsgSize + } // get KafkaAddrs from zookeeper if ZkAddrs is setted if cfg.SyncerCfg.To.ZKAddrs != "" { zkClient, err := newZKFromConnectionString(cfg.SyncerCfg.To.ZKAddrs, time.Second*5, time.Second*60) diff --git a/drainer/config_test.go b/drainer/config_test.go index 5345467e4..4729549cb 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -16,12 +16,14 @@ package drainer import ( "bytes" "fmt" + "math" "os" "path" "testing" "time" "github.com/BurntSushi/toml" + "github.com/Shopify/sarama" "github.com/pingcap/check" . "github.com/pingcap/check" "github.com/pingcap/parser/mysql" @@ -222,7 +224,6 @@ func (t *testDrainerSuite) TestAdjustConfig(c *C) { c.Assert(err, IsNil) c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file") c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1) - c.Assert(maxMsgSize, Equals, maxGrpcMsgSize) cfg = NewConfig() err = cfg.adjustConfig() @@ -352,12 +353,15 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) { c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, defaultKafkaAddrs) c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion) c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024) - c.Assert(maxMsgSize, Equals, maxKafkaMsgSize) + c.Assert(sarama.MaxResponseSize, Equals, int32(maxKafkaMsgSize)) + c.Assert(sarama.MaxRequestSize, Equals, int32(maxKafkaMsgSize)+1) - // With Zookeeper address + // With Zookeeper address and maxKafkaMsgSize + maxInt32 := math.MaxInt32 cfg = NewConfig() cfg.SyncerCfg.To = new(dsync.DBConfig) cfg.SyncerCfg.To.ZKAddrs = "host1:2181" + cfg.SyncerCfg.To.KafkaMaxMessageSize = int32(maxInt32) err = cfg.Parse(args) c.Assert(err, IsNil) c.Assert(cfg.MetricsAddr, Equals, "192.168.15.10:9091") @@ -371,4 +375,7 @@ func (t *testKafkaSuite) TestConfigDestDBTypeKafka(c *C) { c.Assert(cfg.SyncerCfg.To.KafkaAddrs, Matches, `(192\.0\.2\.1:9092,192\.0\.2\.2:9092|192\.0\.2\.2:9092,192\.0\.2\.1:9092)`) c.Assert(cfg.SyncerCfg.To.KafkaVersion, Equals, defaultKafkaVersion) c.Assert(cfg.SyncerCfg.To.KafkaMaxMessages, Equals, 1024) + c.Assert(sarama.MaxResponseSize, Equals, int32(maxInt32)) + c.Assert(sarama.MaxRequestSize, Equals, int32(maxInt32)) + initializeSaramaGlobalConfig(maxKafkaMsgSize) } diff --git a/drainer/pump.go b/drainer/pump.go index c3c0b41e5..9616a8b72 100644 --- a/drainer/pump.go +++ b/drainer/pump.go @@ -201,7 +201,7 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error { p.grpcConn.Close() } - callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxMsgSize)} + callOpts := []grpc.CallOption{grpc.MaxCallRecvMsgSize(maxGrpcMsgSize)} if compressor, ok := getCompressorName(ctx); ok { p.logger.Info("pump grpc compression enabled") diff --git a/drainer/sync/util.go b/drainer/sync/util.go index 50462c7c7..3cfce3762 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -45,12 +45,14 @@ type DBConfig struct { Merge bool `toml:"merge" json:"merge"` - ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` - KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` - KafkaVersion string `toml:"kafka-version" json:"kafka-version"` - KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"` - KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"` - TopicName string `toml:"topic-name" json:"topic-name"` + ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` + KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` + KafkaVersion string `toml:"kafka-version" json:"kafka-version"` + KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"` + KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"` + KafkaMaxMessageSize int32 `toml:"kafka-max-message-size" json:"kafka-max-message-size"` + TopicName string `toml:"topic-name" json:"topic-name"` + // get it from pd ClusterID uint64 `toml:"-" json:"-"` } diff --git a/drainer/util.go b/drainer/util.go index d11cd3619..5e9ace394 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -35,12 +35,8 @@ import ( ) const ( - maxKafkaMsgSize = 1024 * 1024 * 1024 - maxGrpcMsgSize = math.MaxInt32 -) - -var ( - maxMsgSize = maxGrpcMsgSize + maxKafkaMsgSize = 1 << 30 + maxGrpcMsgSize = int(^uint(0) >> 1) ) // taskGroup is a wrapper of `sync.WaitGroup`. @@ -132,10 +128,14 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) { return checkpointCfg, nil } -func initializeSaramaGlobalConfig() { - sarama.MaxResponseSize = int32(maxKafkaMsgSize) +func initializeSaramaGlobalConfig(kafkaMsgSize int32) { + sarama.MaxResponseSize = kafkaMsgSize // add 1 to avoid confused log: Producer.MaxMessageBytes must be smaller than MaxRequestSize; it will be ignored - sarama.MaxRequestSize = int32(maxKafkaMsgSize) + 1 + if kafkaMsgSize < math.MaxInt32 { + sarama.MaxRequestSize = kafkaMsgSize + 1 + } else { + sarama.MaxRequestSize = kafkaMsgSize + } } func getDDLJob(tiStore kv.Storage, id int64) (*model.Job, error) { diff --git a/pump/config.go b/pump/config.go index 978a62c37..ced2cfdd6 100644 --- a/pump/config.go +++ b/pump/config.go @@ -17,7 +17,6 @@ import ( "crypto/tls" "flag" "fmt" - "math" "net" "net/url" "os" @@ -36,7 +35,7 @@ const ( defaultEtcdDialTimeout = 5 * time.Second defaultEtcdURLs = "http://127.0.0.1:2379" defaultListenAddr = "127.0.0.1:8250" - defautMaxMsgSize = math.MaxInt32 // max grpc message size + defaultMaxMsgSize = int(^uint(0) >> 1) // max grpc message size defaultHeartbeatInterval = 2 defaultGC = "7" defaultDataDir = "data.pump" @@ -111,7 +110,7 @@ func NewConfig() *Config { // global config fs.BoolVar(&GlobalConfig.enableDebug, "enable-debug", false, "enable print debug log") - fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defautMaxMsgSize, "max message size tidb produce into pump") + fs.IntVar(&GlobalConfig.maxMsgSize, "max-message-size", defaultMaxMsgSize, "max message size tidb produce into pump") fs.Int64Var(new(int64), "binlog-file-size", 0, "DEPRECATED") fs.BoolVar(new(bool), "enable-binlog-slice", false, "DEPRECATED") fs.IntVar(new(int), "binlog-slice-size", 0, "DEPRECATED") diff --git a/pump/server.go b/pump/server.go index 125d6ef6a..1cfb515f0 100644 --- a/pump/server.go +++ b/pump/server.go @@ -103,7 +103,7 @@ func init() { // it must be set before any real grpc operation. grpc.EnableTracing = false GlobalConfig = &globalConfig{ - maxMsgSize: defautMaxMsgSize, + maxMsgSize: defaultMaxMsgSize, } }