Skip to content

Commit

Permalink
Support IncrementalAlterConfigs API in admin.go
Browse files Browse the repository at this point in the history
  • Loading branch information
fengyinqiao committed Dec 11, 2021
1 parent 024e359 commit dd5d74e
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 0 deletions.
58 changes: 58 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ type ClusterAdmin interface {
// for some resources while fail for others. The configs for a particular resource are updated automatically.
AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

// IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
// This operation is supported by brokers with version 2.3.0.0 or higher.
// Updates are not transactional so they may succeed for some resources while fail for others.
// The configs for a particular resource are updated automatically.
IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

// Creates access control lists (ACLs) which are bound to specific resources.
// This operation is not transactional so it may succeed for some ACLs while fail for others.
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
Expand Down Expand Up @@ -731,6 +737,58 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
return nil
}

func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
var resources []*IncrementalAlterConfigsResource
resources = append(resources, &IncrementalAlterConfigsResource{
Type: resourceType,
Name: name,
ConfigEntries: entries,
})

request := &IncrementalAlterConfigsRequest{
Resources: resources,
ValidateOnly: validateOnly,
}

var (
b *Broker
err error
)

// AlterConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
var id int64
id, err = strconv.ParseInt(name, 10, 32)
if err != nil {
return err
}
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return err
}

_ = b.Open(ca.client.Config())
rsp, err := b.IncrementalAlterConfigs(request)
if err != nil {
return err
}

for _, rspResource := range rsp.Resources {
if rspResource.Name == name {
if rspResource.ErrorMsg != "" {
return errors.New(rspResource.ErrorMsg)
}
if rspResource.ErrorCode != 0 {
return KError(rspResource.ErrorCode)
}
}
}
return nil
}

func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
var acls []*AclCreation
acls = append(acls, &AclCreation{resource, acl})
Expand Down
142 changes: 142 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,148 @@ func TestClusterAdminAlterBrokerConfig(t *testing.T) {
}
}

func TestClusterAdminIncrementalAlterConfig(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"IncrementalAlterConfigsRequest": NewMockIncrementalAlterConfigsResponse(t),
})

config := NewTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

var value string
entries := make(map[string]IncrementalAlterConfigsEntry)
value = "60000"
entries["retention.ms"] = IncrementalAlterConfigsEntry{
Operation: IncrementalAlterConfigsOperationSet,
Value: &value,
}
value = "1073741824"
entries["segment.bytes"] = IncrementalAlterConfigsEntry{
Operation: IncrementalAlterConfigsOperationDelete,
Value: &value,
}
err = admin.IncrementalAlterConfig(TopicResource, "my_topic", entries, false)
if err != nil {
t.Fatal(err)
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminIncrementalAlterConfigWithErrorCode(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"IncrementalAlterConfigsRequest": NewMockIncrementalAlterConfigsResponseWithErrorCode(t),
})

config := NewTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer func() {
_ = admin.Close()
}()

var value string
entries := make(map[string]IncrementalAlterConfigsEntry)
value = "60000"
entries["retention.ms"] = IncrementalAlterConfigsEntry{
Operation: IncrementalAlterConfigsOperationSet,
Value: &value,
}
value = "1073741824"
entries["segment.bytes"] = IncrementalAlterConfigsEntry{
Operation: IncrementalAlterConfigsOperationDelete,
Value: &value,
}
err = admin.IncrementalAlterConfig(TopicResource, "my_topic", entries, false)
if err == nil {
t.Fatal(errors.New("ErrorCode present but no Error returned"))
}
}

func TestClusterAdminIncrementalAlterBrokerConfig(t *testing.T) {
controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})
configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"IncrementalAlterConfigsRequest": NewMockIncrementalAlterConfigsResponse(t),
})

config := NewTestConfig()
config.Version = V2_3_0_0
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

var value string
entries := make(map[string]IncrementalAlterConfigsEntry)
value = "3"
entries["min.insync.replicas"] = IncrementalAlterConfigsEntry{
Operation: IncrementalAlterConfigsOperationSet,
Value: &value,
}
value = "2"
entries["log.cleaner.threads"] = IncrementalAlterConfigsEntry{
Operation: IncrementalAlterConfigsOperationDelete,
Value: &value,
}

for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
err = admin.IncrementalAlterConfig(
resource.Type,
resource.Name,
entries,
false)
if err != nil {
t.Fatal(err)
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminCreateAcl(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
45 changes: 45 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,51 @@ func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) e
return res
}

type MockIncrementalAlterConfigsResponse struct {
t TestReporter
}

func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {
return &MockIncrementalAlterConfigsResponse{t: t}
}

func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*IncrementalAlterConfigsRequest)
res := &IncrementalAlterConfigsResponse{}

for _, r := range req.Resources {
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
Name: r.Name,
Type: r.Type,
ErrorMsg: "",
})
}
return res
}

type MockIncrementalAlterConfigsResponseWithErrorCode struct {
t TestReporter
}

func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {
return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t}
}

func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*IncrementalAlterConfigsRequest)
res := &IncrementalAlterConfigsResponse{}

for _, r := range req.Resources {
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
Name: r.Name,
Type: r.Type,
ErrorCode: 83,
ErrorMsg: "",
})
}
return res
}

type MockCreateAclsResponse struct {
t TestReporter
}
Expand Down

0 comments on commit dd5d74e

Please sign in to comment.