Skip to content

Commit

Permalink
Address possible data race with the responses chan
Browse files Browse the repository at this point in the history
WARNING: DATA RACE
Write at 0x00c0003421f0 by goroutine 71:
  runtime.closechan()
      runtime/chan.go:355 +0x0
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1()
      github.com/Shopify/sarama/async_producer.go:725 +0x1c4
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x74
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer·dwrap·15()
      github.com/Shopify/sarama/async_producer.go:695 +0x39

Previous read at 0x00c0003421f0 by goroutine 58:
  runtime.chansend()
      runtime/chan.go:158 +0x0
  github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.1.1()
      github.com/Shopify/sarama/async_producer.go:702 +0x125
  github.com/Shopify/sarama.(*Broker).AsyncProduce.func1()
      github.com/Shopify/sarama/broker.go:408 +0x1a9
  github.com/Shopify/sarama.(*responsePromise).handle()
      github.com/Shopify/sarama/broker.go:132 +0x1b8
  github.com/Shopify/sarama.(*Broker).responseReceiver()
      github.com/Shopify/sarama/broker.go:1040 +0x124
  github.com/Shopify/sarama.(*Broker).responseReceiver-fm()
      github.com/Shopify/sarama/broker.go:1032 +0x39
  github.com/Shopify/sarama.withRecover()
      github.com/Shopify/sarama/utils.go:43 +0x74
  github.com/Shopify/sarama.(*Broker).Open.func1·dwrap·22()
      github.com/Shopify/sarama/broker.go:244 +0x39
  • Loading branch information
slaunay committed Feb 9, 2022
1 parent 59dd565 commit 6c70a6c
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,12 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {

// minimal bridge to make the network response `select`able
go withRecover(func() {
var wg sync.WaitGroup
for set := range bridge {
request := set.buildRequest()

// Count the callbacks to know when to close the responses channel safely
wg.Add(1)
// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
Expand All @@ -704,6 +707,8 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
err: err,
res: response,
}
// We forwarded the response
wg.Done()
}
}(set)

Expand All @@ -722,6 +727,8 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
sendResponse(nil, nil)
}
}
// Wait for all callbacks invocations to close the channel safely
wg.Wait()
close(responses)
})

Expand Down

0 comments on commit 6c70a6c

Please sign in to comment.