diff --git a/async_producer.go b/async_producer.go index ec75dce5d..7424a3abd 100644 --- a/async_producer.go +++ b/async_producer.go @@ -56,8 +56,8 @@ type asyncProducer struct { input, successes, retries chan *ProducerMessage inFlight sync.WaitGroup - brokers map[*Broker]chan *ProducerMessage - brokerRefs map[chan *ProducerMessage]int + brokers map[*Broker]chan<- *ProducerMessage + brokerRefs map[chan<- *ProducerMessage]int brokerLock sync.Mutex } @@ -91,8 +91,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) { input: make(chan *ProducerMessage), successes: make(chan *ProducerMessage), retries: make(chan *ProducerMessage), - brokers: make(map[*Broker]chan *ProducerMessage), - brokerRefs: make(map[chan *ProducerMessage]int), + brokers: make(map[*Broker]chan<- *ProducerMessage), + brokerRefs: make(map[chan<- *ProducerMessage]int), } // launch our singleton dispatchers @@ -347,7 +347,7 @@ type partitionProducer struct { leader *Broker breaker *breaker.Breaker - output chan *ProducerMessage + output chan<- *ProducerMessage // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through, // all other messages get buffered in retryState[msg.retries].buf to preserve ordering @@ -491,37 +491,63 @@ func (pp *partitionProducer) updateLeader() error { }) } +func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { + input := make(chan *ProducerMessage) + bridge := make(chan []*ProducerMessage) + + a := &aggregator{ + parent: p, + broker: broker, + input: input, + output: bridge, + } + go withRecover(a.run) + + f := &flusher{ + parent: p, + broker: broker, + input: bridge, + } + go withRecover(f.run) + + return input +} + // one per broker // groups messages together into appropriately-sized batches for sending to the broker // based on https://godoc.org/github.com/eapache/channels#BatchingChannel -func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) { +type aggregator struct { + parent *asyncProducer + broker *Broker + input <-chan *ProducerMessage + output chan<- []*ProducerMessage +} + +func (a *aggregator) run() { var ( timer <-chan time.Time buffer []*ProducerMessage - flushTriggered chan []*ProducerMessage + flushTriggered chan<- []*ProducerMessage bytesAccumulated int defaultFlush bool ) - if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 { + if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 { defaultFlush = true } - output := make(chan []*ProducerMessage) - go withRecover(func() { p.flusher(broker, output) }) - for { select { - case msg := <-input: + case msg := <-a.input: if msg == nil { goto shutdown } if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) || - (p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) || - (p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) { - Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID()) - output <- buffer + (a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) || + (a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) { + Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) + a.output <- buffer timer = nil buffer = nil flushTriggered = nil @@ -533,14 +559,14 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer if defaultFlush || msg.flags&chaser == chaser || - (p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) || - (p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) { - flushTriggered = output - } else if p.conf.Producer.Flush.Frequency > 0 && timer == nil { - timer = time.After(p.conf.Producer.Flush.Frequency) + (a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) || + (a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) { + flushTriggered = a.output + } else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil { + timer = time.After(a.parent.conf.Producer.Flush.Frequency) } case <-timer: - flushTriggered = output + flushTriggered = a.output case flushTriggered <- buffer: timer = nil buffer = nil @@ -551,21 +577,27 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer shutdown: if len(buffer) > 0 { - output <- buffer + a.output <- buffer } - close(output) + close(a.output) } // one per broker // takes a batch at a time from the messageAggregator and sends to the broker -func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) { +type flusher struct { + parent *asyncProducer + broker *Broker + input <-chan []*ProducerMessage +} + +func (f *flusher) run() { var closing error currentRetries := make(map[string]map[int32]error) - Logger.Printf("producer/flusher/%d starting up\n", broker.ID()) + Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID()) - for batch := range input { + for batch := range f.input { if closing != nil { - p.retryMessages(batch, closing) + f.parent.retryMessages(batch, closing) continue } @@ -576,10 +608,10 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) if msg.flags&chaser == chaser { // we can start processing this topic/partition again Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - broker.ID(), msg.Topic, msg.Partition) + f.broker.ID(), msg.Topic, msg.Partition) currentRetries[msg.Topic][msg.Partition] = nil } - p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition]) + f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition]) batch[i] = nil // to prevent it being returned/retried twice continue } @@ -593,31 +625,31 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) } - request := p.buildRequest(msgSets) + request := f.parent.buildRequest(msgSets) if request == nil { continue } - response, err := broker.Produce(request) + response, err := f.broker.Produce(request) switch err.(type) { case nil: break case PacketEncodingError: - p.returnErrors(batch, err) + f.parent.returnErrors(batch, err) continue default: - Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err) - p.abandonBrokerConnection(broker) - _ = broker.Close() + Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) + f.parent.abandonBrokerConnection(f.broker) + _ = f.broker.Close() closing = err - p.retryMessages(batch, err) + f.parent.retryMessages(batch, err) continue } if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success - p.returnSuccesses(batch) + f.parent.returnSuccesses(batch) continue } @@ -628,7 +660,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) block := response.GetBlock(topic, partition) if block == nil { - p.returnErrors(msgs, ErrIncompleteResponse) + f.parent.returnErrors(msgs, ErrIncompleteResponse) continue } @@ -638,23 +670,23 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) for i := range msgs { msgs[i].Offset = block.Offset + int64(i) } - p.returnSuccesses(msgs) + f.parent.returnSuccesses(msgs) case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", - broker.ID(), topic, partition, block.Err) + f.broker.ID(), topic, partition, block.Err) if currentRetries[topic] == nil { currentRetries[topic] = make(map[int32]error) } currentRetries[topic][partition] = block.Err - p.retryMessages(msgs, block.Err) + f.parent.retryMessages(msgs, block.Err) default: - p.returnErrors(msgs, block.Err) + f.parent.returnErrors(msgs, block.Err) } } } } - Logger.Printf("producer/flusher/%d shut down\n", broker.ID()) + Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } // singleton @@ -814,17 +846,16 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) { } } -func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage { +func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage { p.brokerLock.Lock() defer p.brokerLock.Unlock() bp := p.brokers[broker] if bp == nil { - bp = make(chan *ProducerMessage) + bp = p.newBrokerProducer(broker) p.brokers[broker] = bp p.brokerRefs[bp] = 0 - go withRecover(func() { p.messageAggregator(broker, bp) }) } p.brokerRefs[bp]++ @@ -832,7 +863,7 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage return bp } -func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) { +func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) { p.brokerLock.Lock() defer p.brokerLock.Unlock()