Skip to content

Commit

Permalink
Keep closing the broker synchronously but buffer pending responses
Browse files Browse the repository at this point in the history
Closing the broker asynchronously fixes the deadlock but leads to a
race condition between that close and opening the broker in client updateLeader.
This might result in a closed broker being used by the new brokerProducer,
in which case all produce requests will fail with ErrNotConnected.
  • Loading branch information
slaunay committed Feb 11, 2022
1 parent 8f92872 commit f1bc44e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 35 deletions.
77 changes: 42 additions & 35 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
pending = make(chan *brokerProducerResponse)
responses = make(chan *brokerProducerResponse)
)

Expand All @@ -684,30 +685,30 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
input: input,
output: bridge,
responses: responses,
closeBroker: make(chan struct{}),
stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
go withRecover(bp.run)

// minimal bridge to make the network response `select`able
go withRecover(func() {
// Use a wait group to know if we still have in flight requests
var wg sync.WaitGroup

for set := range bridge {
request := set.buildRequest()

// Count the callbacks to know when to close the responses channel safely
// Count the in flight requests to know when we can close the pending channel safely
wg.Add(1)
// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
responses <- &brokerProducerResponse{
// Forward the response to make sure we do not block the responseReceiver
pending <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
// We forwarded the response
wg.Done()
}
}(set)
Expand All @@ -727,25 +728,42 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
sendResponse(nil, nil)
}
}
// Wait for all callbacks invocations to close the channel safely
// Wait for all in flight requests to close the pending channel safely
wg.Wait()
close(responses)
close(pending)
})

// use a dedicated goroutine to close the broker instead of inside handleError
// this is because the AsyncProduce callback inside the bridge is invoked from the broker
// responseReceiver goroutine and closing the broker requires for such goroutine to be finished
// therefore leading to a deadlock
// In order to avoid a deadlock when closing the broker on network or malformed response error
// we use an intermediate channel to buffer and send pending responses in order
// This is because the AsyncProduce callback inside the bridge is invoked from the broker
// responseReceiver goroutine and closing the broker requires such goroutine to be finished
go withRecover(func() {
for range bp.closeBroker {
if err := bp.broker.Close(); err != nil {
Logger.Printf("producer/broker/%d unable to close broker: %v\n", bp.broker.ID(), err)
} else {
Logger.Printf("producer/broker/%d closing done\n", bp.broker.ID())
buf := queue.New()
for {
if buf.Length() == 0 {
res, ok := <-pending
if !ok {
// We are done forwarding the last pending response
close(responses)
return
}
buf.Add(res)
}
// Send the head pending response or buffer another one
// so that we never block the callback
headRes := buf.Peek().(*brokerProducerResponse)
select {
case res, ok := <-pending:
if !ok {
continue
}
buf.Add(res)
continue
case responses <- headRes:
buf.Remove()
continue
}
}
// Signal that we are done
close(bp.stopchan)
})

if p.conf.Producer.Retry.Max <= 0 {
Expand All @@ -767,12 +785,10 @@ type brokerProducer struct {
parent *asyncProducer
broker *Broker

input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
closeBroker chan struct{}
abandoned chan struct{}
stopchan chan struct{}
input chan *ProducerMessage
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}

buffer *produceSet
timer <-chan time.Time
Expand Down Expand Up @@ -875,14 +891,10 @@ func (bp *brokerProducer) shutdown() {
}
}
close(bp.output)
// Drain responses from bridge goroutine
// Drain responses from the bridge goroutine
for response := range bp.responses {
bp.handleResponse(response)
}
// Ask for the closeBroker goroutine to stop
close(bp.closeBroker)
// And wait for it to be done
<-bp.stopchan
// No more brokerProducer related goroutine should be running
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}
Expand Down Expand Up @@ -1054,12 +1066,7 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
default:
Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
bp.parent.abandonBrokerConnection(bp.broker)
// We only try to close the broker once
if bp.closing == nil {
// Request the closeBroker goroutine to close the broker for us
// because calling bp.broker.Close here can lead to a deadlock
bp.closeBroker <- struct{}{}
}
_ = bp.broker.Close()
bp.closing = err
sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
Expand Down
1 change: 1 addition & 0 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func closeProducer(t *testing.T, p AsyncProducer) {
}

func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
t.Helper()
expect := successes + errors
for expect > 0 {
select {
Expand Down

0 comments on commit f1bc44e

Please sign in to comment.