Skip to content

Commit

Permalink
Update to franz-go v0.8.0
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Jun 10, 2021
1 parent 1ffd02b commit 8157df5
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 152 deletions.
16 changes: 4 additions & 12 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,9 @@ import (

func (s *Service) startConsumeMessages(ctx context.Context) {
client := s.client
topicName := s.config.TopicManagement.Name
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName)
client.AssignPartitions(topic)

// Create our own consumer group
client.AssignGroup(s.groupId,
kgo.GroupTopics(topicName),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit(),
)
s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId))
s.logger.Info("Starting to consume end-to-end topic",
zap.String("topicName", s.config.TopicManagement.Name),
zap.String("groupId", s.groupId))

for {
select {
Expand Down Expand Up @@ -59,7 +51,7 @@ func (s *Service) commitOffsets(ctx context.Context) {
}

startCommitTimestamp := time.Now()
client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
client.CommitOffsets(ctx, uncommittedOffset, func(_ *kgo.Client, req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// Got commit response
latency := time.Since(startCommitTimestamp)

Expand Down
76 changes: 0 additions & 76 deletions e2e/partitioner.go

This file was deleted.

81 changes: 36 additions & 45 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ type Service struct {
client *kgo.Client

// Service
minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
groupId string // our own consumer group
groupTracker *groupTracker // tracks consumer groups starting with the kminion prefix and deletes them if they are unused for some time
messageTracker *messageTracker // tracks successfully produced messages,
clientHooks *clientHooks // logs broker events, tracks the coordinator (i.e. which broker last responded to our offset commit)
partitioner *customPartitioner // takes care of sending our end-to-end messages to the right partition
partitionCount int // number of partitions of our test topic, used to send messages to all partitions
minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
groupId string // our own consumer group
groupTracker *groupTracker // tracks consumer groups starting with the kminion prefix and deletes them if they are unused for some time
messageTracker *messageTracker // tracks successfully produced messages,
clientHooks *clientHooks // logs broker events, tracks the coordinator (i.e. which broker last responded to our offset commit)
partitionCount int // number of partitions of our test topic, used to send messages to all partitions

// Metrics
endToEndMessagesProduced prometheus.Counter
Expand All @@ -43,24 +42,48 @@ type Service struct {

// NewService creates a new instance of the e2e moinitoring service (wow)
func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricNamespace string, ctx context.Context) (*Service, error) {
minionID := uuid.NewString()
groupID := fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionID)

client, hooks, partitioner, err := createKafkaClient(cfg, logger, kafkaSvc, ctx)
// Producer options
var kgoOpts []kgo.Opt
if cfg.Producer.RequiredAcks == "all" {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.AllISRAcks()))
} else {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.LeaderAck()))
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}
kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(cfg.Producer.AckSla))

// Consumer configs
kgoOpts = append(kgoOpts,
kgo.ConsumerGroup(groupID),
kgo.ConsumeTopics(cfg.TopicManagement.Name),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit())

// Prepare hooks
hooks := newEndToEndClientHooks(logger)
kgoOpts = append(kgoOpts, kgo.WithHooks(hooks))

// We use the manual partitioner so that the records' partition id will be used as target partition
kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner()))

// Create kafka service and check if client can successfully connect to Kafka cluster
client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err)
}

minionId := uuid.NewString()

svc := &Service{
config: cfg,
logger: logger.Named("e2e"),
kafkaSvc: kafkaSvc,
client: client,

minionID: minionId,
groupId: fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionId),
minionID: minionID,
groupId: groupID,
clientHooks: hooks,
partitioner: partitioner,
}

svc.groupTracker = newGroupTracker(svc, ctx)
Expand Down Expand Up @@ -101,37 +124,6 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
return svc, nil
}

func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, ctx context.Context) (*kgo.Client, *clientHooks, *customPartitioner, error) {

// Add RequiredAcks, as options can't be altered later
kgoOpts := []kgo.Opt{}

if cfg.Producer.RequiredAcks == "all" {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.AllISRAcks()))
} else {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.LeaderAck()))
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}

// produce request timeout
kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(cfg.Producer.AckSla))

// Prepare hooks
e2eHooks := newEndToEndClientHooks(logger)
kgoOpts = append(kgoOpts, kgo.WithHooks(e2eHooks))

// Use a custom partitioner that uses the 'PartitionID' of a record to directly assign the right partition
partitioner := &customPartitioner{
logger: logger.Named("e2e-partitioner"),
expectedPartitionCount: 0, // not yet known, will be set before we start producing
}
kgoOpts = append(kgoOpts, kgo.RecordPartitioner(partitioner))

// Create kafka service and check if client can successfully connect to Kafka cluster
client, err := kafkaSvc.CreateAndTestClient(logger, kgoOpts, ctx)
return client, e2eHooks, partitioner, err
}

// Start starts the service (wow)
func (s *Service) Start(ctx context.Context) error {

Expand All @@ -146,7 +138,6 @@ func (s *Service) Start(ctx context.Context) error {
return fmt.Errorf("could not get topic metadata after validation: %w", err)
}
partitions := len(topicMetadata.Topics[0].Partitions)
s.partitioner.expectedPartitionCount = partitions
s.partitionCount = partitions

// finally start everything else (producing, consuming, continous validation, consumer group tracking)
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.10.0
github.com/prometheus/common v0.24.0 // indirect
github.com/twmb/franz-go v0.7.1
github.com/twmb/franz-go v0.8.1-0.20210607222818-6808a5539cf2
go.uber.org/atomic v1.7.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.12.2 h1:2KCfW3I9M7nSc5wOqXAlW2v2U6v+w6cbjvbfp+OykW8=
github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.0 h1:2T7tUoQrQT+fQWdaY5rjWztFGAFwbGD04iPJg90ZiOs=
github.com/klauspost/compress v1.13.0/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/knadh/koanf v0.16.0 h1:qQqGvE8hs/y5pZTG5kT354vqUqsDKQcXX8IOq2Rg11Y=
github.com/knadh/koanf v0.16.0/go.mod h1:DMZ6jQlhA3PqxnKR63luVaBtDemi/m8v/FpXI7B5Ez8=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -276,8 +276,8 @@ github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.6 h1:ueMTcBBFrbT8K4uGDNNZPa8Z7LtPV7Cl0TDjaeHxP44=
github.com/pierrec/lz4/v4 v4.1.6/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.7 h1:UDV9geJWhFIufAliH7HQlz9wP3JA0t748w+RwbWMLow=
github.com/pierrec/lz4/v4 v4.1.7/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -348,8 +348,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/twmb/franz-go v0.7.1 h1:oUS5zLzqX00NWnv8XmJKMjGpHwcjbMyUZVng7YLighI=
github.com/twmb/franz-go v0.7.1/go.mod h1:StwVC7bQkTM3I6DJyNGvmgpnza7Tz11YfLACXwMvQ0k=
github.com/twmb/franz-go v0.8.0 h1:DFe9ptohEBtzuFyDKpUM1d39h+jkuEg/fEudDHqKhyw=
github.com/twmb/franz-go v0.8.0/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0=
github.com/twmb/franz-go v0.8.1-0.20210607222818-6808a5539cf2 h1:dmZRYgpuU7I9O+AXU9l4ZSAXZhvw1mMVA9vu+F08KRE=
github.com/twmb/franz-go v0.8.1-0.20210607222818-6808a5539cf2/go.mod h1:v6QnB3abhlVAzlIEIO5L/1Emu8NlkreCI2HSps9utH0=
github.com/twmb/go-rbtree v1.0.0 h1:KxN7dXJ8XaZ4cvmHV1qqXTshxX3EBvX/toG5+UR49Mg=
github.com/twmb/go-rbtree v1.0.0/go.mod h1:UlIAI8gu3KRPkXSobZnmJfVwCJgEhD/liWzT5ppzIyc=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down Expand Up @@ -384,7 +386,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -420,9 +421,8 @@ golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
4 changes: 2 additions & 2 deletions kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func NewService(cfg Config, logger *zap.Logger) *Service {
}
}

// Create a client with the services default settings
// CreateAndTestClient creates a client with the services default settings
// logger: will be used to log connections, errors, warnings about tls config, ...
func (s *Service) CreateAndTestClient(logger *zap.Logger, opts []kgo.Opt, ctx context.Context) (*kgo.Client, error) {
func (s *Service) CreateAndTestClient(ctx context.Context, logger *zap.Logger, opts []kgo.Opt) (*kgo.Client, error) {
// Config with default options
kgoOpts, err := NewKgoConfig(s.cfg, logger)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions minion/offset_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
// methods where they'll be decoded and further processed.
func (s *Service) startConsumingOffsets(ctx context.Context) {
client := s.client
topic := kgo.ConsumeTopics(kgo.NewOffset().AtStart(), "__consumer_offsets")
client.AssignPartitions(topic)

s.logger.Info("starting to consume messages from offsets topic")
go s.checkIfConsumerLagIsCaughtUp(ctx)
Expand Down
8 changes: 6 additions & 2 deletions minion/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metrics
// Kafka client
hooksChildLogger := logger.With(zap.String("source", "minion_kafka_client"))
minionHooks := newMinionClientHooks(hooksChildLogger, metricsNamespace)
kgoOpts := []kgo.Opt{kgo.WithHooks(minionHooks)}
kgoOpts := []kgo.Opt{
kgo.WithHooks(minionHooks),
kgo.ConsumeTopics("__consumer_offsets"),
kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
}

client, err := kafkaSvc.CreateAndTestClient(logger, kgoOpts, ctx)
client, err := kafkaSvc.CreateAndTestClient(ctx, logger, kgoOpts)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client: %w", err)
}
Expand Down

0 comments on commit 8157df5

Please sign in to comment.