diff --git a/async_producer.go b/async_producer.go index 6d29c76bd..0aef9b2dd 100644 --- a/async_producer.go +++ b/async_producer.go @@ -137,12 +137,12 @@ type ProducerMessage struct { retries int flags flagSet - - keyCache, valueCache []byte } +const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. + func (m *ProducerMessage) byteSize() int { - size := 26 // the metadata overhead of CRC, flags, etc. + size := producerMessageOverhead if m.Key != nil { size += m.Key.Length() } @@ -155,8 +155,6 @@ func (m *ProducerMessage) byteSize() int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 - m.keyCache = nil - m.valueCache = nil } // ProducerError is the type of error generated when the producer fails to deliver a message. @@ -509,260 +507,248 @@ func (pp *partitionProducer) updateLeader() error { }) } -// one per broker, constructs both an aggregator and a flusher +// one per broker; also constructs an associated flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { - input := make(chan *ProducerMessage) - bridge := make(chan []*ProducerMessage) - - a := &aggregator{ - parent: p, - broker: broker, - input: input, - output: bridge, - } - go withRecover(a.run) + var ( + input = make(chan *ProducerMessage) + bridge = make(chan *produceSet) + responses = make(chan *brokerProducerResponse) + ) - f := &flusher{ + a := &brokerProducer{ parent: p, broker: broker, - input: bridge, + input: input, + output: bridge, + responses: responses, + buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } - go withRecover(f.run) + go withRecover(a.run) + + // minimal bridge to make the network response `select`able + go withRecover(func() { + for set := range bridge { + request := set.buildRequest() + + response, err := broker.Produce(request) + + responses <- &brokerProducerResponse{ + set: set, + err: err, + res: response, + } + } + close(responses) + }) return input } +type brokerProducerResponse struct { + set *produceSet + err error + res *ProduceResponse +} + // groups messages together into appropriately-sized batches for sending to the broker -// based on https://godoc.org/github.com/eapache/channels#BatchingChannel -type aggregator struct { +// handles state related to retries etc +type brokerProducer struct { parent *asyncProducer broker *Broker - input <-chan *ProducerMessage - output chan<- []*ProducerMessage - buffer []*ProducerMessage - bufferBytes int - timer <-chan time.Time + input <-chan *ProducerMessage + output chan<- *produceSet + responses <-chan *brokerProducerResponse + + buffer *produceSet + timer <-chan time.Time + timerFired bool + + closing error + currentRetries map[string]map[int32]error } -func (a *aggregator) run() { - var output chan<- []*ProducerMessage +func (bp *brokerProducer) run() { + var output chan<- *produceSet + Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { select { - case msg := <-a.input: + case msg := <-bp.input: if msg == nil { goto shutdown } - if a.wouldOverflow(msg) { - Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) - a.output <- a.buffer - a.reset() - output = nil + if reason := bp.needsRetry(msg); reason != nil { + bp.parent.retryMessage(msg, reason) + + if bp.closing == nil && msg.flags&chaser == chaser { + // we were retrying this partition but we can start processing again + delete(bp.currentRetries[msg.Topic], msg.Partition) + Logger.Printf("producer/broker/%d state change 
to [normal] on %s/%d\n", + bp.broker.ID(), msg.Topic, msg.Partition) + } + + continue + } + + if bp.buffer.wouldOverflow(msg) { + if err := bp.waitForSpace(msg); err != nil { + bp.parent.retryMessage(msg, err) + continue + } } - a.buffer = append(a.buffer, msg) - a.bufferBytes += msg.byteSize() + if err := bp.buffer.add(msg); err != nil { + bp.parent.returnError(msg, err) + continue + } - if a.readyToFlush(msg) { - output = a.output - } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil { - a.timer = time.After(a.parent.conf.Producer.Flush.Frequency) + if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { + bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) } - case <-a.timer: - output = a.output - case output <- a.buffer: - a.reset() + case <-bp.timer: + bp.timerFired = true + case output <- bp.buffer: + bp.rollOver() + case response := <-bp.responses: + bp.handleResponse(response) + } + + if bp.timerFired || bp.buffer.readyToFlush() { + output = bp.output + } else { output = nil } } shutdown: - if len(a.buffer) > 0 { - a.output <- a.buffer + for !bp.buffer.empty() { + select { + case response := <-bp.responses: + bp.handleResponse(response) + case bp.output <- bp.buffer: + bp.rollOver() + } } - close(a.output) -} - -func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { - switch { - // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): - return true - // Would we overflow the size-limit of a compressed message-batch? - case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: - return true - // Would we overflow simply in number of messages? 
- case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages: - return true - default: - return false + close(bp.output) + for response := range bp.responses { + bp.handleResponse(response) } -} -func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { - switch { - // If all three config values are 0, we always flush as-fast-as-possible - case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0: - return true - // If the messages is a chaser we must flush to maintain the state-machine - case msg.flags&chaser == chaser: - return true - // If we've passed the message trigger-point - case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages: - return true - // If we've passed the byte trigger-point - case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: - return true - default: - return false - } + Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } -func (a *aggregator) reset() { - a.timer = nil - a.buffer = nil - a.bufferBytes = 0 -} +func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error { + if bp.closing != nil { + return bp.closing + } -// takes a batch at a time from the aggregator and sends to the broker -type flusher struct { - parent *asyncProducer - broker *Broker - input <-chan []*ProducerMessage + if bp.currentRetries[msg.Topic] == nil { + return nil + } - currentRetries map[string]map[int32]error + return bp.currentRetries[msg.Topic][msg.Partition] } -func (f *flusher) run() { - var closing error - - Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID()) +func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error { + Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) - for batch := range f.input { - if closing != nil { - f.parent.retryMessages(batch, closing) - continue + for { + select { + case response := <-bp.responses: + bp.handleResponse(response) + // handling a response can change our state, so re-check some things + if reason := bp.needsRetry(msg); reason != nil { + return reason + } else if !bp.buffer.wouldOverflow(msg) { + return nil + } + case bp.output <- bp.buffer: + bp.rollOver() + return nil } + } +} - msgSets := f.groupAndFilter(batch) - request := f.parent.buildRequest(msgSets) - if request == nil { - continue - } +func (bp *brokerProducer) rollOver() { + bp.timer = nil + bp.timerFired = false + bp.buffer = newProduceSet(bp.parent) +} - response, err := f.broker.Produce(request) +func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) { + if response.err != nil { + bp.handleError(response.set, response.err) + } else { + bp.handleSuccess(response.set, response.res) + } - switch err.(type) { - case nil: - break - case PacketEncodingError: - f.parent.returnErrors(batch, err) - continue - default: - Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) - f.parent.abandonBrokerConnection(f.broker) - _ = f.broker.Close() - closing = err - f.parent.retryMessages(batch, err) - continue - } + if bp.buffer.empty() { + bp.rollOver() // this can happen if the response invalidated our buffer + } +} +func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) { + // we iterate through the blocks in the request set, not the response, so that we notice + // if the response is 
missing a block completely + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success - f.parent.returnSuccesses(batch) - continue + bp.parent.returnSuccesses(msgs) + return } - f.parseResponse(msgSets, response) - } - Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) -} - -func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { - var err error - msgSets := make(map[string]map[int32][]*ProducerMessage) - - for i, msg := range batch { - - if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { - // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - batch[i] = nil - - if msg.flags&chaser == chaser { - // ...but now we can start processing future messages again - Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - f.broker.ID(), msg.Topic, msg.Partition) - delete(f.currentRetries[msg.Topic], msg.Partition) - } - - continue + block := response.GetBlock(topic, partition) + if block == nil { + bp.parent.returnErrors(msgs, ErrIncompleteResponse) + return } - if msg.Key != nil { - if msg.keyCache, err = msg.Key.Encode(); err != nil { - f.parent.returnError(msg, err) - batch[i] = nil - continue + switch block.Err { + // Success + case ErrNoError: + for i, msg := range msgs { + msg.Offset = block.Offset + int64(i) } - } - - if msg.Value != nil { - if msg.valueCache, err = msg.Value.Encode(); err != nil { - f.parent.returnError(msg, err) - batch[i] = nil - continue + bp.parent.returnSuccesses(msgs) + // Retriable errors + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", + bp.broker.ID(), topic, partition, block.Err) + if bp.currentRetries[topic] == nil { + bp.currentRetries[topic] = make(map[int32]error) } + bp.currentRetries[topic][partition] = block.Err + bp.parent.retryMessages(msgs, block.Err) + bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err) + // Other non-retriable errors + default: + bp.parent.returnErrors(msgs, block.Err) } - - partitionSet := msgSets[msg.Topic] - if partitionSet == nil { - partitionSet = make(map[int32][]*ProducerMessage) - msgSets[msg.Topic] = partitionSet - } - - partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) - } - - return msgSets + }) } -func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) { - // we iterate through the blocks in the request set, not the response, so that we notice - // if the response is missing a block completely - for topic, partitionSet := range msgSets { - for partition, msgs := range partitionSet { - block := response.GetBlock(topic, partition) - if block == nil { - f.parent.returnErrors(msgs, ErrIncompleteResponse) - continue - } - - switch block.Err { - // Success - case ErrNoError: - for i := range msgs { - msgs[i].Offset = block.Offset + int64(i) - } - f.parent.returnSuccesses(msgs) - // Retriable errors - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: - 
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", - f.broker.ID(), topic, partition, block.Err) - if f.currentRetries[topic] == nil { - f.currentRetries[topic] = make(map[int32]error) - } - f.currentRetries[topic][partition] = block.Err - f.parent.retryMessages(msgs, block.Err) - // Other non-retriable errors - default: - f.parent.returnErrors(msgs, block.Err) - } - } +func (bp *brokerProducer) handleError(sent *produceSet, err error) { + switch err.(type) { + case PacketEncodingError: + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.returnErrors(msgs, err) + }) + default: + Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) + bp.parent.abandonBrokerConnection(bp.broker) + _ = bp.broker.Close() + bp.closing = err + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.retryMessages(msgs, err) + }) + bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.retryMessages(msgs, err) + }) + bp.rollOver() } } @@ -793,73 +779,183 @@ func (p *asyncProducer) retryHandler() { } } -// utility functions +// produceSet -func (p *asyncProducer) shutdown() { - Logger.Println("Producer shutting down.") - p.inFlight.Add(1) - p.input <- &ProducerMessage{flags: shutdown} +type partitionSet struct { + msgs []*ProducerMessage + setToSend *MessageSet + bufferBytes int +} - p.inFlight.Wait() +type produceSet struct { + parent *asyncProducer + msgs map[string]map[int32]*partitionSet - if p.ownClient { - err := p.client.Close() - if err != nil { - Logger.Println("producer/shutdown failed to close the embedded client:", err) + bufferBytes int + bufferCount int +} + +func newProduceSet(parent *asyncProducer) *produceSet { + return &produceSet{ + msgs: make(map[string]map[int32]*partitionSet), + parent: parent, + } +} + +func (ps *produceSet) add(msg *ProducerMessage) error { + var err error + var key, val []byte + + if msg.Key != nil { + if key, err = msg.Key.Encode(); err != nil { + return err } } - close(p.input) - close(p.retries) - close(p.errors) - close(p.successes) -} + if msg.Value != nil { + if val, err = msg.Value.Encode(); err != nil { + return err + } + } -func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest { - - req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)} - empty := true - - for topic, partitionSet := range batch { - for partition, msgSet := range partitionSet { - setToSend := new(MessageSet) - setSize := 0 - for _, msg := range msgSet { - if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes { - // compression causes message-sets to be wrapped as single messages, which have tighter - // size requirements, so we have to respect those limits - valBytes, err := encode(setToSend) - if err != nil { - Logger.Println(err) // if this happens, it's basically our fault. 
- panic(err) - } - req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes}) - setToSend = new(MessageSet) - setSize = 0 - } - setSize += msg.byteSize() + partitions := ps.msgs[msg.Topic] + if partitions == nil { + partitions = make(map[int32]*partitionSet) + ps.msgs[msg.Topic] = partitions + } - setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache}) - empty = false - } + set := partitions[msg.Partition] + if set == nil { + set = &partitionSet{setToSend: new(MessageSet)} + partitions[msg.Partition] = set + } + + set.msgs = append(set.msgs, msg) + set.setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val}) + + size := producerMessageOverhead + len(key) + len(val) + set.bufferBytes += size + ps.bufferBytes += size + ps.bufferCount++ + + return nil +} - if p.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, setToSend) +func (ps *produceSet) buildRequest() *ProduceRequest { + req := &ProduceRequest{ + RequiredAcks: ps.parent.conf.Producer.RequiredAcks, + Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond), + } + + for topic, partitionSet := range ps.msgs { + for partition, set := range partitionSet { + if ps.parent.conf.Producer.Compression == CompressionNone { + req.AddSet(topic, partition, set.setToSend) } else { - valBytes, err := encode(setToSend) + // When compression is enabled, the entire set for each partition is compressed + // and sent as the payload of a single fake "message" with the appropriate codec + // set and no key. When the server sees a message with a compression codec, it + // decompresses the payload and treats the result as its message set. + payload, err := encode(set.setToSend) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) } - req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes}) + req.AddMessage(topic, partition, &Message{ + Codec: ps.parent.conf.Producer.Compression, + Key: nil, + Value: payload, + }) } } } - if empty { + return req +} + +func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) { + for topic, partitionSet := range ps.msgs { + for partition, set := range partitionSet { + cb(topic, partition, set.msgs) + } + } +} + +func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage { + if ps.msgs[topic] == nil { return nil } - return req + set := ps.msgs[topic][partition] + if set == nil { + return nil + } + ps.bufferBytes -= set.bufferBytes + ps.bufferCount -= len(set.msgs) + delete(ps.msgs[topic], partition) + return set.msgs +} + +func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { + switch { + // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. + case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + return true + // Would we overflow the size-limit of a compressed message-batch for this partition? + case ps.parent.conf.Producer.Compression != CompressionNone && + ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: + return true + // Would we overflow simply in number of messages? 
+ case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: + return true + default: + return false + } +} + +func (ps *produceSet) readyToFlush() bool { + switch { + // If we don't have any messages, nothing else matters + case ps.empty(): + return false + // If all three config values are 0, we always flush as-fast-as-possible + case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: + return true + // If we've passed the message trigger-point + case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages: + return true + // If we've passed the byte trigger-point + case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes: + return true + default: + return false + } +} + +func (ps *produceSet) empty() bool { + return ps.bufferCount == 0 +} + +// utility functions + +func (p *asyncProducer) shutdown() { + Logger.Println("Producer shutting down.") + p.inFlight.Add(1) + p.input <- &ProducerMessage{flags: shutdown} + + p.inFlight.Wait() + + if p.ownClient { + err := p.client.Close() + if err != nil { + Logger.Println("producer/shutdown failed to close the embedded client:", err) + } + } + + close(p.input) + close(p.retries) + close(p.errors) + close(p.successes) } func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { @@ -875,17 +971,12 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg != nil { - p.returnError(msg, err) - } + p.returnError(msg, err) } } func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { for _, msg := range batch { - if msg == nil { - continue - } if p.conf.Producer.Return.Successes { msg.clear() p.successes <- msg @@ -894,17 +985,18 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { } } +func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) { + if msg.retries >= p.conf.Producer.Retry.Max { + p.returnError(msg, err) + } else { + msg.retries++ + p.retries <- msg + } +} + func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg == nil { - continue - } - if msg.retries >= p.conf.Producer.Retry.Max { - p.returnError(msg, err) - } else { - msg.retries++ - p.retries <- msg - } + p.retryMessage(msg, err) } } diff --git a/async_producer_test.go b/async_producer_test.go index 7306256aa..9aa13da53 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -320,7 +320,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { leader.Returns(prodSuccess) config := NewConfig() - config.Producer.Flush.Messages = 3 + config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Partitioner = NewManualPartitioner producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -330,8 +330,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { for flush := 0; flush < 3; flush++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)} - producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)} + producer.Input() <- &ProducerMessage{Topic: 
"my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} expectResults(t, producer, 1, 2) }