Skip to content

Commit

Permalink
Merge pull request #1142 from andy2046/add_Timeout_for_ClusterAdmin
Browse files Browse the repository at this point in the history
add Timeout config for ClusterAdmin related Requests
  • Loading branch information
eapache committed Jul 30, 2018
2 parents c5b44e3 + f3829d3 commit e7238b1
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
11 changes: 9 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
request := &CreateTopicsRequest{
TopicDetails: topicDetails,
ValidateOnly: validateOnly,
Timeout: ca.conf.Admin.Timeout,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
Expand Down Expand Up @@ -155,7 +156,10 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
return ErrInvalidTopic
}

request := &DeleteTopicsRequest{Topics: []string{topic}}
request := &DeleteTopicsRequest{
Topics: []string{topic},
Timeout: ca.conf.Admin.Timeout,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
Expand Down Expand Up @@ -192,6 +196,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [

request := &CreatePartitionsRequest{
TopicPartitions: topicPartitions,
Timeout: ca.conf.Admin.Timeout,
}

b, err := ca.Controller()
Expand Down Expand Up @@ -225,7 +230,9 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
topics := make(map[string]*DeleteRecordsRequestTopic)
topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
request := &DeleteRecordsRequest{
Topics: topics}
Topics: topics,
Timeout: ca.conf.Admin.Timeout,
}

b, err := ca.Controller()
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)

// Config is used to pass multiple configuration options to Sarama's constructors.
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
}

// Net is the namespace for network-level properties used by the Broker, and
// shared by the Client/Producer/Consumer.
Net struct {
Expand Down Expand Up @@ -292,6 +299,8 @@ type Config struct {
func NewConfig() *Config {
c := &Config{}

c.Admin.Timeout = 3 * time.Second

c.Net.MaxOpenRequests = 5
c.Net.DialTimeout = 30 * time.Second
c.Net.ReadTimeout = 30 * time.Second
Expand Down Expand Up @@ -391,6 +400,12 @@ func (c *Config) Validate() error {
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
}

// validate the Admin values
switch {
case c.Admin.Timeout <= 0:
return ConfigurationError("Admin.Timeout must be > 0")
}

// validate the Metadata values
switch {
case c.Metadata.Retry.Max < 0:
Expand Down
22 changes: 22 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ func TestMetadataConfigValidates(t *testing.T) {
}
}

func TestAdminConfigValidates(t *testing.T) {
tests := []struct {
name string
cfg func(*Config) // resorting to using a function as a param because of internal composite structs
err string
}{
{"Timeout",
func(cfg *Config) {
cfg.Admin.Timeout = 0
},
"Admin.Timeout must be > 0"},
}

for i, test := range tests {
c := NewConfig()
test.cfg(c)
if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
}
}
}

func TestProducerConfigValidates(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit e7238b1

Please sign in to comment.