Skip to content

Commit

Permalink
Create a new ClusterAdmin at each loop
Browse files Browse the repository at this point in the history
- Fix sarama issue: IBM/sarama#1565

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi committed Jul 27, 2020
1 parent deaa219 commit 06495ad
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 53 deletions.
36 changes: 19 additions & 17 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
"go.uber.org/zap"
Expand Down Expand Up @@ -53,10 +52,16 @@ type Reconciler struct {

Resolver *resolver.URIResolver

KafkaClusterAdmin sarama.ClusterAdmin
KafkaClusterAdminLock sync.RWMutex
// TODO these configurations should live in each Broker configuration, so that we don't assume each
// Broker object use the same Kafka cluster
KafkaDefaultTopicDetails sarama.TopicDetail
KafkaDefaultTopicDetailsLock sync.RWMutex
bootstrapServers []string
bootstrapServersLock sync.RWMutex

// NewClusterAdmin creates new sarama ClusterAdmin. It's convenient to add this as Reconciler field so that we can
// mock the function used during the reconciliation loop.
NewClusterAdmin func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error)

Configs *Configs
}
Expand Down Expand Up @@ -300,26 +305,23 @@ func (r *Reconciler) ConfigMapUpdated(ctx context.Context) func(configMap *corev
func (r *Reconciler) SetBootstrapServers(servers string) error {
addrs := strings.Split(servers, ",")

r.bootstrapServersLock.Lock()
r.bootstrapServers = addrs
r.bootstrapServersLock.Unlock()

return nil
}

func (r *Reconciler) getKafkaClusterAdmin(bootstrapServers []string) (sarama.ClusterAdmin, error) {
config := sarama.NewConfig()
config.Version = sarama.MaxVersion
config.Net.KeepAlive = time.Second * 60
config.Metadata.RefreshFrequency = time.Minute

kafkaClusterAdmin, err := NewClusterAdmin(addrs, config)
kafkaClusterAdmin, err := r.NewClusterAdmin(bootstrapServers, config)
if err != nil {
return fmt.Errorf("failed to create kafka cluster admin: %w", err)
return nil, fmt.Errorf("failed to create cluster admin: %w", err)
}

r.KafkaClusterAdminLock.Lock()
oldKafkaClusterAdmin := r.KafkaClusterAdmin
r.KafkaClusterAdmin = kafkaClusterAdmin
r.KafkaClusterAdminLock.Unlock()

if oldKafkaClusterAdmin != nil {
_ = oldKafkaClusterAdmin.Close()
}

return nil
return kafkaClusterAdmin, nil
}

func (r *Reconciler) SetDefaultTopicDetails(topicDetail sarama.TopicDetail) {
Expand Down
23 changes: 9 additions & 14 deletions control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,14 +960,6 @@ func useTable(t *testing.T, table TableTest, configs *Configs) {
onDeleteTopicError = want.(error)
}

clusterAdmin := &MockKafkaClusterAdmin{
ExpectedTopicName: fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName),
ExpectedTopicDetail: defaultTopicDetail,
ErrorOnCreateTopic: onCreateTopicError,
ErrorOnDeleteTopic: onDeleteTopicError,
T: t,
}

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -977,7 +969,15 @@ func useTable(t *testing.T, table TableTest, configs *Configs) {
DataPlaneConfigFormat: configs.DataPlaneConfigFormat,
SystemNamespace: configs.SystemNamespace,
},
KafkaClusterAdmin: clusterAdmin,
NewClusterAdmin: func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) {
return &MockKafkaClusterAdmin{
ExpectedTopicName: fmt.Sprintf("%s%s-%s", TopicPrefix, BrokerNamespace, BrokerName),
ExpectedTopicDetail: defaultTopicDetail,
ErrorOnCreateTopic: onCreateTopicError,
ErrorOnDeleteTopic: onDeleteTopicError,
T: t,
}, nil
},
KafkaDefaultTopicDetails: defaultTopicDetail,
KafkaDefaultTopicDetailsLock: sync.RWMutex{},
Configs: configs,
Expand Down Expand Up @@ -1018,10 +1018,6 @@ func useTable(t *testing.T, table TableTest, configs *Configs) {

func TestConfigMapUpdate(t *testing.T) {

NewClusterAdmin = func(addrs []string, conf *sarama.Config) (sarama.ClusterAdmin, error) {
return MockKafkaClusterAdmin{}, nil
}

cm := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "cmname",
Expand All @@ -1044,7 +1040,6 @@ func TestConfigMapUpdate(t *testing.T) {
NumPartitions: 42,
ReplicationFactor: 3,
})
assert.NotNil(t, reconciler.KafkaClusterAdmin)
}

func patchFinalizers() clientgotesting.PatchActionImpl {
Expand Down
2 changes: 0 additions & 2 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ const (
DefaultReplicationFactor = 1
)

var NewClusterAdmin = sarama.NewClusterAdmin

func NewController(ctx context.Context, watcher configmap.Watcher, configs *Configs) *controller.Impl {

eventing.RegisterAlternateBrokerConditionSet(ConditionSet)
Expand Down
5 changes: 0 additions & 5 deletions control-plane/pkg/reconciler/broker/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package broker
import (
"testing"

"github.com/Shopify/sarama"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -37,10 +36,6 @@ import (
func TestNewController(t *testing.T) {
ctx, _ := reconcilertesting.SetupFakeContext(t)

NewClusterAdmin = func(addrs []string, conf *sarama.Config) (sarama.ClusterAdmin, error) {
return nil, nil
}

configs := &Configs{
EnvConfigs: EnvConfigs{
SystemNamespace: "cm",
Expand Down
23 changes: 16 additions & 7 deletions control-plane/pkg/reconciler/broker/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ import (
)

func (r *Reconciler) CreateTopic(broker *eventing.Broker) (string, error) {
r.bootstrapServersLock.RLock()
defer r.bootstrapServersLock.RUnlock()

topic := Topic(broker)
topicDetail := r.topicDetailFromBrokerConfig(broker)

r.KafkaClusterAdminLock.Lock()
defer r.KafkaClusterAdminLock.Unlock()
kafkaClusterAdmin, err := r.getKafkaClusterAdmin(r.bootstrapServers)
if err != nil {
return "", err
}

createTopicError := r.KafkaClusterAdmin.CreateTopic(topic, topicDetail, true)
createTopicError := kafkaClusterAdmin.CreateTopic(topic, topicDetail, true)
if err, ok := createTopicError.(*sarama.TopicError); ok && err.Err == sarama.ErrTopicAlreadyExists {
return topic, nil
}
Expand All @@ -39,13 +44,17 @@ func (r *Reconciler) CreateTopic(broker *eventing.Broker) (string, error) {
}

func (r *Reconciler) deleteTopic(broker *eventing.Broker) (string, error) {

r.KafkaClusterAdminLock.RLock()
defer r.KafkaClusterAdminLock.RUnlock()
r.bootstrapServersLock.RLock()
defer r.bootstrapServersLock.RUnlock()

topic := Topic(broker)

err := r.KafkaClusterAdmin.DeleteTopic(topic)
kafkaClusterAdmin, err := r.getKafkaClusterAdmin(r.bootstrapServers)
if err != nil {
return "", err
}

err = kafkaClusterAdmin.DeleteTopic(topic)
if sarama.ErrUnknownTopicOrPartition == err {
return topic, nil
}
Expand Down
18 changes: 10 additions & 8 deletions control-plane/pkg/reconciler/broker/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ func TestCreateTopicTopicAlreadyExists(t *testing.T) {
errMsg := "topic already exists"

r := broker.Reconciler{
KafkaClusterAdmin: reconcilertesting.MockKafkaClusterAdmin{
ExpectedTopicName: topic,
ExpectedTopicDetail: sarama.TopicDetail{},
ErrorOnCreateTopic: &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
},
T: t,
NewClusterAdmin: func(addrs []string, config *sarama.Config) (sarama.ClusterAdmin, error) {
return reconcilertesting.MockKafkaClusterAdmin{
ExpectedTopicName: topic,
ExpectedTopicDetail: sarama.TopicDetail{},
ErrorOnCreateTopic: &sarama.TopicError{
Err: sarama.ErrTopicAlreadyExists,
ErrMsg: &errMsg,
},
T: t,
}, nil
},
}

Expand Down

0 comments on commit 06495ad

Please sign in to comment.