From 59dd56590fd23b65201ae3378467197a488881a4 Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Tue, 8 Feb 2022 10:32:40 -0800 Subject: [PATCH 1/5] Fix deadlock when closing Broker in brokerProducer - add unit test to reproduce the deadlock by simulating a network error - document possible deadlock when closing the Broker from an AsyncProduce callback when handling a response error - add closeBroker goroutine and channel to asynchronously close a Broker once - reuse the stopchan channel to signal that the closeBroker goroutine is done - update TestBrokerProducerShutdown to check goroutine leak by closing the input vs the stopchan channel - fixes #2129 --- async_producer.go | 46 ++++++++++++++++++++++++++--------- async_producer_test.go | 55 ++++++++++++++++++++++++++++++++++++++++-- broker.go | 2 ++ 3 files changed, 90 insertions(+), 13 deletions(-) diff --git a/async_producer.go b/async_producer.go index 2ca97121a..45918e9ee 100644 --- a/async_producer.go +++ b/async_producer.go @@ -684,6 +684,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, + closeBroker: make(chan struct{}), stopchan: make(chan struct{}), buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), @@ -724,6 +725,22 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { close(responses) }) + // use a dedicated goroutine to close the broker instead of inside handleError + // this is because the AsyncProduce callback inside the bridge is invoked from the broker + // responseReceiver goroutine and closing the broker requires for such goroutine to be finished + // therefore leading to a deadlock + go withRecover(func() { + for range bp.closeBroker { + if err := bp.broker.Close(); err != nil { + Logger.Printf("producer/broker/%d unable to close broker: %v\n", bp.broker.ID(), err) + } else { + Logger.Printf("producer/broker/%d closing done\n", bp.broker.ID()) + } + } + // Signal that we are done + close(bp.stopchan) + }) + if p.conf.Producer.Retry.Max <= 0 { bp.abandoned = make(chan struct{}) } @@ -743,11 +760,12 @@ type brokerProducer struct { parent *asyncProducer broker *Broker - input chan *ProducerMessage - output chan<- *produceSet - responses <-chan *brokerProducerResponse - abandoned chan struct{} - stopchan chan struct{} + input chan *ProducerMessage + output chan<- *produceSet + responses <-chan *brokerProducerResponse + closeBroker chan struct{} + abandoned chan struct{} + stopchan chan struct{} buffer *produceSet timer <-chan time.Time @@ -830,10 +848,6 @@ func (bp *brokerProducer) run() { if ok { bp.handleResponse(response) } - case <-bp.stopchan: - Logger.Printf( - "producer/broker/%d run loop asked to stop\n", bp.broker.ID()) - return } if bp.timerFired || bp.buffer.readyToFlush() { @@ -854,10 +868,15 @@ func (bp *brokerProducer) shutdown() { } } close(bp.output) + // Drain responses from bridge goroutine for response := range bp.responses { bp.handleResponse(response) } - close(bp.stopchan) + // Ask for the closeBroker goroutine to stop + close(bp.closeBroker) + // And wait for it to be done + <-bp.stopchan + // No more brokerProducer related goroutine should be running Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } @@ -1028,7 +1047,12 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { default: Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) bp.parent.abandonBrokerConnection(bp.broker) - _ = 
bp.broker.Close() + // We only try to close the broker once + if bp.closing == nil { + // Request the closeBroker goroutine to close the broker for us + // because calling bp.broker.Close here can lead to a deadlock + bp.closeBroker <- struct{}{} + } bp.closing = err sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) diff --git a/async_producer_test.go b/async_producer_test.go index e571aa068..01a227d9b 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -643,6 +643,55 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { } } +// https://github.com/Shopify/sarama/issues/2129 +func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { + //Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + seedBroker := NewMockBroker(t, 1) + leader := NewMockBroker(t, 2) + + // The seed broker only handles Metadata request + seedBroker.setHandler(func(req *request) (res encoderWithHeader) { + metadataLeader := new(MetadataResponse) + metadataLeader.AddBroker(leader.Addr(), leader.BrokerID()) + metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) + return metadataLeader + }) + + // Simulate a slow broker by taking ~200ms to handle requests + // therefore triggering the read timeout and the retry logic + leader.setHandler(func(req *request) (res encoderWithHeader) { + time.Sleep(200 * time.Millisecond) + // Will likely not be read by the producer (read timeout) + prodSuccess := new(ProduceResponse) + prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) + return prodSuccess + }) + + config := NewTestConfig() + // Use very short read to simulate read error on unresponsive broker + config.Net.ReadTimeout = 50 * time.Millisecond + // Flush every record to generate N in-flight Produce requests + config.Producer.Flush.Messages = 1 + config.Producer.Return.Successes = true + // Reduce retries to speed up the test while keeping the default backoff + config.Producer.Retry.Max = 1 + config.Net.MaxOpenRequests = 1 + producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)} + } + + expectResults(t, producer, 0, 10) + + seedBroker.Close() + leader.Close() + closeProducer(t, producer) +} + func TestAsyncProducerOutOfRetries(t *testing.T) { t.Skip("Enable once bug #294 is fixed.") @@ -1249,9 +1298,11 @@ func TestBrokerProducerShutdown(t *testing.T) { addr: mockBroker.Addr(), id: mockBroker.BrokerID(), } - bp := producer.(*asyncProducer).newBrokerProducer(broker) + // Starts various goroutines in newBrokerProducer + bp := producer.(*asyncProducer).getBrokerProducer(broker) + // Initiate the shutdown of all of them + producer.(*asyncProducer).unrefBrokerProducer(broker, bp) - bp.shutdown() _ = producer.Close() mockBroker.Close() } diff --git a/broker.go b/broker.go index a22efcaca..c60e9a044 100644 --- a/broker.go +++ b/broker.go @@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error) // When configured with RequiredAcks == NoResponse, the callback will not be invoked. // If an error is returned because the request could not be sent then the callback // will not be invoked either. +// +// Make sure not to Close the broker in the callback as it will lead to a deadlock. 
func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error { needAcks := request.RequiredAcks != NoResponse // Use a nil promise when no acks is required From 6c70a6c230a42afe4408e54ecdf73722e415945e Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Tue, 8 Feb 2022 16:52:48 -0800 Subject: [PATCH 2/5] Address possible data race with the responses chan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WARNING: DATA RACE Write at 0x00c0003421f0 by goroutine 71: runtime.closechan() runtime/chan.go:355 +0x0 github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1() github.com/Shopify/sarama/async_producer.go:725 +0x1c4 github.com/Shopify/sarama.withRecover() github.com/Shopify/sarama/utils.go:43 +0x74 github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer·dwrap·15() github.com/Shopify/sarama/async_producer.go:695 +0x39 Previous read at 0x00c0003421f0 by goroutine 58: runtime.chansend() runtime/chan.go:158 +0x0 github.com/Shopify/sarama.(*asyncProducer).newBrokerProducer.func1.1.1() github.com/Shopify/sarama/async_producer.go:702 +0x125 github.com/Shopify/sarama.(*Broker).AsyncProduce.func1() github.com/Shopify/sarama/broker.go:408 +0x1a9 github.com/Shopify/sarama.(*responsePromise).handle() github.com/Shopify/sarama/broker.go:132 +0x1b8 github.com/Shopify/sarama.(*Broker).responseReceiver() github.com/Shopify/sarama/broker.go:1040 +0x124 github.com/Shopify/sarama.(*Broker).responseReceiver-fm() github.com/Shopify/sarama/broker.go:1032 +0x39 github.com/Shopify/sarama.withRecover() github.com/Shopify/sarama/utils.go:43 +0x74 github.com/Shopify/sarama.(*Broker).Open.func1·dwrap·22() github.com/Shopify/sarama/broker.go:244 +0x39 --- async_producer.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/async_producer.go b/async_producer.go index 45918e9ee..e44107e3e 100644 --- a/async_producer.go +++ b/async_producer.go @@ -693,9 +693,12 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { // minimal bridge to make the network response `select`able go withRecover(func() { + var wg sync.WaitGroup for set := range bridge { request := set.buildRequest() + // Count the callbacks to know when to close the responses channel safely + wg.Add(1) // Capture the current set to forward in the callback sendResponse := func(set *produceSet) ProduceCallback { return func(response *ProduceResponse, err error) { @@ -704,6 +707,8 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { err: err, res: response, } + // We forwarded the response + wg.Done() } }(set) @@ -722,6 +727,8 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { sendResponse(nil, nil) } } + // Wait for all callbacks invocations to close the channel safely + wg.Wait() close(responses) }) From 8f92872069c181f8404bf3aca3f84797998839c6 Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Wed, 9 Feb 2022 09:31:35 -0800 Subject: [PATCH 3/5] Update unit test to use up to 5 in-flight requests --- async_producer_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index 01a227d9b..aa16eff63 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -670,12 +670,12 @@ func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { config := NewTestConfig() // Use very short read to simulate read error on unresponsive broker config.Net.ReadTimeout = 50 * time.Millisecond - // Flush every record to 
generate N in-flight Produce requests - config.Producer.Flush.Messages = 1 + // Flush every record to generate up to 5 in-flight Produce requests + // because config.Net.MaxOpenRequests defaults to 5 + config.Producer.Flush.MaxMessages = 1 config.Producer.Return.Successes = true // Reduce retries to speed up the test while keeping the default backoff config.Producer.Retry.Max = 1 - config.Net.MaxOpenRequests = 1 producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) if err != nil { t.Fatal(err) From f1bc44e541eecf45f935b97db6a457740aaa073e Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Thu, 10 Feb 2022 21:16:06 -0800 Subject: [PATCH 4/5] Keep closing the broker synchronously but buffer pending responses Closing the broker asynchronously fixes the deadlock but leads to a race condition between opening the broker in client updateLeader. This might result in a closed broker used by the new brokerProducer and all produce requests will fail with ErrNotConnected. --- async_producer.go | 77 +++++++++++++++++++++++------------------- async_producer_test.go | 1 + 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/async_producer.go b/async_producer.go index e44107e3e..c0dcce9b9 100644 --- a/async_producer.go +++ b/async_producer.go @@ -675,6 +675,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { var ( input = make(chan *ProducerMessage) bridge = make(chan *produceSet) + pending = make(chan *brokerProducerResponse) responses = make(chan *brokerProducerResponse) ) @@ -684,8 +685,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { input: input, output: bridge, responses: responses, - closeBroker: make(chan struct{}), - stopchan: make(chan struct{}), buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -693,21 +692,23 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { // minimal bridge to make the network response `select`able go withRecover(func() { + // Use a wait group to know if we still have in flight requests var wg sync.WaitGroup + for set := range bridge { request := set.buildRequest() - // Count the callbacks to know when to close the responses channel safely + // Count the in flight requests to know when we can close the pending channel safely wg.Add(1) // Capture the current set to forward in the callback sendResponse := func(set *produceSet) ProduceCallback { return func(response *ProduceResponse, err error) { - responses <- &brokerProducerResponse{ + // Forward the response to make sure we do not block the responseReceiver + pending <- &brokerProducerResponse{ set: set, err: err, res: response, } - // We forwarded the response wg.Done() } }(set) @@ -727,25 +728,42 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { sendResponse(nil, nil) } } - // Wait for all callbacks invocations to close the channel safely + // Wait for all in flight requests to close the pending channel safely wg.Wait() - close(responses) + close(pending) }) - // use a dedicated goroutine to close the broker instead of inside handleError - // this is because the AsyncProduce callback inside the bridge is invoked from the broker - // responseReceiver goroutine and closing the broker requires for such goroutine to be finished - // therefore leading to a deadlock + // In order to avoid a deadlock when closing the broker on network or malformed response error + // we use an intermediate channel to buffer and send pending responses in order + // This 
is because the AsyncProduce callback inside the bridge is invoked from the broker + // responseReceiver goroutine and closing the broker requires such goroutine to be finished go withRecover(func() { - for range bp.closeBroker { - if err := bp.broker.Close(); err != nil { - Logger.Printf("producer/broker/%d unable to close broker: %v\n", bp.broker.ID(), err) - } else { - Logger.Printf("producer/broker/%d closing done\n", bp.broker.ID()) + buf := queue.New() + for { + if buf.Length() == 0 { + res, ok := <-pending + if !ok { + // We are done forwarding the last pending response + close(responses) + return + } + buf.Add(res) + } + // Send the head pending response or buffer another one + // so that we never block the callback + headRes := buf.Peek().(*brokerProducerResponse) + select { + case res, ok := <-pending: + if !ok { + continue + } + buf.Add(res) + continue + case responses <- headRes: + buf.Remove() + continue } } - // Signal that we are done - close(bp.stopchan) }) if p.conf.Producer.Retry.Max <= 0 { @@ -767,12 +785,10 @@ type brokerProducer struct { parent *asyncProducer broker *Broker - input chan *ProducerMessage - output chan<- *produceSet - responses <-chan *brokerProducerResponse - closeBroker chan struct{} - abandoned chan struct{} - stopchan chan struct{} + input chan *ProducerMessage + output chan<- *produceSet + responses <-chan *brokerProducerResponse + abandoned chan struct{} buffer *produceSet timer <-chan time.Time @@ -875,14 +891,10 @@ func (bp *brokerProducer) shutdown() { } } close(bp.output) - // Drain responses from bridge goroutine + // Drain responses from the bridge goroutine for response := range bp.responses { bp.handleResponse(response) } - // Ask for the closeBroker goroutine to stop - close(bp.closeBroker) - // And wait for it to be done - <-bp.stopchan // No more brokerProducer related goroutine should be running Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } @@ -1054,12 +1066,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { default: Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) bp.parent.abandonBrokerConnection(bp.broker) - // We only try to close the broker once - if bp.closing == nil { - // Request the closeBroker goroutine to close the broker for us - // because calling bp.broker.Close here can lead to a deadlock - bp.closeBroker <- struct{}{} - } + _ = bp.broker.Close() bp.closing = err sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { bp.parent.retryMessages(pSet.msgs, err) diff --git a/async_producer_test.go b/async_producer_test.go index aa16eff63..fb2c9255f 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -38,6 +38,7 @@ func closeProducer(t *testing.T, p AsyncProducer) { } func expectResults(t *testing.T, p AsyncProducer, successes, errors int) { + t.Helper() expect := successes + errors for expect > 0 { select { From 85a2d3caec1bda97ceddeeea9f58a91454f2ec1e Mon Sep 17 00:00:00 2001 From: Sebastien Launay Date: Thu, 10 Feb 2022 22:31:25 -0800 Subject: [PATCH 5/5] t.Parallel() TestAsyncProducer...ConcurrentRequest --- async_producer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/async_producer_test.go b/async_producer_test.go index fb2c9255f..93c6c1fa3 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -646,6 +646,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) { // https://github.com/Shopify/sarama/issues/2129 func 
TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) { + t.Parallel() //Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2)
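
---

Note on the pattern used by the series: PATCH 2/5 adds a sync.WaitGroup so the responses channel is only closed after every AsyncProduce callback has fired, and PATCH 4/5 keeps broker.Close() synchronous but routes callback results through an intermediate goroutine that buffers them in memory, so the callback (which runs on the broker's responseReceiver goroutine) can never block on the brokerProducer. The sketch below is a minimal, self-contained illustration of that combination, not the Sarama code itself: ints stand in for *brokerProducerResponse, a plain slice replaces the queue package used in the patch, and the names forward, pending and out are invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// forward drains pending and re-sends every value on out without ever
// blocking the sender for long: values that cannot be delivered yet are
// buffered in memory. Once pending is closed and the buffer is drained,
// out is closed.
func forward(pending <-chan int, out chan<- int) {
	var buf []int
	for {
		if len(buf) == 0 {
			v, ok := <-pending
			if !ok {
				close(out)
				return
			}
			buf = append(buf, v)
		}
		select {
		case v, ok := <-pending:
			if !ok {
				// Sender is done: flush whatever is buffered, then close.
				for _, rest := range buf {
					out <- rest
				}
				close(out)
				return
			}
			buf = append(buf, v)
		case out <- buf[0]:
			buf = buf[1:]
		}
	}
}

func main() {
	pending := make(chan int)
	out := make(chan int)
	go forward(pending, out)

	// Producer side: a WaitGroup counts in-flight "callbacks" so pending is
	// only closed after every one of them has sent its result (the same idea
	// as the sync.WaitGroup introduced in PATCH 2/5 to avoid the data race on
	// closing the responses channel).
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			pending <- i // never blocks indefinitely: forward buffers if needed
		}(i)
	}
	go func() {
		wg.Wait()
		close(pending)
	}()

	// Consumer side: may be slow, or busy closing the broker; the forwarding
	// goroutine keeps absorbing results in the meantime.
	for v := range out {
		fmt.Println("forwarded", v)
	}
}
```

The buffer is unbounded, which mirrors the trade-off accepted by PATCH 4/5: pending responses are held in memory rather than letting the response-receiver goroutine block, because that blocking is what allowed broker.Close() called from handleError to deadlock in the first place.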