diff --git a/admin.go b/admin.go index 59fd9983e..f0e37f301 100644 --- a/admin.go +++ b/admin.go @@ -70,6 +70,12 @@ type ClusterAdmin interface { // for some resources while fail for others. The configs for a particular resource are updated automatically. AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error + // IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options. + // This operation is supported by brokers with version 2.3.0.0 or higher. + // Updates are not transactional so they may succeed for some resources while fail for others. + // The configs for a particular resource are updated automatically. + IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error + // Creates access control lists (ACLs) which are bound to specific resources. // This operation is not transactional so it may succeed for some ACLs while fail for others. // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but @@ -731,6 +737,58 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string return nil } +func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error { + var resources []*IncrementalAlterConfigsResource + resources = append(resources, &IncrementalAlterConfigsResource{ + Type: resourceType, + Name: name, + ConfigEntries: entries, + }) + + request := &IncrementalAlterConfigsRequest{ + Resources: resources, + ValidateOnly: validateOnly, + } + + var ( + b *Broker + err error + ) + + // AlterConfig of broker/broker logger must be sent to the broker in question + if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) { + var id int64 + id, err = strconv.ParseInt(name, 10, 32) + if err != nil { + return err + } + b, err = ca.findBroker(int32(id)) + } else { + b, err = ca.findAnyBroker() + } + if err != nil { + return err + } + + _ = b.Open(ca.client.Config()) + rsp, err := b.IncrementalAlterConfigs(request) + if err != nil { + return err + } + + for _, rspResource := range rsp.Resources { + if rspResource.Name == name { + if rspResource.ErrorMsg != "" { + return errors.New(rspResource.ErrorMsg) + } + if rspResource.ErrorCode != 0 { + return KError(rspResource.ErrorCode) + } + } + } + return nil +} + func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error { var acls []*AclCreation acls = append(acls, &AclCreation{resource, acl}) diff --git a/admin_test.go b/admin_test.go index f6e06fd62..57b59c374 100644 --- a/admin_test.go +++ b/admin_test.go @@ -911,6 +911,148 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) { } } +func TestClusterAdminIncrementalAlterConfig(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "IncrementalAlterConfigsRequest": NewMockIncrementalAlterConfigsResponse(t), + }) + + config := NewTestConfig() + config.Version = V2_3_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + var value string + entries := make(map[string]IncrementalAlterConfigsEntry) + value = "60000" + entries["retention.ms"] = IncrementalAlterConfigsEntry{ + Operation: IncrementalAlterConfigsOperationSet, + Value: &value, + } + value = "1073741824" + entries["segment.bytes"] = IncrementalAlterConfigsEntry{ + Operation: IncrementalAlterConfigsOperationDelete, + Value: &value, + } + err = admin.IncrementalAlterConfig(TopicResource, "my_topic", entries, false) + if err != nil { + t.Fatal(err) + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + +func TestClusterAdminIncrementalAlterConfigWithErrorCode(t *testing.T) { + seedBroker := NewMockBroker(t, 1) + defer seedBroker.Close() + + seedBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(seedBroker.BrokerID()). + SetBroker(seedBroker.Addr(), seedBroker.BrokerID()), + "IncrementalAlterConfigsRequest": NewMockIncrementalAlterConfigsResponseWithErrorCode(t), + }) + + config := NewTestConfig() + config.Version = V2_3_0_0 + admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + defer func() { + _ = admin.Close() + }() + + var value string + entries := make(map[string]IncrementalAlterConfigsEntry) + value = "60000" + entries["retention.ms"] = IncrementalAlterConfigsEntry{ + Operation: IncrementalAlterConfigsOperationSet, + Value: &value, + } + value = "1073741824" + entries["segment.bytes"] = IncrementalAlterConfigsEntry{ + Operation: IncrementalAlterConfigsOperationDelete, + Value: &value, + } + err = admin.IncrementalAlterConfig(TopicResource, "my_topic", entries, false) + if err == nil { + t.Fatal(errors.New("ErrorCode present but no Error returned")) + } +} + +func TestClusterAdminIncrementalAlterBrokerConfig(t *testing.T) { + controllerBroker := NewMockBroker(t, 1) + defer controllerBroker.Close() + configBroker := NewMockBroker(t, 2) + defer configBroker.Close() + + controllerBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). + SetBroker(configBroker.Addr(), configBroker.BrokerID()), + }) + configBroker.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetController(controllerBroker.BrokerID()). + SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()). + SetBroker(configBroker.Addr(), configBroker.BrokerID()), + "IncrementalAlterConfigsRequest": NewMockIncrementalAlterConfigsResponse(t), + }) + + config := NewTestConfig() + config.Version = V2_3_0_0 + admin, err := NewClusterAdmin( + []string{ + controllerBroker.Addr(), + configBroker.Addr(), + }, config) + if err != nil { + t.Fatal(err) + } + + var value string + entries := make(map[string]IncrementalAlterConfigsEntry) + value = "3" + entries["min.insync.replicas"] = IncrementalAlterConfigsEntry{ + Operation: IncrementalAlterConfigsOperationSet, + Value: &value, + } + value = "2" + entries["log.cleaner.threads"] = IncrementalAlterConfigsEntry{ + Operation: IncrementalAlterConfigsOperationDelete, + Value: &value, + } + + for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} { + resource := ConfigResource{Name: "2", Type: resourceType} + err = admin.IncrementalAlterConfig( + resource.Type, + resource.Name, + entries, + false) + if err != nil { + t.Fatal(err) + } + } + + err = admin.Close() + if err != nil { + t.Fatal(err) + } +} + func TestClusterAdminCreateAcl(t *testing.T) { seedBroker := NewMockBroker(t, 1) defer seedBroker.Close() diff --git a/mockresponses.go b/mockresponses.go index a5e94fb39..4c9cac809 100644 --- a/mockresponses.go +++ b/mockresponses.go @@ -926,6 +926,51 @@ func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) e return res } +type MockIncrementalAlterConfigsResponse struct { + t TestReporter +} + +func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse { + return &MockIncrementalAlterConfigsResponse{t: t} +} + +func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*IncrementalAlterConfigsRequest) + res := &IncrementalAlterConfigsResponse{} + + for _, r := range req.Resources { + res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ + Name: r.Name, + Type: r.Type, + ErrorMsg: "", + }) + } + return res +} + +type MockIncrementalAlterConfigsResponseWithErrorCode struct { + t TestReporter +} + +func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode { + return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t} +} + +func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader { + req := reqBody.(*IncrementalAlterConfigsRequest) + res := &IncrementalAlterConfigsResponse{} + + for _, r := range req.Resources { + res.Resources = append(res.Resources, &AlterConfigsResourceResponse{ + Name: r.Name, + Type: r.Type, + ErrorCode: 83, + ErrorMsg: "", + }) + } + return res +} + type MockCreateAclsResponse struct { t TestReporter }