Skip to content

Commit

Permalink
Merge pull request #128 from alenkacz/av/send-msg-immediately
Browse files Browse the repository at this point in the history
Send first e2e init message immediately
  • Loading branch information
weeco authored Nov 3, 2021
2 parents 21e8a6b + 7c2e82e commit 26b2314
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,13 @@ func (s *Service) Start(ctx context.Context) error {
// Produce an init message until the consumer received at least one fetch
initTicker := time.NewTicker(1000 * time.Second)
isInitialized := false
// send first init message immediately
sendInitMessage(ctx, s.client, s.config.TopicManagement.Name)

for !isInitialized {
select {
case <-initTicker.C:
s.client.Produce(ctx, &kgo.Record{
Key: []byte("init-message"),
Value: nil,
Topic: s.config.TopicManagement.Name,
}, nil)
sendInitMessage(ctx, s.client, s.config.TopicManagement.Name)
case <-initCh:
isInitialized = true
s.logger.Info("consumer has been successfully initialized")
Expand All @@ -196,6 +194,14 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}

func sendInitMessage(ctx context.Context, client *kgo.Client, topicName string) {
client.Produce(ctx, &kgo.Record{
Key: []byte("init-message"),
Value: nil,
Topic: topicName,
}, nil)
}

func (s *Service) startReconciliation(ctx context.Context) {
if !s.config.TopicManagement.Enabled {
return
Expand Down

0 comments on commit 26b2314

Please sign in to comment.