diff --git a/async_producer.go b/async_producer.go index 2ca97121a..c0dcce9b9 100644 --- a/async_producer.go +++ b/async_producer.go @@ -675,6 +675,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { var ( input = make(chan *ProducerMessage) bridge = make(chan *produceSet) + pending = make(chan *brokerProducerResponse) responses = make(chan *brokerProducerResponse) ) @@ -684,7 +685,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, - stopchan: make(chan struct{}), buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -692,17 +692,24 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { // minimal bridge to make the network response `select`able go withRecover(func() { + // Use a wait group to know if we still have in flight requests + var wg sync.WaitGroup + for set := range bridge { request := set.buildRequest() + // Count the in flight requests to know when we can close the pending channel safely + wg.Add(1) // Capture the current set to forward in the callback sendResponse := func(set *produceSet) ProduceCallback { return func(response *ProduceResponse, err error) { - responses <- &brokerProducerResponse{ + // Forward the response to make sure we do not block the responseReceiver + pending <- &brokerProducerResponse{ set: set, err: err, res: response, } + wg.Done() } }(set) @@ -721,7 +728,42 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { sendResponse(nil, nil) } } - close(responses) + // Wait for all in flight requests to close the pending channel safely + wg.Wait() + close(pending) + }) + + // In order to avoid a deadlock when closing the broker on network or malformed response error + // we use an intermediate channel to buffer and send pending responses in order + // This is because the AsyncProduce callback inside the bridge is invoked from the broker + // responseReceiver goroutine and closing the broker requires such goroutine to be finished + go withRecover(func() { + buf := queue.New() + for { + if buf.Length() == 0 { + res, ok := <-pending + if !ok { + // We are done forwarding the last pending response + close(responses) + return + } + buf.Add(res) + } + // Send the head pending response or buffer another one + // so that we never block the callback + headRes := buf.Peek().(*brokerProducerResponse) + select { + case res, ok := <-pending: + if !ok { + continue + } + buf.Add(res) + continue + case responses <- headRes: + buf.Remove() + continue + } + } }) if p.conf.Producer.Retry.Max <= 0 { @@ -747,7 +789,6 @@ type brokerProducer struct { output chan<- *produceSet responses <-chan *brokerProducerResponse abandoned chan struct{} - stopchan chan struct{} buffer *produceSet timer <-chan time.Time @@ -830,10 +871,6 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } - case <-bp.stopchan: - Logger.Printf( - "producer/broker/%d run loop asked to stop\n", bp.broker.ID()) - return } if bp.timerFired || bp.buffer.readyToFlush() { @@ -854,10 +891,11 @@ func (bp *brokerProducer) shutdown() { } } close(bp.output) + // Drain responses from the bridge goroutine for response := range bp.responses { bp.handleResponse(response) } - close(bp.stopchan) + // No more brokerProducer related goroutine should be running Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } diff --git a/async_producer_test.go b/async_producer_test.go index e571aa068..93c6c1fa3 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -38,6 +38,7 @@ func closeProducer(t *testing.T, p AsyncProducer) { } func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { + t.Helper() expect := successes + errors for expect > 0 { select { @@ -643,6 +644,56 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { } } +// https://github.com/Shopify/sarama/issues/2129 +func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { + t.Parallel() + //Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) + + // The seed broker only handles Metadata request + seedBroker.setHandler(func(req *request) (res encoderWithHeader) { + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) + return metadataLeader + }) + + // Simulate a slow broker by taking ~200ms to handle requests + // therefore triggering the read timeout and the retry logic + leader.setHandler(func(req *request) (res encoderWithHeader) { + time.Sleep(200 * time.Millisecond) + // Will likely not be read by the producer (read timeout) + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + return prodSuccess + }) + + config := NewTestConfig() + // Use very short read to simulate read error on unresponsive broker + config.Net.ReadTimeout = 50 * time.Millisecond + // Flush every record to generate up to 5 in-flight Produce requests + // because config.Net.MaxOpenRequests defaults to 5 + config.Producer.Flush.MaxMessages = 1 + config.Producer.Return.Successes = true + // Reduce retries to speed up the test while keeping the default backoff + config.Producer.Retry.Max = 1 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + expectResults(t, producer, 0, 10) + + seedBroker.Close() + leader.Close() + closeProducer(t, producer) +} + func TestAsyncProducerOutOfRetries(t *testing.T) { t.Skip("Enable once bug #294 is fixed.") @@ -1249,9 +1300,11 @@ func TestBrokerProducerShutdown(t *testing.T) { addr: mockBroker.Addr(), id: mockBroker.BrokerID(), } - bp := producer.(*asyncProducer).newBrokerProducer(broker) + // Starts various goroutines in newBrokerProducer + bp := producer.(*asyncProducer).getBrokerProducer(broker) + // Initiate the shutdown of all of them + producer.(*asyncProducer).unrefBrokerProducer(broker, bp) - bp.shutdown() _ = producer.Close() mockBroker.Close() } diff --git a/broker.go b/broker.go index a22efcaca..c60e9a044 100644 --- a/broker.go +++ b/broker.go @@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error) // When configured with RequiredAcks == NoResponse, the callback will not be invoked. // If an error is returned because the request could not be sent then the callback // will not be invoked either. +// +// Make sure not to Close the broker in the callback as it will lead to a deadlock. func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error { needAcks := request.RequiredAcks != NoResponse // Use a nil promise when no acks is required