Skip to content

Commit

Permalink
Fix close on closed channel
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Mar 17, 2019
1 parent a5cb1ff commit b53ed77
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"go.testEnvVars": {
"KAFKA_BROKERS": "localhost:9092",
}
}
14 changes: 9 additions & 5 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"sync"
"time"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -77,10 +78,11 @@ func (p *producer) Close() error {
}

type consumer struct {
ctx context.Context
group sarama.ConsumerGroup
topics []string
started chan none
ctx context.Context
group sarama.ConsumerGroup
topics []string
started chan none
startedOnce sync.Once
}

func NewConsumer(ctx context.Context, group sarama.ConsumerGroup, topics []string) (KafkaConsumer, error) {
Expand All @@ -96,7 +98,9 @@ func (c *consumer) Receive(ctx context.Context, f HandlerFunc) error {
handler := &consumerGroupHandler{
handler: f,
started: func() {
close(c.started)
c.startedOnce.Do(func() {
close(c.started)
})
},
}

Expand Down

0 comments on commit b53ed77

Please sign in to comment.