diff --git a/kafka/config.go b/kafka/config.go index 5e9c69c..7dbf09f 100644 --- a/kafka/config.go +++ b/kafka/config.go @@ -45,18 +45,23 @@ type UserConfig struct { ScramClient *LSCRAMClient // LSCRAM safety certification // The maximum number of retries on failure, // the default is 0: retry all the time <0 means no retry - MaxRetry int - NetMaxOpenRequests int // Maximum number of requests - MaxProcessingTime time.Duration - NetDailTimeout time.Duration - NetReadTimeout time.Duration - NetWriteTimeout time.Duration - GroupSessionTimeout time.Duration - GroupRebalanceTimeout time.Duration - GroupRebalanceRetryMax int - IsolationLevel sarama.IsolationLevel - RetryInterval time.Duration // Retry Interval Works with MaxRetry - ProducerRetry struct { + MaxRetry int + NetMaxOpenRequests int // Maximum number of requests + MaxProcessingTime time.Duration + NetDailTimeout time.Duration + NetReadTimeout time.Duration + NetWriteTimeout time.Duration + GroupSessionTimeout time.Duration + GroupRebalanceTimeout time.Duration + GroupRebalanceRetryMax int + MetadataRetryMax int + MetadataRetryBackoff time.Duration + MetadataRefreshFrequency time.Duration + MetadataFull bool + MetadataAllowAutoTopicCreation bool + IsolationLevel sarama.IsolationLevel + RetryInterval time.Duration // Retry Interval Works with MaxRetry + ProducerRetry struct { Max int // Maximum number of retries RetryInterval time.Duration // RetryInterval retry interval } @@ -83,9 +88,11 @@ func (uc *UserConfig) getServerConfig() *sarama.Config { sc.ClientID = uc.ClientID } - sc.Metadata.Full = false // Disable pulling all metadata - sc.Metadata.Retry.Max = 1 // Metadata Update Repeat Times - sc.Metadata.Retry.Backoff = time.Second // Metadata update wait time + sc.Metadata.Retry.Max = uc.MetadataRetryMax + sc.Metadata.Retry.Backoff = uc.MetadataRetryBackoff + sc.Metadata.RefreshFrequency = uc.MetadataRefreshFrequency + sc.Metadata.Full = uc.MetadataFull + sc.Metadata.AllowAutoTopicCreation = uc.MetadataAllowAutoTopicCreation sc.Net.MaxOpenRequests = uc.NetMaxOpenRequests sc.Net.DialTimeout = uc.NetDailTimeout @@ -129,28 +136,33 @@ func GetDefaultConfig() *UserConfig { // The maximum waiting time for a single consumption pull request. // The maximum wait time will only wait if there is no recent data. // This value should be set larger to reduce the consumption of empty requests on the QPS of the server. - MaxWaitTime: time.Second, - RequiredAcks: sarama.WaitForAll, - ReturnSuccesses: true, - Timeout: time.Second, // Maximum request processing time on the server side - MaxMessageBytes: 131072, // CDMQ set up - FlushMessages: 0, - FlushMaxMessages: 0, - FlushBytes: 0, - FlushFrequency: 0, - BatchConsumeCount: 0, - BatchFlush: 2 * time.Second, - ScramClient: nil, - MaxRetry: 0, // Unlimited retries, compatible with historical situations - NetMaxOpenRequests: 5, - MaxProcessingTime: 100 * time.Millisecond, - NetDailTimeout: 30 * time.Second, - NetReadTimeout: 30 * time.Second, - NetWriteTimeout: 30 * time.Second, - GroupSessionTimeout: 10 * time.Second, - GroupRebalanceTimeout: 60 * time.Second, - GroupRebalanceRetryMax: 4, - IsolationLevel: 0, + MaxWaitTime: time.Second, + RequiredAcks: sarama.WaitForAll, + ReturnSuccesses: true, + Timeout: time.Second, // Maximum request processing time on the server side + MaxMessageBytes: 131072, // CDMQ set up + FlushMessages: 0, + FlushMaxMessages: 0, + FlushBytes: 0, + FlushFrequency: 0, + BatchConsumeCount: 0, + BatchFlush: 2 * time.Second, + ScramClient: nil, + MaxRetry: 0, // Unlimited retries, compatible with historical situations + NetMaxOpenRequests: 5, + MaxProcessingTime: 100 * time.Millisecond, + NetDailTimeout: 30 * time.Second, + NetReadTimeout: 30 * time.Second, + NetWriteTimeout: 30 * time.Second, + GroupSessionTimeout: 10 * time.Second, + GroupRebalanceTimeout: 60 * time.Second, + GroupRebalanceRetryMax: 4, + MetadataRetryMax: 1, + MetadataRetryBackoff: 1000 * time.Millisecond, + MetadataRefreshFrequency: 600 * time.Second, + MetadataFull: false, // disable pull all metadata + MetadataAllowAutoTopicCreation: true, + IsolationLevel: 0, // Message consumption error retry interval The default is 3s The unit of this parameter is time.Millisecond RetryInterval: 3000 * time.Millisecond, // production retries the default configuration to align with the default configuration of sarama.NewConfig diff --git a/kafka/config_parser.go b/kafka/config_parser.go index dc7230d..8ab0347 100644 --- a/kafka/config_parser.go +++ b/kafka/config_parser.go @@ -24,6 +24,7 @@ func newConfigParsers() map[string]configParseFunc { parserBasicConfig(m) parserAdvanceConfig(m) parserBatchConfig(m) + parserMetadataConfig(m) parserAuthConfig(m) parserDiscoverConfig(m) return m @@ -218,6 +219,52 @@ func parserBatchConfig(m map[string]configParseFunc) { } } +func parserMetadataConfig(m map[string]configParseFunc) { + m["metadataRetryMax"] = func(config *UserConfig, s string) error { + metadataRetryMax, err := strconv.Atoi(s) + if err != nil { + return err + } + if metadataRetryMax < 0 { + return errors.New("param not support: metadataRetryMax expect a value of no less than 0") + } + config.MetadataRetryMax = metadataRetryMax + return nil + } + m["metadataRetryBackoff"] = func(config *UserConfig, s string) error { + metadataRetryBackoff, err := strconv.Atoi(s) + if err != nil { + return err + } + if metadataRetryBackoff < 0 { + return errors.New("param not support: metadataRetryBackoff expect a value of no less than 0") + } + config.MetadataRetryBackoff = time.Duration(metadataRetryBackoff) * time.Millisecond + return nil + } + m["metadataRefreshFrequency"] = func(config *UserConfig, s string) error { + metadataRefreshFrequency, err := strconv.Atoi(s) + if err != nil { + return err + } + if metadataRefreshFrequency < 0 { + return errors.New("param not support: metadataRefreshFrequency expect a value of no less than 0") + } + config.MetadataRefreshFrequency = time.Duration(metadataRefreshFrequency) * time.Second + return nil + } + m["metadataFull"] = func(config *UserConfig, s string) error { + var err error + config.MetadataFull, err = strconv.ParseBool(s) + return err + } + m["metadataAllowAutoTopicCreation"] = func(config *UserConfig, s string) error { + var err error + config.MetadataAllowAutoTopicCreation, err = strconv.ParseBool(s) + return err + } +} + // parserAuthConfig returns configParseFunc for kafka auth, // These parameter items can assign the user's input configuration to userConfig. // The parameter items may validate the content of the user's input during execution, diff --git a/kafka/config_test.go b/kafka/config_test.go index 3890d41..d4e6eff 100644 --- a/kafka/config_test.go +++ b/kafka/config_test.go @@ -24,6 +24,11 @@ func TestParseAddress(t *testing.T) { assert.Equal(t, 54455, cfg.FetchMax) assert.Equal(t, 35326236, cfg.MaxMessageBytes) assert.Equal(t, sarama.OffsetOldest, cfg.Initial) + assert.Equal(t, 10, cfg.FlushMessages) + assert.Equal(t, 100, cfg.FlushMaxMessages) + assert.Equal(t, 10000000, cfg.FlushBytes) + assert.Equal(t, 100*time.Millisecond, cfg.FlushFrequency) + assert.Equal(t, true, cfg.Idempotent) RegisterAddrConfig("test_registered", cfg) @@ -36,8 +41,10 @@ func TestParseAddress(t *testing.T) { func Test_parseAddress(t *testing.T) { address := "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&user=kafka_test&password=cccaaabb&mechanism=SCRAM-SHA-512" _, err := ParseAddress(address) + assert.Nil(t, err) address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&protocol=SASL_SSL&user=kafka_test&password=cccaaabb&mechanism=SCRAM-SHA-512" _, err = ParseAddress(address) + assert.Nil(t, err) address = "127.0.0.1:9092?topics=Topic1,Topic2,Topic3&clientid=client1&version=0.10.2.0&strategy=sticky&batch=2&batchFlush=3000&group=test&maxRetry=10" _, err = ParseAddress(address) assert.Nil(t, err) @@ -72,7 +79,27 @@ func Test_parseAddressWithmaxWaitTime(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 250*time.Millisecond, conf.MaxWaitTime) address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&maxWaitTime=test" - conf, err = ParseAddress(address) + _, err = ParseAddress(address) + assert.NotNil(t, err) +} + +func Test_parseAddressWithMetadata(t *testing.T) { + address := "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=10&metadataRefreshFrequency=100&metadataFull=true&metadataAllowAutoTopicCreation=true" + conf, err := ParseAddress(address) + assert.Nil(t, err) + assert.Equal(t, 3, conf.MetadataRetryMax) + assert.Equal(t, 10*time.Millisecond, conf.MetadataRetryBackoff) + assert.Equal(t, 100*time.Second, conf.MetadataRefreshFrequency) + assert.Equal(t, true, conf.MetadataFull) + assert.Equal(t, true, conf.MetadataAllowAutoTopicCreation) + address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=-3&metadataRetryBackoff=10&metadataRefreshFrequency=100" + _, err = ParseAddress(address) + assert.NotNil(t, err) + address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=-10&metadataRefreshFrequency=100" + _, err = ParseAddress(address) + assert.NotNil(t, err) + address = "127.0.0.1:9092?topic=Topic1&clientid=client1&compression=none&partitioner=hash&metadataRetryMax=3&metadataRetryBackoff=10&metadataRefreshFrequency=-100" + _, err = ParseAddress(address) assert.NotNil(t, err) }