Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when closing Broker in brokerProducer #2133

Merged
merged 5 commits into from
Feb 13, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
closeBroker: make(chan struct{}),
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
Expand All @@ -692,9 +693,12 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {

// minimal bridge to make the network response `select`able
go withRecover(func() {
var wg sync.WaitGroup
for set := range bridge {
request := set.buildRequest()

// Count the callbacks to know when to close the responses channel safely
wg.Add(1)
// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
Expand All @@ -703,6 +707,8 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
err: err,
res: response,
}
// We forwarded the response
wg.Done()
}
}(set)

Expand All @@ -721,9 +727,27 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
sendResponse(nil, nil)
}
}
// Wait for all callbacks invocations to close the channel safely
wg.Wait()
close(responses)
})

// use a dedicated goroutine to close the broker instead of inside handleError
// this is because the AsyncProduce callback inside the bridge is invoked from the broker
// responseReceiver goroutine and closing the broker requires for such goroutine to be finished
// therefore leading to a deadlock
go withRecover(func() {
for range bp.closeBroker {
if err := bp.broker.Close(); err != nil {
Logger.Printf("producer/broker/%d unable to close broker: %v\n", bp.broker.ID(), err)
} else {
Logger.Printf("producer/broker/%d closing done\n", bp.broker.ID())
}
}
// Signal that we are done
close(bp.stopchan)
})

if p.conf.Producer.Retry.Max <= 0 {
bp.abandoned = make(chan struct{})
}
Expand All @@ -743,11 +767,12 @@ type brokerProducer struct {
parent *asyncProducer
broker *Broker

input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
stopchan chan struct{}
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
closeBroker chan struct{}
abandoned chan struct{}
stopchan chan struct{}

buffer *produceSet
timer <-chan time.Time
Expand Down Expand Up @@ -830,10 +855,6 @@ func (bp *brokerProducer) run() {
if ok {
bp.handleResponse(response)
}
case <-bp.stopchan:
Logger.Printf(
"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
return
}

if bp.timerFired || bp.buffer.readyToFlush() {
Expand All @@ -854,10 +875,15 @@ func (bp *brokerProducer) shutdown() {
}
}
close(bp.output)
// Drain responses from bridge goroutine
for response := range bp.responses {
bp.handleResponse(response)
}
close(bp.stopchan)
// Ask for the closeBroker goroutine to stop
close(bp.closeBroker)
// And wait for it to be done
<-bp.stopchan
// No more brokerProducer related goroutine should be running
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

Expand Down Expand Up @@ -1028,7 +1054,12 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
default:
Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
// We only try to close the broker once
if bp.closing == nil {
// Request the closeBroker goroutine to close the broker for us
// because calling bp.broker.Close here can lead to a deadlock
bp.closeBroker <- struct{}{}
}
bp.closing = err
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
Expand Down
55 changes: 53 additions & 2 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,55 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
}
}

// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerMultipleRetriesWithConcurrentRequests(t *testing.T) {
//Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

// The seed broker only handles Metadata request
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
metadataLeader := new(MetadataResponse)
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
return metadataLeader
})

// Simulate a slow broker by taking ~200ms to handle requests
// therefore triggering the read timeout and the retry logic
leader.setHandler(func(req *request) (res encoderWithHeader) {
time.Sleep(200 * time.Millisecond)
// Will likely not be read by the producer (read timeout)
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
return prodSuccess
})

config := NewTestConfig()
// Use very short read to simulate read error on unresponsive broker
config.Net.ReadTimeout = 50 * time.Millisecond
// Flush every record to generate N in-flight Produce requests
config.Producer.Flush.Messages = 1
config.Producer.Return.Successes = true
// Reduce retries to speed up the test while keeping the default backoff
config.Producer.Retry.Max = 1
config.Net.MaxOpenRequests = 1
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
}

expectResults(t, producer, 0, 10)

seedBroker.Close()
leader.Close()
closeProducer(t, producer)
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

Expand Down Expand Up @@ -1249,9 +1298,11 @@ func TestBrokerProducerShutdown(t *testing.T) {
addr: mockBroker.Addr(),
id: mockBroker.BrokerID(),
}
bp := producer.(*asyncProducer).newBrokerProducer(broker)
// Starts various goroutines in newBrokerProducer
bp := producer.(*asyncProducer).getBrokerProducer(broker)
// Initiate the shutdown of all of them
producer.(*asyncProducer).unrefBrokerProducer(broker, bp)

bp.shutdown()
_ = producer.Close()
mockBroker.Close()
}
Expand Down
2 changes: 2 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ type ProduceCallback func(*ProduceResponse, error)
// When configured with RequiredAcks == NoResponse, the callback will not be invoked.
// If an error is returned because the request could not be sent then the callback
// will not be invoked either.
//
// Make sure not to Close the broker in the callback as it will lead to a deadlock.
func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error {
needAcks := request.RequiredAcks != NoResponse
// Use a nil promise when no acks is required
Expand Down