Skip to content

Commit

Permalink
make init phase more robust if cluster is offline
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Apr 5, 2024
1 parent 569fa82 commit a50eaf7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 21 deletions.
3 changes: 2 additions & 1 deletion e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<- bool) {
client := s.client

s.logger.Info("Starting to consume end-to-end topic",
s.logger.Info("starting to consume end-to-end topic",
zap.String("topic_name", s.config.TopicManagement.Name),
zap.String("group_id", s.groupId))

Expand All @@ -24,6 +24,7 @@ func (s *Service) startConsumeMessages(ctx context.Context, initializedCh chan<-
if !isInitialized {
isInitialized = true
initializedCh <- true
close(initializedCh)
}

// Log all errors and continue afterwards as we might get errors and still have some fetch results
Expand Down
61 changes: 41 additions & 20 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
"strings"
"time"

"github.com/cloudhut/kminion/v2/kafka"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"

"github.com/cloudhut/kminion/v2/kafka"
)

type Service struct {
Expand Down Expand Up @@ -49,30 +50,32 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
groupID := fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionID)

// Producer options
var kgoOpts []kgo.Opt
kgoOpts := []kgo.Opt{
kgo.ProduceRequestTimeout(3 * time.Second),
kgo.RecordRetries(3),
// We use the manual partitioner so that the records' partition id will be used as target partitio
kgo.RecordPartitioner(kgo.ManualPartitioner()),
}
if cfg.Producer.RequiredAcks == "all" {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.AllISRAcks()))
} else {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.LeaderAck()))
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}
kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(3*time.Second))

// Consumer configs
kgoOpts = append(kgoOpts,
kgo.ConsumerGroup(groupID),
kgo.ConsumeTopics(cfg.TopicManagement.Name),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit(),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()))
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
)

// Prepare hooks
hooks := newEndToEndClientHooks(logger)
kgoOpts = append(kgoOpts, kgo.WithHooks(hooks))

// We use the manual partitioner so that the records' partition id will be used as target partition
kgoOpts = append(kgoOpts, kgo.RecordPartitioner(kgo.ManualPartitioner()))

// Create kafka service and check if client can successfully connect to Kafka cluster
logger.Info("connecting to Kafka seed brokers, trying to fetch cluster metadata",
zap.String("seed_brokers", strings.Join(kafkaSvc.Brokers(), ",")))
Expand Down Expand Up @@ -163,26 +166,39 @@ func (s *Service) Start(ctx context.Context) error {
// finally start everything else (producing, consuming, continuous validation, consumer group tracking)
go s.startReconciliation(ctx)

// Start consumer and wait until we've received a response for the first poll which would indicate that the
// consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
// miss messages because the consumer wasn't ready.
initCh := make(chan bool)
// Start consumer and wait until we've received a response for the first poll
// which would indicate that the consumer is ready. Only if the consumer is
// ready we want to start the e2e producer to ensure that we will not miss
// messages because the consumer wasn't ready. However, if this initialization
// does not succeed within 30s we have to assume, that something is wrong on the
// consuming or producing side. KMinion is supposed to report these kind of
// issues and therefore this should not block KMinion from starting.
initCh := make(chan bool, 1)
s.logger.Info("initializing consumer and waiting until it has received the first record batch")
go s.startConsumeMessages(ctx, initCh)

// Produce an init message until the consumer received at least one fetch
initTicker := time.NewTicker(1 * time.Second)
isInitialized := false
// send first init message immediately
sendInitMessage(ctx, s.client, s.config.TopicManagement.Name)

// We send a first message immediately, but we'll keep sending more messages later
// since the consumers start at the latest offset and may have missed this message.
initCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
s.sendInitMessage(initCtx, s.client, s.config.TopicManagement.Name)

for !isInitialized {
select {
case <-initTicker.C:
sendInitMessage(ctx, s.client, s.config.TopicManagement.Name)
s.sendInitMessage(initCtx, s.client, s.config.TopicManagement.Name)
case <-initCh:
isInitialized = true
s.logger.Info("consumer has been successfully initialized")
case <-initCtx.Done():
// At this point we just assume the consumers are running fine.
// The entire cluster may be down or producing fails.
s.logger.Warn("initializing the consumers timed out, proceeding with the startup")
isInitialized = true
case <-ctx.Done():
return nil
}
Expand All @@ -198,12 +214,17 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}

func sendInitMessage(ctx context.Context, client *kgo.Client, topicName string) {
client.Produce(ctx, &kgo.Record{
Key: []byte("init-message"),
Value: nil,
Topic: topicName,
}, nil)
func (s *Service) sendInitMessage(ctx context.Context, client *kgo.Client, topicName string) {
// Try to produce one record into each partition. This is important because
// one or more partitions may be offline, while others may still be writable.
for i := 0; i < s.partitionCount; i++ {
client.TryProduce(ctx, &kgo.Record{
Key: []byte("init-message"),
Value: nil,
Topic: topicName,
Partition: int32(i),
}, nil)
}
}

func (s *Service) startReconciliation(ctx context.Context) {
Expand Down

0 comments on commit a50eaf7

Please sign in to comment.