Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use the broker for any admin on BrokerConfig #1571

Merged
merged 1 commit into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package sarama

import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
)

Expand Down Expand Up @@ -226,6 +228,16 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
return response.Brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
brokers := ca.client.Brokers()
for _, b := range brokers {
if b.ID() == id {
return b, nil
}
}
return nil, fmt.Errorf("could not find broker id %d", id)
}

func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
Expand Down Expand Up @@ -432,6 +444,13 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
return nil
}

// Returns a bool indicating whether the resource request needs to go to a
// specific broker
func dependsOnSpecificNode(resource ConfigResource) bool {
return (resource.Type == BrokerResource && resource.Name != "") ||
resource.Type == BrokerLoggerResource
}

func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {

var entries []ConfigEntry
Expand All @@ -442,11 +461,23 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
Resources: resources,
}

b, err := ca.Controller()
var (
b *Broker
err error
)

// DescribeConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(resource) {
id, _ := strconv.Atoi(resource.Name)
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return nil, err
}

_ = b.Open(ca.client.Config())
rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
Expand Down Expand Up @@ -479,11 +510,23 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
ValidateOnly: validateOnly,
}

b, err := ca.Controller()
var (
b *Broker
err error
)

// AlterConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
id, _ := strconv.Atoi(name)
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return err
}

_ = b.Open(ca.client.Config())
rsp, err := b.AlterConfigs(request)
if err != nil {
return err
Expand Down
113 changes: 113 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package sarama

import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"testing"
)
Expand Down Expand Up @@ -511,6 +515,61 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
}
}

// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
// is sent to the broker in the resource struct, _not_ the controller
func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags)
defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }()

controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})

configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
entries, err := admin.DescribeConfig(resource)
if err != nil {
t.Fatal(err)
}

if len(entries) <= 0 {
t.Fatal(errors.New("no resource present"))
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminAlterConfig(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down Expand Up @@ -544,6 +603,60 @@ func TestClusterAdminAlterConfig(t *testing.T) {
}
}

func TestClusterAdminAlterBrokerConfig(t *testing.T) {
controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})
configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

var value string
entries := make(map[string]*string)
value = "3"
entries["min.insync.replicas"] = &value

for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
err = admin.AlterConfig(
resource.Type,
resource.Name,
entries,
false)
if err != nil {
t.Fatal(err)
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminCreateAcl(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
26 changes: 11 additions & 15 deletions config_resource_type.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
package sarama

//ConfigResourceType is a type for config resource
// ConfigResourceType is a type for resources that have configs.
type ConfigResourceType int8

// Taken from :
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
// Taken from:
// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55

const (
//UnknownResource constant type
UnknownResource ConfigResourceType = iota
//AnyResource constant type
AnyResource
//TopicResource constant type
TopicResource
//GroupResource constant type
GroupResource
//ClusterResource constant type
ClusterResource
//BrokerResource constant type
BrokerResource
// UnknownResource constant type
UnknownResource ConfigResourceType = 0
// TopicResource constant type
TopicResource ConfigResourceType = 2
// BrokerResource constant type
BrokerResource ConfigResourceType = 4
// BrokerLoggerResource constant type
BrokerLoggerResource ConfigResourceType = 8
dnwe marked this conversation as resolved.
Show resolved Hide resolved
)
28 changes: 27 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,32 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
for _, r := range req.Resources {
var configEntries []*ConfigEntry
switch r.Type {
case BrokerResource:
configEntries = append(configEntries,
&ConfigEntry{
Name: "min.insync.replicas",
Value: "2",
ReadOnly: false,
Default: false,
},
)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
})
case BrokerLoggerResource:
configEntries = append(configEntries,
&ConfigEntry{
Name: "kafka.controller.KafkaController",
Value: "DEBUG",
ReadOnly: false,
Default: false,
},
)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
})
case TopicResource:
configEntries = append(configEntries,
&ConfigEntry{Name: "max.message.bytes",
Expand Down Expand Up @@ -777,7 +803,7 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {

for _, r := range req.Resources {
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
Type: TopicResource,
Type: r.Type,
ErrorMsg: "",
})
}
Expand Down