diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..0c79feb --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,5 @@ +{ + "go.testEnvVars": { + "KAFKA_BROKERS": "localhost:9092", + } +} \ No newline at end of file diff --git a/sarama.go b/sarama.go index 360599f..e2efbe3 100644 --- a/sarama.go +++ b/sarama.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "sync" "time" "github.com/Shopify/sarama" @@ -77,10 +78,11 @@ func (p *producer) Close() error { } type consumer struct { - ctx context.Context - group sarama.ConsumerGroup - topics []string - started chan none + ctx context.Context + group sarama.ConsumerGroup + topics []string + started chan none + startedOnce sync.Once } func NewConsumer(ctx context.Context, group sarama.ConsumerGroup, topics []string) (KafkaConsumer, error) { @@ -96,7 +98,9 @@ func (c *consumer) Receive(ctx context.Context, f HandlerFunc) error { handler := &consumerGroupHandler{ handler: f, started: func() { - close(c.started) + c.startedOnce.Do(func() { + close(c.started) + }) }, }