From f3829d3df471bf9e421ec58a4c5e18270e459603 Mon Sep 17 00:00:00 2001 From: Andy PENG Date: Sun, 29 Jul 2018 13:01:45 +0800 Subject: [PATCH] feat: add Timeout for ClusterAdmin --- admin.go | 11 +++++++++-- config.go | 15 +++++++++++++++ config_test.go | 22 ++++++++++++++++++++++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/admin.go b/admin.go index 68284641c..52725758d 100644 --- a/admin.go +++ b/admin.go @@ -118,6 +118,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO request := &CreateTopicsRequest{ TopicDetails: topicDetails, ValidateOnly: validateOnly, + Timeout: ca.conf.Admin.Timeout, } if ca.conf.Version.IsAtLeast(V0_11_0_0) { @@ -155,7 +156,10 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { return ErrInvalidTopic } - request := &DeleteTopicsRequest{Topics: []string{topic}} + request := &DeleteTopicsRequest{ + Topics: []string{topic}, + Timeout: ca.conf.Admin.Timeout, + } if ca.conf.Version.IsAtLeast(V0_11_0_0) { request.Version = 1 @@ -192,6 +196,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ request := &CreatePartitionsRequest{ TopicPartitions: topicPartitions, + Timeout: ca.conf.Admin.Timeout, } b, err := ca.Controller() @@ -225,7 +230,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i topics := make(map[string]*DeleteRecordsRequestTopic) topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets} request := &DeleteRecordsRequest{ - Topics: topics} + Topics: topics, + Timeout: ca.conf.Admin.Timeout, + } b, err := ca.Controller() if err != nil { diff --git a/config.go b/config.go index 56fa37952..08f533084 100644 --- a/config.go +++ b/config.go @@ -18,6 +18,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`) // Config is used to pass multiple configuration options to Sarama's constructors. type Config struct { + // Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client. + Admin struct { + // The maximum duration the administrative Kafka client will wait for ClusterAdmin operations, + // including topics, brokers, configurations and ACLs (defaults to 3 seconds). + Timeout time.Duration + } + // Net is the namespace for network-level properties used by the Broker, and // shared by the Client/Producer/Consumer. Net struct { @@ -292,6 +299,8 @@ type Config struct { func NewConfig() *Config { c := &Config{} + c.Admin.Timeout = 3 * time.Second + c.Net.MaxOpenRequests = 5 c.Net.DialTimeout = 30 * time.Second c.Net.ReadTimeout = 30 * time.Second @@ -391,6 +400,12 @@ func (c *Config) Validate() error { return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled") } + // validate the Admin values + switch { + case c.Admin.Timeout <= 0: + return ConfigurationError("Admin.Timeout must be > 0") + } + // validate the Metadata values switch { case c.Metadata.Retry.Max < 0: diff --git a/config_test.go b/config_test.go index 40aa453a9..7f751922b 100644 --- a/config_test.go +++ b/config_test.go @@ -122,6 +122,28 @@ func TestMetadataConfigValidates(t *testing.T) { } } +func TestAdminConfigValidates(t *testing.T) { + tests := []struct { + name string + cfg func(*Config) // resorting to using a function as a param because of internal composite structs + err string + }{ + {"Timeout", + func(cfg *Config) { + cfg.Admin.Timeout = 0 + }, + "Admin.Timeout must be > 0"}, + } + + for i, test := range tests { + c := NewConfig() + test.cfg(c) + if err := c.Validate(); string(err.(ConfigurationError)) != test.err { + t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err) + } + } +} + func TestProducerConfigValidates(t *testing.T) { tests := []struct { name string