From 7c2e82e798eeeb05cab6e96a881c204fdfa490ec Mon Sep 17 00:00:00 2001 From: Alena Varkockova Date: Wed, 3 Nov 2021 09:51:38 +0100 Subject: [PATCH] Send first e2e init message immediately Otherwise right now we wait for 16 minutes until that happens. --- e2e/service.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/e2e/service.go b/e2e/service.go index 02a1f63..d8781f4 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -168,15 +168,13 @@ func (s *Service) Start(ctx context.Context) error { // Produce an init message until the consumer received at least one fetch initTicker := time.NewTicker(1000 * time.Second) isInitialized := false + // send first init message immediately + sendInitMessage(ctx, s.client, s.config.TopicManagement.Name) for !isInitialized { select { case <-initTicker.C: - s.client.Produce(ctx, &kgo.Record{ - Key: []byte("init-message"), - Value: nil, - Topic: s.config.TopicManagement.Name, - }, nil) + sendInitMessage(ctx, s.client, s.config.TopicManagement.Name) case <-initCh: isInitialized = true s.logger.Info("consumer has been successfully initialized") @@ -196,6 +194,14 @@ func (s *Service) Start(ctx context.Context) error { return nil } +func sendInitMessage(ctx context.Context, client *kgo.Client, topicName string) { + client.Produce(ctx, &kgo.Record{ + Key: []byte("init-message"), + Value: nil, + Topic: topicName, + }, nil) +} + func (s *Service) startReconciliation(ctx context.Context) { if !s.config.TopicManagement.Enabled { return