diff --git a/admin.go b/admin.go index 6c9b1e9e7..287e3dee6 100644 --- a/admin.go +++ b/admin.go @@ -2,7 +2,9 @@ package sarama import ( "errors" + "fmt" "math/rand" + "strconv" "sync" ) @@ -226,6 +228,16 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 return response.Brokers, response.ControllerID, nil } +func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) { + brokers := ca.client.Brokers() + for _, b := range brokers { + if b.ID() == id { + return b, nil + } + } + return nil, fmt.Errorf("could not find broker id %d", id) +} + func (ca *clusterAdmin) findAnyBroker() (*Broker, error) { brokers := ca.client.Brokers() if len(brokers) > 0 { @@ -432,6 +444,13 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i return nil } +// Returns a bool indicating whether the resource request needs to go to a +// specific broker +func dependsOnSpecificNode(resource ConfigResource) bool { + return (resource.Type == BrokerResource && resource.Name != "") || + resource.Type == BrokerLoggerResource +} + func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) { var entries []ConfigEntry @@ -442,11 +461,23 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, Resources: resources, } - b, err := ca.Controller() + var ( + b *Broker + err error + ) + + // DescribeConfig of broker/broker logger must be sent to the broker in question + if dependsOnSpecificNode(resource) { + id, _ := strconv.Atoi(resource.Name) + b, err = ca.findBroker(int32(id)) + } else { + b, err = ca.findAnyBroker() + } if err != nil { return nil, err } + _ = b.Open(ca.client.Config()) rsp, err := b.DescribeConfigs(request) if err != nil { return nil, err @@ -479,11 +510,23 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string ValidateOnly: validateOnly, } - b, err := ca.Controller() + var ( + b *Broker + err error + ) + + // AlterConfig of broker/broker logger must be sent to the broker in question + if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) { + id, _ := strconv.Atoi(name) + b, err = ca.findBroker(int32(id)) + } else { + b, err = ca.findAnyBroker() + } if err != nil { return err } + _ = b.Open(ca.client.Config()) rsp, err := b.AlterConfigs(request) if err != nil { return err diff --git a/admin_test.go b/admin_test.go index 419c23058..92a5415c0 100644 --- a/admin_test.go +++ b/admin_test.go @@ -2,6 +2,10 @@ package sarama import ( "errors" + "fmt" + "io/ioutil" + "log" + "os" "strings" "testing" ) @@ -511,6 +515,61 @@ func TestClusterAdminDescribeConfig(t *testing.T) { } } +// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config +// is sent to the broker in the resource struct, _not_ the controller +func TestClusterAdminDescribeBrokerConfig(t *testing.T) { + Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags) + defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }() + + controllerBroker := NewMockBroker(t, 1) + defer controllerBroker.Close() + configBroker := NewMockBroker(t, 2) + defer configBroker.Close() + + controllerBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). + SetBroker(configBroker.Addr(), configBroker.BrokerID()), + }) + + configBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). + SetBroker(configBroker.Addr(), configBroker.BrokerID()), + "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin( + []string{ + controllerBroker.Addr(), + configBroker.Addr(), + }, config) + if err != nil { + t.Fatal(err) + } + + for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { + resource := ConfigResource{Name: "2", Type: resourceType} + entries, err := admin.DescribeConfig(resource) + if err != nil { + t.Fatal(err) + } + + if len(entries) <= 0 { + t.Fatal(errors.New("no resource present")) + } + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminAlterConfig(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() @@ -544,6 +603,60 @@ func TestClusterAdminAlterConfig(t *testing.T) { } } +func TestClusterAdminAlterBrokerConfig(t *testing.T) { + controllerBroker := NewMockBroker(t, 1) + defer controllerBroker.Close() + configBroker := NewMockBroker(t, 2) + defer configBroker.Close() + + controllerBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). + SetBroker(configBroker.Addr(), configBroker.BrokerID()), + }) + configBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). + SetBroker(configBroker.Addr(), configBroker.BrokerID()), + "AlterConfigsRequest": NewMockAlterConfigsResponse(t), + }) + + config := NewConfig() + config.Version = V1_0_0_0 + admin, err := NewClusterAdmin( + []string{ + controllerBroker.Addr(), + configBroker.Addr(), + }, config) + if err != nil { + t.Fatal(err) + } + + var value string + entries := make(map[string]*string) + value = "3" + entries["min.insync.replicas"] = &value + + for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { + resource := ConfigResource{Name: "2", Type: resourceType} + err = admin.AlterConfig( + resource.Type, + resource.Name, + entries, + false) + if err != nil { + t.Fatal(err) + } + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminCreateAcl(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() diff --git a/config_resource_type.go b/config_resource_type.go index 5399d75ca..bef1053aa 100644 --- a/config_resource_type.go +++ b/config_resource_type.go @@ -1,22 +1,18 @@ package sarama -//ConfigResourceType is a type for config resource +// ConfigResourceType is a type for resources that have configs. type ConfigResourceType int8 -// Taken from : -// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes +// Taken from: +// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55 const ( - //UnknownResource constant type - UnknownResource ConfigResourceType = iota - //AnyResource constant type - AnyResource - //TopicResource constant type - TopicResource - //GroupResource constant type - GroupResource - //ClusterResource constant type - ClusterResource - //BrokerResource constant type - BrokerResource + // UnknownResource constant type + UnknownResource ConfigResourceType = 0 + // TopicResource constant type + TopicResource ConfigResourceType = 2 + // BrokerResource constant type + BrokerResource ConfigResourceType = 4 + // BrokerLoggerResource constant type + BrokerLoggerResource ConfigResourceType = 8 ) diff --git a/mockresponses.go b/mockresponses.go index 7dcc93e36..72df4b363 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -736,6 +736,32 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder { for _, r := range req.Resources { var configEntries []*ConfigEntry switch r.Type { + case BrokerResource: + configEntries = append(configEntries, + &ConfigEntry{ + Name: "min.insync.replicas", + Value: "2", + ReadOnly: false, + Default: false, + }, + ) + res.Resources = append(res.Resources, &ResourceResponse{ + Name: r.Name, + Configs: configEntries, + }) + case BrokerLoggerResource: + configEntries = append(configEntries, + &ConfigEntry{ + Name: "kafka.controller.KafkaController", + Value: "DEBUG", + ReadOnly: false, + Default: false, + }, + ) + res.Resources = append(res.Resources, &ResourceResponse{ + Name: r.Name, + Configs: configEntries, + }) case TopicResource: configEntries = append(configEntries, &ConfigEntry{Name: "max.message.bytes", @@ -777,7 +803,7 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder { for _, r := range req.Resources { res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name, - Type: TopicResource, + Type: r.Type, ErrorMsg: "", }) }