diff --git a/async_producer.go b/async_producer.go index c807aea72..f842d6bf3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -670,12 +670,20 @@ func (pp *partitionProducer) updateLeader() error { }) } +type pendingResponse struct { + set *produceSet + version int16 + promise *responsePromise + response *ProduceResponse +} + // one per broker; also constructs an associated flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { var ( input = make(chan *ProducerMessage) bridge = make(chan *produceSet) responses = make(chan *brokerProducerResponse) + pendings = make(chan *pendingResponse, p.conf.Net.MaxOpenRequests-1) ) bp := &brokerProducer{ @@ -692,18 +700,54 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer { // minimal bridge to make the network response `select`able go withRecover(func() { + defer close(pendings) for set := range bridge { + set := set + var err error + var response *ProduceResponse + var promise *responsePromise + request := set.buildRequest() + if request.RequiredAcks != NoResponse { + response = new(ProduceResponse) + } - response, err := broker.Produce(request) + responseHeaderVersion := int16(-1) + if response != nil { + responseHeaderVersion = response.headerVersion() + } + + promise, err = broker.send(request, response != nil, responseHeaderVersion) + // return quickly if failed or ackMode: NoResponse + if err != nil || promise == nil { + responses <- &brokerProducerResponse{ + set: set, + err: err, + res: response, + } + continue + } + pending := &pendingResponse{set: set, version: request.version(), response: response, promise: promise} + pendings <- pending + } + }) + + go withRecover(func() { + defer close(responses) + for pending := range pendings { + var err error + select { + case buf := <-pending.promise.packets: + err = versionedDecode(buf, pending.response, pending.version) + case err = <-pending.promise.errors: + } responses <- &brokerProducerResponse{ - set: set, + set: pending.set, err: err, - res: response, + res: pending.response, } } - close(responses) }) if p.conf.Producer.Retry.Max <= 0 {