Skip to content

Commit

Permalink
kafka: healthcheck for consumer group coordinator
Browse files Browse the repository at this point in the history
currently, when you try to use consumer groups right away,
it's broken and you need to wait a few seconds before the coordinator
is ready.

This fixes it.
  • Loading branch information
karelbilek authored and orlangure committed May 14, 2024
1 parent 4dc3b8c commit cf7544d
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions preset/kafka/preset.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ func (p *P) healthcheck(ctx context.Context, c *gnomock.Container) (err error) {
return fmt.Errorf("can't create topic: %w", err)
}

group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
ID: "gnomock",
Brokers: []string{c.Address(BrokerPort)},
Topics: []string{"gnomock"},
})
if err != nil {
return fmt.Errorf("can't create consumer group: %w", err)
}
defer group.Close()

if _, err := group.Next(ctx); err != nil {
return fmt.Errorf("can't read next consumer group: %w", err)
}

if p.UseSchemaRegistry {
url := "http://" + c.Address(SchemaRegistryPort)

Expand Down

0 comments on commit cf7544d

Please sign in to comment.