Skip to content

Commit

Permalink
feat: remove spaces from broker lists implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
osemrt committed Dec 11, 2024
1 parent 0c34509 commit 92891ac
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 0 deletions.
9 changes: 9 additions & 0 deletions consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ func (cfg ReaderConfig) JSON() string {
cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset))
}

func (cfg *ReaderConfig) removeSpaceBrokerList() {
for i := range cfg.Brokers {
cfg.Brokers[i] = strings.TrimSpace(cfg.Brokers[i])
}
}

func (cfg *ConsumerConfig) JSON() string {
if cfg == nil {
return "{}"
Expand Down Expand Up @@ -271,7 +277,10 @@ func (cfg *ConsumerConfig) newKafkaReader() (Reader, error) {
return nil, err
}

cfg.Reader.removeSpaceBrokerList()

readerCfg := kafka.ReaderConfig(cfg.Reader)

readerCfg.Dialer = dialer
if cfg.Rack != "" {
readerCfg.GroupBalancers = []kafka.GroupBalancer{kafka.RackAffinityGroupBalancer{Rack: cfg.Rack}}
Expand Down
43 changes: 43 additions & 0 deletions consumer_config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -356,6 +357,48 @@ func TestConsumerConfig_JSONPretty(t *testing.T) {
})
}

func TestConsumerConfig_removeSpaceBrokerList(t *testing.T) {
type fields struct {
Reader ReaderConfig
}
tests := []struct {
name string
fields fields
expected []string
}{
{
name: "Should_Remove_Spaces_In_Broker_Lists",
fields: fields{
Reader: ReaderConfig{Brokers: []string{" address", "address ", " address "}},
},
expected: []string{"address", "address", "address"},
},
{
name: "Should_Do_Nothing_When_Broker_Lists_Have_Not_Any_Space",
fields: fields{
Reader: ReaderConfig{Brokers: []string{"address1", "address2", "address3"}},
},
expected: []string{"address1", "address2", "address3"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Given
cfg := &ConsumerConfig{
Reader: tt.fields.Reader,
}

// When
cfg.Reader.removeSpaceBrokerList()

// Then
if !reflect.DeepEqual(cfg.Reader.Brokers, tt.expected) {
t.Errorf("For broker list %v, expected %v", cfg.Reader.Brokers, tt.expected)
}
})
}
}

func getConsumerConfigExample() *ConsumerConfig {
return &ConsumerConfig{
Rack: "stage",
Expand Down
1 change: 1 addition & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type producer struct {
}

func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
cfg.Writer.removeSpaceBrokerList()
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Writer.Brokers...),
Topic: cfg.Writer.Topic,
Expand Down
6 changes: 6 additions & 0 deletions producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func (cfg WriterConfig) JSON() string {
strings.Join(cfg.Brokers, "\", \""), GetBalancerString(cfg.Balancer), cfg.Compression.String())
}

func (cfg *WriterConfig) removeSpaceBrokerList() {
for i := range cfg.Brokers {
cfg.Brokers[i] = strings.TrimSpace(cfg.Brokers[i])
}
}

type TransportConfig struct {
MetadataTopics []string
DialTimeout time.Duration
Expand Down
43 changes: 43 additions & 0 deletions producer_config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"reflect"
"testing"

"github.com/segmentio/kafka-go"
Expand Down Expand Up @@ -90,6 +91,48 @@ func TestProducerConfig_JsonPretty(t *testing.T) {
})
}

func TestProducerConfig_removeSpaceBrokerList(t *testing.T) {
type fields struct {
Brokers []string
}
tests := []struct {
name string
fields fields
expected []string
}{
{
name: "Should_Remove_Spaces_In_Broker_Lists",
fields: fields{
Brokers: []string{" address", "address ", " address "},
},
expected: []string{"address", "address", "address"},
},
{
name: "Should_Do_Nothing_When_Broker_Lists_Have_Not_Any_Space",
fields: fields{
Brokers: []string{"address1", "address2", "address3"},
},
expected: []string{"address1", "address2", "address3"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Given
cfg := &WriterConfig{
Brokers: tt.fields.Brokers,
}

// When
cfg.removeSpaceBrokerList()

// Then
if !reflect.DeepEqual(cfg.Brokers, tt.expected) {
t.Errorf("For broker list %v, expected %v", cfg.Brokers, tt.expected)
}
})
}
}

func TestProducerConfig_String(t *testing.T) {
t.Run("Should_Convert_To_String", func(t *testing.T) {
// Given
Expand Down

0 comments on commit 92891ac

Please sign in to comment.