From 4fec243aca46cdcbbacbccb427930cc22afc2ac9 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 19:36:05 -0400 Subject: [PATCH 01/26] Introduce the concept of a produceSet Useless abstraction right this instant, but will be very useful soon. --- async_producer.go | 62 ++++++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/async_producer.go b/async_producer.go index 6d29c76bd..4f69fe1b0 100644 --- a/async_producer.go +++ b/async_producer.go @@ -647,8 +647,8 @@ func (f *flusher) run() { continue } - msgSets := f.groupAndFilter(batch) - request := f.parent.buildRequest(msgSets) + set := f.groupAndFilter(batch) + request := set.buildRequest() if request == nil { continue } @@ -676,14 +676,14 @@ func (f *flusher) run() { continue } - f.parseResponse(msgSets, response) + f.parseResponse(set, response) } Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } -func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { +func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { var err error - msgSets := make(map[string]map[int32][]*ProducerMessage) + set := newProduceSet(f.parent.conf) for i, msg := range batch { @@ -718,22 +718,16 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32] } } - partitionSet := msgSets[msg.Topic] - if partitionSet == nil { - partitionSet = make(map[int32][]*ProducerMessage) - msgSets[msg.Topic] = partitionSet - } - - partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) + set.add(msg) } - return msgSets + return set } -func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) { +func (f *flusher) parseResponse(set *produceSet, response *ProduceResponse) { // we iterate through the blocks in the request set, not the response, so that we notice // if the response is missing a block completely - for topic, partitionSet := range msgSets { + for topic, partitionSet := range set.msgs { for partition, msgs := range partitionSet { block := response.GetBlock(topic, partition) if block == nil { @@ -793,6 +787,30 @@ func (p *asyncProducer) retryHandler() { } } +// produceSet + +type produceSet struct { + msgs map[string]map[int32][]*ProducerMessage + conf *Config +} + +func newProduceSet(conf *Config) *produceSet { + return &produceSet{ + msgs: make(map[string]map[int32][]*ProducerMessage), + conf: conf, + } +} + +func (ps *produceSet) add(msg *ProducerMessage) { + partitionSet := ps.msgs[msg.Topic] + if partitionSet == nil { + partitionSet = make(map[int32][]*ProducerMessage) + ps.msgs[msg.Topic] = partitionSet + } + + partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) +} + // utility functions func (p *asyncProducer) shutdown() { @@ -815,17 +833,17 @@ func (p *asyncProducer) shutdown() { close(p.successes) } -func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest { +func (ps *produceSet) buildRequest() *ProduceRequest { - req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)} + req := &ProduceRequest{RequiredAcks: ps.conf.Producer.RequiredAcks, Timeout: int32(ps.conf.Producer.Timeout / time.Millisecond)} empty := true - for topic, partitionSet := range batch { + for topic, partitionSet := range ps.msgs { for partition, msgSet := range partitionSet { setToSend := new(MessageSet) setSize := 0 for _, msg := range 
msgSet { - if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes { + if ps.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > ps.conf.Producer.MaxMessageBytes { // compression causes message-sets to be wrapped as single messages, which have tighter // size requirements, so we have to respect those limits valBytes, err := encode(setToSend) @@ -833,7 +851,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa Logger.Println(err) // if this happens, it's basically our fault. panic(err) } - req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes}) + req.AddMessage(topic, partition, &Message{Codec: ps.conf.Producer.Compression, Key: nil, Value: valBytes}) setToSend = new(MessageSet) setSize = 0 } @@ -843,7 +861,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa empty = false } - if p.conf.Producer.Compression == CompressionNone { + if ps.conf.Producer.Compression == CompressionNone { req.AddSet(topic, partition, setToSend) } else { valBytes, err := encode(setToSend) @@ -851,7 +869,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa Logger.Println(err) // if this happens, it's basically our fault. panic(err) } - req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes}) + req.AddMessage(topic, partition, &Message{Codec: ps.conf.Producer.Compression, Key: nil, Value: valBytes}) } } } From ab96424cb7c8a90e23620c3a4fb6753e8b392dcb Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 19:36:36 -0400 Subject: [PATCH 02/26] Reorder some functions --- async_producer.go | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/async_producer.go b/async_producer.go index 4f69fe1b0..9d756b139 100644 --- a/async_producer.go +++ b/async_producer.go @@ -811,28 +811,6 @@ func (ps *produceSet) add(msg *ProducerMessage) { partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) } -// utility functions - -func (p *asyncProducer) shutdown() { - Logger.Println("Producer shutting down.") - p.inFlight.Add(1) - p.input <- &ProducerMessage{flags: shutdown} - - p.inFlight.Wait() - - if p.ownClient { - err := p.client.Close() - if err != nil { - Logger.Println("producer/shutdown failed to close the embedded client:", err) - } - } - - close(p.input) - close(p.retries) - close(p.errors) - close(p.successes) -} - func (ps *produceSet) buildRequest() *ProduceRequest { req := &ProduceRequest{RequiredAcks: ps.conf.Producer.RequiredAcks, Timeout: int32(ps.conf.Producer.Timeout / time.Millisecond)} @@ -880,6 +858,28 @@ func (ps *produceSet) buildRequest() *ProduceRequest { return req } +// utility functions + +func (p *asyncProducer) shutdown() { + Logger.Println("Producer shutting down.") + p.inFlight.Add(1) + p.input <- &ProducerMessage{flags: shutdown} + + p.inFlight.Wait() + + if p.ownClient { + err := p.client.Close() + if err != nil { + Logger.Println("producer/shutdown failed to close the embedded client:", err) + } + } + + close(p.input) + close(p.retries) + close(p.errors) + close(p.successes) +} + func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { msg.clear() pErr := &ProducerError{Msg: msg, Err: err} From 54d226db587d3030b9ffe500ca7059f633de8f8b Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 19:41:59 -0400 Subject: [PATCH 03/26] 
Simplify parseResponse into parseBlock Add an eachPartition to the produceSet --- async_producer.go | 72 +++++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/async_producer.go b/async_producer.go index 9d756b139..5a7388d7f 100644 --- a/async_producer.go +++ b/async_producer.go @@ -676,7 +676,11 @@ func (f *flusher) run() { continue } - f.parseResponse(set, response) + // we iterate through the blocks in the request set, not the response, so that we notice + // if the response is missing a block completely + set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + f.parseBlock(topic, partition, msgs, response) + }) } Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } @@ -724,39 +728,33 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { return set } -func (f *flusher) parseResponse(set *produceSet, response *ProduceResponse) { - // we iterate through the blocks in the request set, not the response, so that we notice - // if the response is missing a block completely - for topic, partitionSet := range set.msgs { - for partition, msgs := range partitionSet { - block := response.GetBlock(topic, partition) - if block == nil { - f.parent.returnErrors(msgs, ErrIncompleteResponse) - continue - } +func (f *flusher) parseBlock(topic string, partition int32, msgs []*ProducerMessage, response *ProduceResponse) { + block := response.GetBlock(topic, partition) + if block == nil { + f.parent.returnErrors(msgs, ErrIncompleteResponse) + return + } - switch block.Err { - // Success - case ErrNoError: - for i := range msgs { - msgs[i].Offset = block.Offset + int64(i) - } - f.parent.returnSuccesses(msgs) - // Retriable errors - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: - Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", - f.broker.ID(), topic, partition, block.Err) - if f.currentRetries[topic] == nil { - f.currentRetries[topic] = make(map[int32]error) - } - f.currentRetries[topic][partition] = block.Err - f.parent.retryMessages(msgs, block.Err) - // Other non-retriable errors - default: - f.parent.returnErrors(msgs, block.Err) - } - } + switch block.Err { + // Success + case ErrNoError: + for i := range msgs { + msgs[i].Offset = block.Offset + int64(i) + } + f.parent.returnSuccesses(msgs) + // Retriable errors + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", + f.broker.ID(), topic, partition, block.Err) + if f.currentRetries[topic] == nil { + f.currentRetries[topic] = make(map[int32]error) + } + f.currentRetries[topic][partition] = block.Err + f.parent.retryMessages(msgs, block.Err) + // Other non-retriable errors + default: + f.parent.returnErrors(msgs, block.Err) } } @@ -858,6 +856,14 @@ func (ps *produceSet) buildRequest() *ProduceRequest { return req } +func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*ProducerMessage)) { + for topic, partitionSet := range ps.msgs { + for partition, msgs := range partitionSet { + cb(topic, partition, msgs) + } + } +} + // utility functions func (p *asyncProducer) shutdown() { From a5d3f67385393ddb126fd3569da60c96c05e8286 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 
26 Sep 2015 19:48:19 -0400 Subject: [PATCH 04/26] Use eachPartition more Now that nobody uses `batch` after the set is constructed, we no longer have to nil out its entries when we return errors. --- async_producer.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/async_producer.go b/async_producer.go index 5a7388d7f..3dd428326 100644 --- a/async_producer.go +++ b/async_producer.go @@ -659,20 +659,26 @@ func (f *flusher) run() { case nil: break case PacketEncodingError: - f.parent.returnErrors(batch, err) + set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + f.parent.returnErrors(msgs, err) + }) continue default: Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) f.parent.abandonBrokerConnection(f.broker) _ = f.broker.Close() closing = err - f.parent.retryMessages(batch, err) + set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + f.parent.retryMessages(msgs, err) + }) continue } if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success - f.parent.returnSuccesses(batch) + set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + f.parent.returnSuccesses(msgs) + }) continue } @@ -689,12 +695,11 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { var err error set := newProduceSet(f.parent.conf) - for i, msg := range batch { + for _, msg := range batch { if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - batch[i] = nil if msg.flags&chaser == chaser { // ...but now we can start processing future messages again @@ -709,7 +714,6 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { if msg.Key != nil { if msg.keyCache, err = msg.Key.Encode(); err != nil { f.parent.returnError(msg, err) - batch[i] = nil continue } } @@ -717,7 +721,6 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { if msg.Value != nil { if msg.valueCache, err = msg.Value.Encode(); err != nil { f.parent.returnError(msg, err) - batch[i] = nil continue } } From e7b10b0defeb15bfe58e2d31c588a3eba82341de Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:04:38 -0400 Subject: [PATCH 05/26] Construct the produceSet in the aggregator Temporary breakage ensues. 
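[Editorial aside, not part of the patch: the produceSet that these patches introduce is, at this point in the series, just a per-broker buffer that groups pending messages by topic and then partition, with add() from patch 01 and eachPartition() from patch 03; patches 06–07 later fold byte/count tracking and key/value encoding into it. The following is a condensed, standalone sketch of that idea for orientation only — the stub producerMessage type and the main() driver are simplified stand-ins, not Sarama's real types or API.]

    // Standalone illustration of the produceSet grouping used by these patches:
    // bucket pending messages per topic/partition so the flusher can build one
    // ProduceRequest and later walk the same grouping when parsing the response.
    package main

    import "fmt"

    // producerMessage is a trimmed stand-in for sarama.ProducerMessage.
    type producerMessage struct {
        Topic     string
        Partition int32
        Value     string
    }

    type produceSet struct {
        msgs map[string]map[int32][]*producerMessage
    }

    func newProduceSet() *produceSet {
        return &produceSet{msgs: make(map[string]map[int32][]*producerMessage)}
    }

    // add buckets a message under its topic and partition.
    func (ps *produceSet) add(msg *producerMessage) {
        partitions := ps.msgs[msg.Topic]
        if partitions == nil {
            partitions = make(map[int32][]*producerMessage)
            ps.msgs[msg.Topic] = partitions
        }
        partitions[msg.Partition] = append(partitions[msg.Partition], msg)
    }

    // eachPartition calls cb once per buffered topic/partition bucket, mirroring
    // how the flusher iterates the request set rather than the response so that
    // a missing response block is still noticed.
    func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs []*producerMessage)) {
        for topic, partitions := range ps.msgs {
            for partition, msgs := range partitions {
                cb(topic, partition, msgs)
            }
        }
    }

    func main() {
        set := newProduceSet()
        set.add(&producerMessage{Topic: "events", Partition: 0, Value: "a"})
        set.add(&producerMessage{Topic: "events", Partition: 0, Value: "b"})
        set.add(&producerMessage{Topic: "logs", Partition: 3, Value: "c"})
        set.eachPartition(func(topic string, partition int32, msgs []*producerMessage) {
            fmt.Printf("%s/%d: %d message(s)\n", topic, partition, len(msgs))
        })
    }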
--- async_producer.go | 51 ++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/async_producer.go b/async_producer.go index 3dd428326..763ab7f72 100644 --- a/async_producer.go +++ b/async_producer.go @@ -512,13 +512,14 @@ func (pp *partitionProducer) updateLeader() error { // one per broker, constructs both an aggregator and a flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { input := make(chan *ProducerMessage) - bridge := make(chan []*ProducerMessage) + bridge := make(chan *produceSet) a := &aggregator{ parent: p, broker: broker, input: input, output: bridge, + buffer: newProduceSet(p.conf), } go withRecover(a.run) @@ -539,15 +540,16 @@ type aggregator struct { parent *asyncProducer broker *Broker input <-chan *ProducerMessage - output chan<- []*ProducerMessage + output chan<- *produceSet - buffer []*ProducerMessage + buffer *produceSet bufferBytes int + bufferCount int timer <-chan time.Time } func (a *aggregator) run() { - var output chan<- []*ProducerMessage + var output chan<- *produceSet for { select { @@ -563,8 +565,9 @@ func (a *aggregator) run() { output = nil } - a.buffer = append(a.buffer, msg) + a.buffer.add(msg) a.bufferBytes += msg.byteSize() + a.bufferCount++ if a.readyToFlush(msg) { output = a.output @@ -580,7 +583,7 @@ func (a *aggregator) run() { } shutdown: - if len(a.buffer) > 0 { + if a.bufferCount > 0 { a.output <- a.buffer } close(a.output) @@ -595,7 +598,7 @@ func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? - case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages: + case a.parent.conf.Producer.Flush.MaxMessages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.MaxMessages: return true default: return false @@ -611,7 +614,7 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { case msg.flags&chaser == chaser: return true // If we've passed the message trigger-point - case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages: + case a.parent.conf.Producer.Flush.Messages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.Messages: return true // If we've passed the byte trigger-point case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: @@ -623,15 +626,16 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { func (a *aggregator) reset() { a.timer = nil - a.buffer = nil + a.buffer = newProduceSet(a.parent.conf) a.bufferBytes = 0 + a.bufferCount = 0 } // takes a batch at a time from the aggregator and sends to the broker type flusher struct { parent *asyncProducer broker *Broker - input <-chan []*ProducerMessage + input <-chan *produceSet currentRetries map[string]map[int32]error } @@ -641,13 +645,15 @@ func (f *flusher) run() { Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID()) - for batch := range f.input { + for set := range f.input { if closing != nil { - f.parent.retryMessages(batch, closing) + set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + f.parent.retryMessages(msgs, closing) + }) continue } - set := f.groupAndFilter(batch) + set.eachPartition(f.filter) request := set.buildRequest() if request == nil { continue @@ -691,15 +697,15 @@ 
func (f *flusher) run() { Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } -func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { +func (f *flusher) filter(topic string, partition int32, batch []*ProducerMessage) { var err error - set := newProduceSet(f.parent.conf) - for _, msg := range batch { + for i, msg := range batch { if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) + batch[i] = nil if msg.flags&chaser == chaser { // ...but now we can start processing future messages again @@ -714,6 +720,7 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { if msg.Key != nil { if msg.keyCache, err = msg.Key.Encode(); err != nil { f.parent.returnError(msg, err) + batch[i] = nil continue } } @@ -721,14 +728,11 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet { if msg.Value != nil { if msg.valueCache, err = msg.Value.Encode(); err != nil { f.parent.returnError(msg, err) + batch[i] = nil continue } } - - set.add(msg) } - - return set } func (f *flusher) parseBlock(topic string, partition int32, msgs []*ProducerMessage, response *ProduceResponse) { @@ -742,7 +746,9 @@ func (f *flusher) parseBlock(topic string, partition int32, msgs []*ProducerMess // Success case ErrNoError: for i := range msgs { - msgs[i].Offset = block.Offset + int64(i) + if msgs[i] != nil { + msgs[i].Offset = block.Offset + int64(i) // FIXME: offsets wrong now + } } f.parent.returnSuccesses(msgs) // Retriable errors @@ -822,6 +828,9 @@ func (ps *produceSet) buildRequest() *ProduceRequest { setToSend := new(MessageSet) setSize := 0 for _, msg := range msgSet { + if msg == nil { + continue + } if ps.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > ps.conf.Producer.MaxMessageBytes { // compression causes message-sets to be wrapped as single messages, which have tighter // size requirements, so we have to respect those limits From 3d9d041cc9d3d83f5439d942116939b856970d42 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:09:57 -0400 Subject: [PATCH 06/26] Move byte and msg-count tracking into the set --- async_producer.go | 96 ++++++++++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/async_producer.go b/async_producer.go index 763ab7f72..14bcb868a 100644 --- a/async_producer.go +++ b/async_producer.go @@ -542,10 +542,8 @@ type aggregator struct { input <-chan *ProducerMessage output chan<- *produceSet - buffer *produceSet - bufferBytes int - bufferCount int - timer <-chan time.Time + buffer *produceSet + timer <-chan time.Time } func (a *aggregator) run() { @@ -558,7 +556,7 @@ func (a *aggregator) run() { goto shutdown } - if a.wouldOverflow(msg) { + if a.buffer.wouldOverflow(msg) { Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) a.output <- a.buffer a.reset() @@ -566,10 +564,8 @@ func (a *aggregator) run() { } a.buffer.add(msg) - a.bufferBytes += msg.byteSize() - a.bufferCount++ - if a.readyToFlush(msg) { + if a.buffer.readyToFlush(msg) { output = a.output } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil { a.timer = time.After(a.parent.conf.Producer.Flush.Frequency) @@ -583,52 +579,15 @@ func (a *aggregator) run() { } shutdown: - if a.bufferCount > 0 { + if 
!a.buffer.empty() { a.output <- a.buffer } close(a.output) } -func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool { - switch { - // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): - return true - // Would we overflow the size-limit of a compressed message-batch? - case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes: - return true - // Would we overflow simply in number of messages? - case a.parent.conf.Producer.Flush.MaxMessages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.MaxMessages: - return true - default: - return false - } -} - -func (a *aggregator) readyToFlush(msg *ProducerMessage) bool { - switch { - // If all three config values are 0, we always flush as-fast-as-possible - case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0: - return true - // If the messages is a chaser we must flush to maintain the state-machine - case msg.flags&chaser == chaser: - return true - // If we've passed the message trigger-point - case a.parent.conf.Producer.Flush.Messages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.Messages: - return true - // If we've passed the byte trigger-point - case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes: - return true - default: - return false - } -} - func (a *aggregator) reset() { a.timer = nil a.buffer = newProduceSet(a.parent.conf) - a.bufferBytes = 0 - a.bufferCount = 0 } // takes a batch at a time from the aggregator and sends to the broker @@ -799,6 +758,9 @@ func (p *asyncProducer) retryHandler() { type produceSet struct { msgs map[string]map[int32][]*ProducerMessage conf *Config + + bufferBytes int + bufferCount int } func newProduceSet(conf *Config) *produceSet { @@ -816,6 +778,9 @@ func (ps *produceSet) add(msg *ProducerMessage) { } partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) + + ps.bufferBytes += msg.byteSize() + ps.bufferCount++ } func (ps *produceSet) buildRequest() *ProduceRequest { @@ -876,6 +841,45 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs } } +func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { + switch { + // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. + case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): + return true + // Would we overflow the size-limit of a compressed message-batch? + case ps.conf.Producer.Compression != CompressionNone && ps.bufferBytes+msg.byteSize() >= ps.conf.Producer.MaxMessageBytes: + return true + // Would we overflow simply in number of messages? 
+ case ps.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.conf.Producer.Flush.MaxMessages: + return true + default: + return false + } +} + +func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool { + switch { + // If all three config values are 0, we always flush as-fast-as-possible + case ps.conf.Producer.Flush.Frequency == 0 && ps.conf.Producer.Flush.Bytes == 0 && ps.conf.Producer.Flush.Messages == 0: + return true + // If the messages is a chaser we must flush to maintain the state-machine + case msg.flags&chaser == chaser: + return true + // If we've passed the message trigger-point + case ps.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.conf.Producer.Flush.Messages: + return true + // If we've passed the byte trigger-point + case ps.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.conf.Producer.Flush.Bytes: + return true + default: + return false + } +} + +func (ps *produceSet) empty() bool { + return ps.bufferCount == 0 +} + // utility functions func (p *asyncProducer) shutdown() { From f39d3be61572d25277c624550f908e59bb9b21e6 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:15:30 -0400 Subject: [PATCH 07/26] Move key and value encoding back to buildRequest Get rid of keyCache and valueCache --- async_producer.go | 78 ++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 42 deletions(-) diff --git a/async_producer.go b/async_producer.go index 14bcb868a..679af90ea 100644 --- a/async_producer.go +++ b/async_producer.go @@ -137,8 +137,6 @@ type ProducerMessage struct { retries int flags flagSet - - keyCache, valueCache []byte } func (m *ProducerMessage) byteSize() int { @@ -155,8 +153,6 @@ func (m *ProducerMessage) byteSize() int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 - m.keyCache = nil - m.valueCache = nil } // ProducerError is the type of error generated when the producer fails to deliver a message. 
@@ -519,7 +515,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag broker: broker, input: input, output: bridge, - buffer: newProduceSet(p.conf), + buffer: newProduceSet(p), } go withRecover(a.run) @@ -587,7 +583,7 @@ shutdown: func (a *aggregator) reset() { a.timer = nil - a.buffer = newProduceSet(a.parent.conf) + a.buffer = newProduceSet(a.parent) } // takes a batch at a time from the aggregator and sends to the broker @@ -657,10 +653,7 @@ func (f *flusher) run() { } func (f *flusher) filter(topic string, partition int32, batch []*ProducerMessage) { - var err error - for i, msg := range batch { - if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) @@ -675,22 +668,6 @@ func (f *flusher) filter(topic string, partition int32, batch []*ProducerMessage continue } - - if msg.Key != nil { - if msg.keyCache, err = msg.Key.Encode(); err != nil { - f.parent.returnError(msg, err) - batch[i] = nil - continue - } - } - - if msg.Value != nil { - if msg.valueCache, err = msg.Value.Encode(); err != nil { - f.parent.returnError(msg, err) - batch[i] = nil - continue - } - } } } @@ -756,17 +733,17 @@ func (p *asyncProducer) retryHandler() { // produceSet type produceSet struct { - msgs map[string]map[int32][]*ProducerMessage - conf *Config + msgs map[string]map[int32][]*ProducerMessage + parent *asyncProducer bufferBytes int bufferCount int } -func newProduceSet(conf *Config) *produceSet { +func newProduceSet(parent *asyncProducer) *produceSet { return &produceSet{ - msgs: make(map[string]map[int32][]*ProducerMessage), - conf: conf, + msgs: make(map[string]map[int32][]*ProducerMessage), + parent: parent, } } @@ -785,18 +762,35 @@ func (ps *produceSet) add(msg *ProducerMessage) { func (ps *produceSet) buildRequest() *ProduceRequest { - req := &ProduceRequest{RequiredAcks: ps.conf.Producer.RequiredAcks, Timeout: int32(ps.conf.Producer.Timeout / time.Millisecond)} + req := &ProduceRequest{RequiredAcks: ps.parent.conf.Producer.RequiredAcks, Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond)} empty := true for topic, partitionSet := range ps.msgs { for partition, msgSet := range partitionSet { setToSend := new(MessageSet) setSize := 0 - for _, msg := range msgSet { + for i, msg := range msgSet { if msg == nil { continue } - if ps.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > ps.conf.Producer.MaxMessageBytes { + var err error + var key, val []byte + if msg.Key != nil { + if key, err = msg.Key.Encode(); err != nil { + ps.parent.returnError(msg, err) + msgSet[i] = nil + continue + } + } + + if msg.Value != nil { + if val, err = msg.Value.Encode(); err != nil { + ps.parent.returnError(msg, err) + msgSet[i] = nil + continue + } + } + if ps.parent.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > ps.parent.conf.Producer.MaxMessageBytes { // compression causes message-sets to be wrapped as single messages, which have tighter // size requirements, so we have to respect those limits valBytes, err := encode(setToSend) @@ -804,17 +798,17 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Logger.Println(err) // if this happens, it's basically our fault. 
panic(err) } - req.AddMessage(topic, partition, &Message{Codec: ps.conf.Producer.Compression, Key: nil, Value: valBytes}) + req.AddMessage(topic, partition, &Message{Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: valBytes}) setToSend = new(MessageSet) setSize = 0 } setSize += msg.byteSize() - setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache}) + setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val}) empty = false } - if ps.conf.Producer.Compression == CompressionNone { + if ps.parent.conf.Producer.Compression == CompressionNone { req.AddSet(topic, partition, setToSend) } else { valBytes, err := encode(setToSend) @@ -822,7 +816,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Logger.Println(err) // if this happens, it's basically our fault. panic(err) } - req.AddMessage(topic, partition, &Message{Codec: ps.conf.Producer.Compression, Key: nil, Value: valBytes}) + req.AddMessage(topic, partition, &Message{Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: valBytes}) } } } @@ -847,10 +841,10 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): return true // Would we overflow the size-limit of a compressed message-batch? - case ps.conf.Producer.Compression != CompressionNone && ps.bufferBytes+msg.byteSize() >= ps.conf.Producer.MaxMessageBytes: + case ps.parent.conf.Producer.Compression != CompressionNone && ps.bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? - case ps.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.conf.Producer.Flush.MaxMessages: + case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: return true default: return false @@ -860,16 +854,16 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool { switch { // If all three config values are 0, we always flush as-fast-as-possible - case ps.conf.Producer.Flush.Frequency == 0 && ps.conf.Producer.Flush.Bytes == 0 && ps.conf.Producer.Flush.Messages == 0: + case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: return true // If the messages is a chaser we must flush to maintain the state-machine case msg.flags&chaser == chaser: return true // If we've passed the message trigger-point - case ps.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.conf.Producer.Flush.Messages: + case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages: return true // If we've passed the byte trigger-point - case ps.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.conf.Producer.Flush.Bytes: + case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes: return true default: return false From 3405614c7735747a4ffe2585550d2dea020d391c Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:18:21 -0400 Subject: [PATCH 08/26] Fix offsets --- async_producer.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/async_producer.go b/async_producer.go index 679af90ea..60a9bd2ff 100644 --- a/async_producer.go +++ b/async_producer.go @@ -681,9 +681,11 @@ func (f *flusher) parseBlock(topic string, partition int32, msgs []*ProducerMess 
switch block.Err { // Success case ErrNoError: - for i := range msgs { - if msgs[i] != nil { - msgs[i].Offset = block.Offset + int64(i) // FIXME: offsets wrong now + i := 0 + for _, msg := range msgs { + if msg != nil { + msg.Offset = block.Offset + int64(i) + i++ } } f.parent.returnSuccesses(msgs) From b72df33fe3923dd905369c30545cbf712352f499 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:37:47 -0400 Subject: [PATCH 09/26] Communicate responses back to the aggregator --- async_producer.go | 64 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/async_producer.go b/async_producer.go index 60a9bd2ff..18a7d9752 100644 --- a/async_producer.go +++ b/async_producer.go @@ -507,15 +507,21 @@ func (pp *partitionProducer) updateLeader() error { // one per broker, constructs both an aggregator and a flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { - input := make(chan *ProducerMessage) - bridge := make(chan *produceSet) + var ( + input = make(chan *ProducerMessage) + bridge = make(chan *produceSet) + responses = make(chan *ProduceResponse) + errors = make(chan error) + ) a := &aggregator{ - parent: p, - broker: broker, - input: input, - output: bridge, - buffer: newProduceSet(p), + parent: p, + broker: broker, + input: input, + output: bridge, + responses: responses, + errors: errors, + buffer: newProduceSet(p), } go withRecover(a.run) @@ -523,6 +529,8 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag parent: p, broker: broker, input: bridge, + responses: responses, + errors: errors, currentRetries: make(map[string]map[int32]error), } go withRecover(f.run) @@ -536,7 +544,10 @@ type aggregator struct { parent *asyncProducer broker *Broker input <-chan *ProducerMessage - output chan<- *produceSet + + output chan<- *produceSet + responses <-chan *ProduceResponse + errors <-chan error buffer *produceSet timer <-chan time.Time @@ -554,7 +565,14 @@ func (a *aggregator) run() { if a.buffer.wouldOverflow(msg) { Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) - a.output <- a.buffer + for a.buffer != nil { + select { + case a.output <- a.buffer: + a.buffer = nil + case <-a.responses: + case <-a.errors: + } + } a.reset() output = nil } @@ -571,12 +589,25 @@ func (a *aggregator) run() { case output <- a.buffer: a.reset() output = nil + case <-a.responses: + case <-a.errors: } } shutdown: if !a.buffer.empty() { - a.output <- a.buffer + for a.buffer != nil { + select { + case a.output <- a.buffer: + a.buffer = nil + case <-a.responses: + case <-a.errors: + } + } + select { + case <-a.responses: + case <-a.errors: + } } close(a.output) } @@ -588,9 +619,11 @@ func (a *aggregator) reset() { // takes a batch at a time from the aggregator and sends to the broker type flusher struct { - parent *asyncProducer - broker *Broker - input <-chan *produceSet + parent *asyncProducer + broker *Broker + input <-chan *produceSet + responses chan<- *ProduceResponse + errors chan<- error currentRetries map[string]map[int32]error } @@ -605,12 +638,14 @@ func (f *flusher) run() { set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { f.parent.retryMessages(msgs, closing) }) + f.errors <- closing continue } set.eachPartition(f.filter) request := set.buildRequest() if request == nil { + f.errors <- nil continue } @@ -623,6 +658,7 @@ func (f *flusher) run() { set.eachPartition(func(topic string, 
partition int32, msgs []*ProducerMessage) { f.parent.returnErrors(msgs, err) }) + f.errors <- err continue default: Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) @@ -632,9 +668,11 @@ func (f *flusher) run() { set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { f.parent.retryMessages(msgs, err) }) + f.errors <- err continue } + f.responses <- response if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { From ef50cfe6b15eeabc45506a7379c7560a05fab8e6 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:50:08 -0400 Subject: [PATCH 10/26] Move closing and currentRetry logic into aggregator This simplifies a lot of things because we can filter out messages before we batch them (except when they fail to encode). --- async_producer.go | 267 +++++++++++++++++++++++----------------------- 1 file changed, 134 insertions(+), 133 deletions(-) diff --git a/async_producer.go b/async_producer.go index 18a7d9752..94c9a9ff3 100644 --- a/async_producer.go +++ b/async_producer.go @@ -515,24 +515,24 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag ) a := &aggregator{ - parent: p, - broker: broker, - input: input, - output: bridge, - responses: responses, - errors: errors, - buffer: newProduceSet(p), - } - go withRecover(a.run) - - f := &flusher{ parent: p, broker: broker, - input: bridge, + input: input, + output: bridge, responses: responses, errors: errors, + buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } + go withRecover(a.run) + + f := &flusher{ + parent: p, + broker: broker, + input: bridge, + responses: responses, + errors: errors, + } go withRecover(f.run) return input @@ -549,8 +549,12 @@ type aggregator struct { responses <-chan *ProduceResponse errors <-chan error - buffer *produceSet - timer <-chan time.Time + buffer *produceSet + pending *produceSet + timer <-chan time.Time + + closing error + currentRetries map[string]map[int32]error } func (a *aggregator) run() { @@ -563,17 +567,38 @@ func (a *aggregator) run() { goto shutdown } + if a.closing != nil { + a.parent.retryMessages([]*ProducerMessage{msg}, a.closing) + continue + } + + if a.currentRetries[msg.Topic] != nil && a.currentRetries[msg.Topic][msg.Partition] != nil { + // we're currently retrying this partition so we need to filter out this message + a.parent.retryMessages([]*ProducerMessage{msg}, a.currentRetries[msg.Topic][msg.Partition]) + + if msg.flags&chaser == chaser { + // ...but now we can start processing future messages again + Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", + a.broker.ID(), msg.Topic, msg.Partition) + delete(a.currentRetries[msg.Topic], msg.Partition) + } + + continue + } + if a.buffer.wouldOverflow(msg) { Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) - for a.buffer != nil { + if a.pending != nil { select { - case a.output <- a.buffer: - a.buffer = nil - case <-a.responses: - case <-a.errors: + case response := <-a.responses: + a.handleResponse(response) + case err := <-a.errors: + a.handleError(err) } + } - a.reset() + a.output <- a.buffer + a.rollOver() output = nil } @@ -587,36 +612,107 @@ func (a *aggregator) run() { case <-a.timer: output = a.output case output <- a.buffer: - a.reset() + a.rollOver() output = nil - case <-a.responses: - 
case <-a.errors: + case response := <-a.responses: + a.handleResponse(response) + case err := <-a.errors: + a.handleError(err) } } shutdown: - if !a.buffer.empty() { - for a.buffer != nil { - select { - case a.output <- a.buffer: - a.buffer = nil - case <-a.responses: - case <-a.errors: - } + if a.pending != nil { + select { + case response := <-a.responses: + a.handleResponse(response) + case err := <-a.errors: + a.handleError(err) } + } + if !a.buffer.empty() { + a.output <- a.buffer + a.rollOver() select { - case <-a.responses: - case <-a.errors: + case response := <-a.responses: + a.handleResponse(response) + case err := <-a.errors: + a.handleError(err) } } close(a.output) } -func (a *aggregator) reset() { +func (a *aggregator) rollOver() { a.timer = nil + a.pending = a.buffer a.buffer = newProduceSet(a.parent) } +func (a *aggregator) handleResponse(response *ProduceResponse) { + // we iterate through the blocks in the request set, not the response, so that we notice + // if the response is missing a block completely + a.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + if response == nil { + // this only happens when RequiredAcks is NoResponse, so we have to assume success + a.parent.returnSuccesses(msgs) + return + } + + block := response.GetBlock(topic, partition) + if block == nil { + a.parent.returnErrors(msgs, ErrIncompleteResponse) + return + } + + switch block.Err { + // Success + case ErrNoError: + i := 0 + for _, msg := range msgs { + if msg != nil { + msg.Offset = block.Offset + int64(i) + i++ + } + } + a.parent.returnSuccesses(msgs) + // Retriable errors + case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, + ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: + Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", + a.broker.ID(), topic, partition, block.Err) + if a.currentRetries[topic] == nil { + a.currentRetries[topic] = make(map[int32]error) + } + a.currentRetries[topic][partition] = block.Err + a.parent.retryMessages(msgs, block.Err) + // Other non-retriable errors + default: + a.parent.returnErrors(msgs, block.Err) + } + }) + a.pending = nil +} + +func (a *aggregator) handleError(err error) { + switch err.(type) { + case nil: + break + case PacketEncodingError: + a.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + a.parent.returnErrors(msgs, err) + }) + default: + Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", a.broker.ID(), err) + a.parent.abandonBrokerConnection(a.broker) + _ = a.broker.Close() + a.closing = err + a.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + a.parent.retryMessages(msgs, err) + }) + } +} + // takes a batch at a time from the aggregator and sends to the broker type flusher struct { parent *asyncProducer @@ -624,25 +720,12 @@ type flusher struct { input <-chan *produceSet responses chan<- *ProduceResponse errors chan<- error - - currentRetries map[string]map[int32]error } func (f *flusher) run() { - var closing error - Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID()) for set := range f.input { - if closing != nil { - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.retryMessages(msgs, closing) - }) - f.errors <- closing - continue - } - - set.eachPartition(f.filter) request := set.buildRequest() if request == nil { f.errors <- nil @@ -651,96 +734,14 @@ func (f 
*flusher) run() { response, err := f.broker.Produce(request) - switch err.(type) { - case nil: - break - case PacketEncodingError: - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.returnErrors(msgs, err) - }) - f.errors <- err - continue - default: - Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err) - f.parent.abandonBrokerConnection(f.broker) - _ = f.broker.Close() - closing = err - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.retryMessages(msgs, err) - }) + if err != nil { f.errors <- err - continue - } - - f.responses <- response - if response == nil { - // this only happens when RequiredAcks is NoResponse, so we have to assume success - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parent.returnSuccesses(msgs) - }) - continue - } - - // we iterate through the blocks in the request set, not the response, so that we notice - // if the response is missing a block completely - set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - f.parseBlock(topic, partition, msgs, response) - }) - } - Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) -} - -func (f *flusher) filter(topic string, partition int32, batch []*ProducerMessage) { - for i, msg := range batch { - if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil { - // we're currently retrying this partition so we need to filter out this message - f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition]) - batch[i] = nil - - if msg.flags&chaser == chaser { - // ...but now we can start processing future messages again - Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - f.broker.ID(), msg.Topic, msg.Partition) - delete(f.currentRetries[msg.Topic], msg.Partition) - } - - continue + } else { + f.responses <- response } } -} - -func (f *flusher) parseBlock(topic string, partition int32, msgs []*ProducerMessage, response *ProduceResponse) { - block := response.GetBlock(topic, partition) - if block == nil { - f.parent.returnErrors(msgs, ErrIncompleteResponse) - return - } - switch block.Err { - // Success - case ErrNoError: - i := 0 - for _, msg := range msgs { - if msg != nil { - msg.Offset = block.Offset + int64(i) - i++ - } - } - f.parent.returnSuccesses(msgs) - // Retriable errors - case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, - ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: - Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", - f.broker.ID(), topic, partition, block.Err) - if f.currentRetries[topic] == nil { - f.currentRetries[topic] = make(map[int32]error) - } - f.currentRetries[topic][partition] = block.Err - f.parent.retryMessages(msgs, block.Err) - // Other non-retriable errors - default: - f.parent.returnErrors(msgs, block.Err) - } + Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } // singleton From dc5a714d988aa920af2d0e21e9cd27c37bdf499a Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 20:55:36 -0400 Subject: [PATCH 11/26] Rename aggregator to brokerProducer Clean up logging --- async_producer.go | 154 +++++++++++++++++++++++----------------------- 1 file changed, 76 insertions(+), 78 deletions(-) diff --git a/async_producer.go b/async_producer.go index 94c9a9ff3..359032629 100644 --- 
a/async_producer.go +++ b/async_producer.go @@ -514,7 +514,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag errors = make(chan error) ) - a := &aggregator{ + a := &brokerProducer{ parent: p, broker: broker, input: input, @@ -539,8 +539,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag } // groups messages together into appropriately-sized batches for sending to the broker -// based on https://godoc.org/github.com/eapache/channels#BatchingChannel -type aggregator struct { +type brokerProducer struct { parent *asyncProducer broker *Broker input <-chan *ProducerMessage @@ -557,111 +556,114 @@ type aggregator struct { currentRetries map[string]map[int32]error } -func (a *aggregator) run() { +func (bp *brokerProducer) run() { var output chan<- *produceSet + Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID()) for { select { - case msg := <-a.input: + case msg := <-bp.input: if msg == nil { goto shutdown } - if a.closing != nil { - a.parent.retryMessages([]*ProducerMessage{msg}, a.closing) + if bp.closing != nil { + bp.parent.retryMessages([]*ProducerMessage{msg}, bp.closing) continue } - if a.currentRetries[msg.Topic] != nil && a.currentRetries[msg.Topic][msg.Partition] != nil { + if bp.currentRetries[msg.Topic] != nil && bp.currentRetries[msg.Topic][msg.Partition] != nil { // we're currently retrying this partition so we need to filter out this message - a.parent.retryMessages([]*ProducerMessage{msg}, a.currentRetries[msg.Topic][msg.Partition]) + bp.parent.retryMessages([]*ProducerMessage{msg}, bp.currentRetries[msg.Topic][msg.Partition]) if msg.flags&chaser == chaser { // ...but now we can start processing future messages again - Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n", - a.broker.ID(), msg.Topic, msg.Partition) - delete(a.currentRetries[msg.Topic], msg.Partition) + Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", + bp.broker.ID(), msg.Topic, msg.Partition) + delete(bp.currentRetries[msg.Topic], msg.Partition) } continue } - if a.buffer.wouldOverflow(msg) { - Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID()) - if a.pending != nil { + if bp.buffer.wouldOverflow(msg) { + Logger.Printf("producer/broker/%d maximum request accumulated, forcing blocking flush\n", bp.broker.ID()) + if bp.pending != nil { select { - case response := <-a.responses: - a.handleResponse(response) - case err := <-a.errors: - a.handleError(err) + case response := <-bp.responses: + bp.handleResponse(response) + case err := <-bp.errors: + bp.handleError(err) } } - a.output <- a.buffer - a.rollOver() + bp.output <- bp.buffer + bp.rollOver() output = nil } - a.buffer.add(msg) + bp.buffer.add(msg) - if a.buffer.readyToFlush(msg) { - output = a.output - } else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil { - a.timer = time.After(a.parent.conf.Producer.Flush.Frequency) + if bp.buffer.readyToFlush(msg) { + output = bp.output + } else if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { + bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) } - case <-a.timer: - output = a.output - case output <- a.buffer: - a.rollOver() + case <-bp.timer: + output = bp.output + case output <- bp.buffer: + bp.rollOver() output = nil - case response := <-a.responses: - a.handleResponse(response) - case err := <-a.errors: - a.handleError(err) + case response := <-bp.responses: + bp.handleResponse(response) + 
case err := <-bp.errors: + bp.handleError(err) } } shutdown: - if a.pending != nil { + if bp.pending != nil { select { - case response := <-a.responses: - a.handleResponse(response) - case err := <-a.errors: - a.handleError(err) + case response := <-bp.responses: + bp.handleResponse(response) + case err := <-bp.errors: + bp.handleError(err) } } - if !a.buffer.empty() { - a.output <- a.buffer - a.rollOver() + if !bp.buffer.empty() { + bp.output <- bp.buffer + bp.rollOver() select { - case response := <-a.responses: - a.handleResponse(response) - case err := <-a.errors: - a.handleError(err) + case response := <-bp.responses: + bp.handleResponse(response) + case err := <-bp.errors: + bp.handleError(err) } } - close(a.output) + close(bp.output) + + Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } -func (a *aggregator) rollOver() { - a.timer = nil - a.pending = a.buffer - a.buffer = newProduceSet(a.parent) +func (bp *brokerProducer) rollOver() { + bp.timer = nil + bp.pending = bp.buffer + bp.buffer = newProduceSet(bp.parent) } -func (a *aggregator) handleResponse(response *ProduceResponse) { +func (bp *brokerProducer) handleResponse(response *ProduceResponse) { // we iterate through the blocks in the request set, not the response, so that we notice // if the response is missing a block completely - a.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success - a.parent.returnSuccesses(msgs) + bp.parent.returnSuccesses(msgs) return } block := response.GetBlock(topic, partition) if block == nil { - a.parent.returnErrors(msgs, ErrIncompleteResponse) + bp.parent.returnErrors(msgs, ErrIncompleteResponse) return } @@ -675,45 +677,45 @@ func (a *aggregator) handleResponse(response *ProduceResponse) { i++ } } - a.parent.returnSuccesses(msgs) + bp.parent.returnSuccesses(msgs) // Retriable errors case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend: - Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n", - a.broker.ID(), topic, partition, block.Err) - if a.currentRetries[topic] == nil { - a.currentRetries[topic] = make(map[int32]error) + Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n", + bp.broker.ID(), topic, partition, block.Err) + if bp.currentRetries[topic] == nil { + bp.currentRetries[topic] = make(map[int32]error) } - a.currentRetries[topic][partition] = block.Err - a.parent.retryMessages(msgs, block.Err) + bp.currentRetries[topic][partition] = block.Err + bp.parent.retryMessages(msgs, block.Err) // Other non-retriable errors default: - a.parent.returnErrors(msgs, block.Err) + bp.parent.returnErrors(msgs, block.Err) } }) - a.pending = nil + bp.pending = nil } -func (a *aggregator) handleError(err error) { +func (bp *brokerProducer) handleError(err error) { switch err.(type) { case nil: break case PacketEncodingError: - a.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - a.parent.returnErrors(msgs, err) + bp.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.returnErrors(msgs, err) }) default: - Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", a.broker.ID(), err) - 
a.parent.abandonBrokerConnection(a.broker) - _ = a.broker.Close() - a.closing = err - a.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { - a.parent.retryMessages(msgs, err) + Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) + bp.parent.abandonBrokerConnection(bp.broker) + _ = bp.broker.Close() + bp.closing = err + bp.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.retryMessages(msgs, err) }) } } -// takes a batch at a time from the aggregator and sends to the broker +// takes a set at a time from the brokerProducer and sends to the broker type flusher struct { parent *asyncProducer broker *Broker @@ -723,8 +725,6 @@ type flusher struct { } func (f *flusher) run() { - Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID()) - for set := range f.input { request := set.buildRequest() if request == nil { @@ -740,8 +740,6 @@ func (f *flusher) run() { f.responses <- response } } - - Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID()) } // singleton From 64ae47472bf2fe097f9fdce6b32affcd86b80618 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:09:55 -0400 Subject: [PATCH 12/26] Much needed cleanup --- async_producer.go | 75 +++++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/async_producer.go b/async_producer.go index 359032629..a005b2d12 100644 --- a/async_producer.go +++ b/async_producer.go @@ -567,38 +567,14 @@ func (bp *brokerProducer) run() { goto shutdown } - if bp.closing != nil { - bp.parent.retryMessages([]*ProducerMessage{msg}, bp.closing) - continue - } - - if bp.currentRetries[msg.Topic] != nil && bp.currentRetries[msg.Topic][msg.Partition] != nil { - // we're currently retrying this partition so we need to filter out this message - bp.parent.retryMessages([]*ProducerMessage{msg}, bp.currentRetries[msg.Topic][msg.Partition]) - - if msg.flags&chaser == chaser { - // ...but now we can start processing future messages again - Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", - bp.broker.ID(), msg.Topic, msg.Partition) - delete(bp.currentRetries[msg.Topic], msg.Partition) - } - + if reason := bp.retryReason(msg); reason != nil { + bp.parent.retryMessages([]*ProducerMessage{msg}, reason) continue } if bp.buffer.wouldOverflow(msg) { Logger.Printf("producer/broker/%d maximum request accumulated, forcing blocking flush\n", bp.broker.ID()) - if bp.pending != nil { - select { - case response := <-bp.responses: - bp.handleResponse(response) - case err := <-bp.errors: - bp.handleError(err) - } - - } - bp.output <- bp.buffer - bp.rollOver() + bp.flush() output = nil } @@ -622,27 +598,50 @@ func (bp *brokerProducer) run() { } shutdown: - if bp.pending != nil { - select { - case response := <-bp.responses: - bp.handleResponse(response) - case err := <-bp.errors: - bp.handleError(err) + if !bp.buffer.empty() { + bp.flush() + } + bp.wait() + close(bp.output) + + Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) +} + +func (bp *brokerProducer) retryReason(msg *ProducerMessage) error { + if bp.closing != nil { + return bp.closing + } + + if bp.currentRetries[msg.Topic] != nil { + err := bp.currentRetries[msg.Topic][msg.Partition] + if err != nil && msg.flags&chaser == chaser { + // we're currently retrying this partition so we need to filter out this message + // but now we can start processing future messages again + 
Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", + bp.broker.ID(), msg.Topic, msg.Partition) + delete(bp.currentRetries[msg.Topic], msg.Partition) } + return err } - if !bp.buffer.empty() { - bp.output <- bp.buffer - bp.rollOver() + return nil +} + +func (bp *brokerProducer) wait() { + if bp.pending != nil { select { case response := <-bp.responses: bp.handleResponse(response) case err := <-bp.errors: bp.handleError(err) } + } - close(bp.output) +} - Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) +func (bp *brokerProducer) flush() { + bp.wait() + bp.output <- bp.buffer + bp.rollOver() } func (bp *brokerProducer) rollOver() { From 2fd7980456724990c862c5d7a9d0e6c6c3f0f470 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:31:01 -0400 Subject: [PATCH 13/26] Yet another useless (for now) abstraction --- async_producer.go | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/async_producer.go b/async_producer.go index a005b2d12..50f4185a6 100644 --- a/async_producer.go +++ b/async_producer.go @@ -139,8 +139,10 @@ type ProducerMessage struct { flags flagSet } +const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. + func (m *ProducerMessage) byteSize() int { - size := 26 // the metadata overhead of CRC, flags, etc. + size := producerMessageOverhead if m.Key != nil { size += m.Key.Length() } @@ -770,8 +772,14 @@ func (p *asyncProducer) retryHandler() { // produceSet +type partitionSet struct { + msgs []*ProducerMessage + setToSend *MessageSet + bufferBytes int +} + type produceSet struct { - msgs map[string]map[int32][]*ProducerMessage + msgs map[string]map[int32]*partitionSet parent *asyncProducer bufferBytes int @@ -780,20 +788,26 @@ type produceSet struct { func newProduceSet(parent *asyncProducer) *produceSet { return &produceSet{ - msgs: make(map[string]map[int32][]*ProducerMessage), + msgs: make(map[string]map[int32]*partitionSet), parent: parent, } } func (ps *produceSet) add(msg *ProducerMessage) { - partitionSet := ps.msgs[msg.Topic] - if partitionSet == nil { - partitionSet = make(map[int32][]*ProducerMessage) - ps.msgs[msg.Topic] = partitionSet + partitions := ps.msgs[msg.Topic] + if partitions == nil { + partitions = make(map[int32]*partitionSet) + ps.msgs[msg.Topic] = partitions } - partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg) + set := partitions[msg.Partition] + if set == nil { + set = &partitionSet{setToSend: new(MessageSet)} + partitions[msg.Partition] = set + } + set.msgs = append(set.msgs, msg) + set.bufferBytes += msg.byteSize() ps.bufferBytes += msg.byteSize() ps.bufferCount++ } @@ -807,7 +821,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { for partition, msgSet := range partitionSet { setToSend := new(MessageSet) setSize := 0 - for i, msg := range msgSet { + for i, msg := range msgSet.msgs { if msg == nil { continue } @@ -816,7 +830,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if msg.Key != nil { if key, err = msg.Key.Encode(); err != nil { ps.parent.returnError(msg, err) - msgSet[i] = nil + msgSet.msgs[i] = nil continue } } @@ -824,7 +838,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if msg.Value != nil { if val, err = msg.Value.Encode(); err != nil { ps.parent.returnError(msg, err) - msgSet[i] = nil + msgSet.msgs[i] = nil continue } } @@ -867,8 +881,8 @@ func (ps *produceSet) buildRequest() *ProduceRequest { func (ps *produceSet) eachPartition(cb func(topic string, partition 
int32, msgs []*ProducerMessage)) { for topic, partitionSet := range ps.msgs { - for partition, msgs := range partitionSet { - cb(topic, partition, msgs) + for partition, set := range partitionSet { + cb(topic, partition, set.msgs) } } } From 72a1f9eea1ad31b68a8a1b51160050d2e44d5f58 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:41:41 -0400 Subject: [PATCH 14/26] Construct message-sets as we go This moves key and value encoding earlier in the process without the need for the old keyCache/valueCache. --- async_producer.go | 83 ++++++++++++++++-------------------------- async_producer_test.go | 4 +- 2 files changed, 34 insertions(+), 53 deletions(-) diff --git a/async_producer.go b/async_producer.go index 50f4185a6..ac4148545 100644 --- a/async_producer.go +++ b/async_producer.go @@ -794,6 +794,23 @@ func newProduceSet(parent *asyncProducer) *produceSet { } func (ps *produceSet) add(msg *ProducerMessage) { + var err error + var key, val []byte + + if msg.Key != nil { + if key, err = msg.Key.Encode(); err != nil { + ps.parent.returnError(msg, err) + return + } + } + + if msg.Value != nil { + if val, err = msg.Value.Encode(); err != nil { + ps.parent.returnError(msg, err) + return + } + } + partitions := ps.msgs[msg.Topic] if partitions == nil { partitions = make(map[int32]*partitionSet) @@ -807,63 +824,30 @@ func (ps *produceSet) add(msg *ProducerMessage) { } set.msgs = append(set.msgs, msg) - set.bufferBytes += msg.byteSize() - ps.bufferBytes += msg.byteSize() + set.setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val}) + + size := producerMessageOverhead + len(key) + len(val) + set.bufferBytes += size + ps.bufferBytes += size ps.bufferCount++ } func (ps *produceSet) buildRequest() *ProduceRequest { + if ps.empty() { + return nil + } - req := &ProduceRequest{RequiredAcks: ps.parent.conf.Producer.RequiredAcks, Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond)} - empty := true + req := &ProduceRequest{ + RequiredAcks: ps.parent.conf.Producer.RequiredAcks, + Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond), + } for topic, partitionSet := range ps.msgs { - for partition, msgSet := range partitionSet { - setToSend := new(MessageSet) - setSize := 0 - for i, msg := range msgSet.msgs { - if msg == nil { - continue - } - var err error - var key, val []byte - if msg.Key != nil { - if key, err = msg.Key.Encode(); err != nil { - ps.parent.returnError(msg, err) - msgSet.msgs[i] = nil - continue - } - } - - if msg.Value != nil { - if val, err = msg.Value.Encode(); err != nil { - ps.parent.returnError(msg, err) - msgSet.msgs[i] = nil - continue - } - } - if ps.parent.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > ps.parent.conf.Producer.MaxMessageBytes { - // compression causes message-sets to be wrapped as single messages, which have tighter - // size requirements, so we have to respect those limits - valBytes, err := encode(setToSend) - if err != nil { - Logger.Println(err) // if this happens, it's basically our fault. 
- panic(err) - } - req.AddMessage(topic, partition, &Message{Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: valBytes}) - setToSend = new(MessageSet) - setSize = 0 - } - setSize += msg.byteSize() - - setToSend.addMessage(&Message{Codec: CompressionNone, Key: key, Value: val}) - empty = false - } - + for partition, set := range partitionSet { if ps.parent.conf.Producer.Compression == CompressionNone { - req.AddSet(topic, partition, setToSend) + req.AddSet(topic, partition, set.setToSend) } else { - valBytes, err := encode(setToSend) + valBytes, err := encode(set.setToSend) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) @@ -873,9 +857,6 @@ func (ps *produceSet) buildRequest() *ProduceRequest { } } - if empty { - return nil - } return req } diff --git a/async_producer_test.go b/async_producer_test.go index 7306256aa..9aa13da53 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -320,7 +320,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { leader.Returns(prodSuccess) config := NewConfig() - config.Producer.Flush.Messages = 3 + config.Producer.Flush.Messages = 1 config.Producer.Return.Successes = true config.Producer.Partitioner = NewManualPartitioner producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config) @@ -330,8 +330,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) { for flush := 0; flush < 3; flush++ { producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)} - producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)} + producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)} expectResults(t, producer, 1, 2) } From 4f2a39c4dfce509e4f4ae56bc61c1adc205dd13f Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:43:53 -0400 Subject: [PATCH 15/26] Remove useless flush trigger State management is now sane, we don't have to flush just because we see a chaser message. 
--- async_producer.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/async_producer.go b/async_producer.go index ac4148545..846fe7acb 100644 --- a/async_producer.go +++ b/async_producer.go @@ -889,9 +889,6 @@ func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool { // If all three config values are 0, we always flush as-fast-as-possible case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: return true - // If the messages is a chaser we must flush to maintain the state-machine - case msg.flags&chaser == chaser: - return true // If we've passed the message trigger-point case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages: return true From 54716f8ba5d0b2e30c97dcc689ddf04af51002b1 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:46:40 -0400 Subject: [PATCH 16/26] Don't set the timer if we couldn't add the message --- async_producer.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/async_producer.go b/async_producer.go index 846fe7acb..d550c7a6c 100644 --- a/async_producer.go +++ b/async_producer.go @@ -580,7 +580,10 @@ func (bp *brokerProducer) run() { output = nil } - bp.buffer.add(msg) + if err := bp.buffer.add(msg); err != nil { + bp.parent.returnError(msg, err) + continue + } if bp.buffer.readyToFlush(msg) { output = bp.output @@ -793,21 +796,19 @@ func newProduceSet(parent *asyncProducer) *produceSet { } } -func (ps *produceSet) add(msg *ProducerMessage) { +func (ps *produceSet) add(msg *ProducerMessage) error { var err error var key, val []byte if msg.Key != nil { if key, err = msg.Key.Encode(); err != nil { - ps.parent.returnError(msg, err) - return + return err } } if msg.Value != nil { if val, err = msg.Value.Encode(); err != nil { - ps.parent.returnError(msg, err) - return + return err } } @@ -830,6 +831,8 @@ func (ps *produceSet) add(msg *ProducerMessage) { set.bufferBytes += size ps.bufferBytes += size ps.bufferCount++ + + return nil } func (ps *produceSet) buildRequest() *ProduceRequest { From 02e6f7c03e2d81e42ed322c007e490519a587d3c Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:49:09 -0400 Subject: [PATCH 17/26] Enable per-partition compressed size limits --- async_producer.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index d550c7a6c..0faff1735 100644 --- a/async_producer.go +++ b/async_producer.go @@ -876,8 +876,10 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)): return true - // Would we overflow the size-limit of a compressed message-batch? - case ps.parent.conf.Producer.Compression != CompressionNone && ps.bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: + // Would we overflow the size-limit of a compressed message-batch for this partition? + case ps.parent.conf.Producer.Compression != CompressionNone && + ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? 
case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: From 50a7916464dba2e7ebb5c0dbc5f6f0a0a567fab8 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:51:46 -0400 Subject: [PATCH 18/26] Nil checks are now unnecessary --- async_producer.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/async_producer.go b/async_producer.go index 0faff1735..1fd16136d 100644 --- a/async_producer.go +++ b/async_producer.go @@ -944,17 +944,12 @@ func (p *asyncProducer) returnError(msg *ProducerMessage, err error) { func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg != nil { - p.returnError(msg, err) - } + p.returnError(msg, err) } } func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { for _, msg := range batch { - if msg == nil { - continue - } if p.conf.Producer.Return.Successes { msg.clear() p.successes <- msg @@ -965,9 +960,6 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg == nil { - continue - } if msg.retries >= p.conf.Producer.Retry.Max { p.returnError(msg, err) } else { From ac14315ffabcc9d83a4aaa7f67e378eda5bb42ca Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:54:22 -0400 Subject: [PATCH 19/26] Remove more dead code We never build the request now unless we have messages, which means the request can never be nil, which means the input into the errors channel can never be nil. --- async_producer.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/async_producer.go b/async_producer.go index 1fd16136d..3a91b7f68 100644 --- a/async_producer.go +++ b/async_producer.go @@ -702,8 +702,6 @@ func (bp *brokerProducer) handleResponse(response *ProduceResponse) { func (bp *brokerProducer) handleError(err error) { switch err.(type) { - case nil: - break case PacketEncodingError: bp.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.returnErrors(msgs, err) @@ -731,10 +729,6 @@ type flusher struct { func (f *flusher) run() { for set := range f.input { request := set.buildRequest() - if request == nil { - f.errors <- nil - continue - } response, err := f.broker.Produce(request) @@ -836,10 +830,6 @@ func (ps *produceSet) add(msg *ProducerMessage) error { } func (ps *produceSet) buildRequest() *ProduceRequest { - if ps.empty() { - return nil - } - req := &ProduceRequest{ RequiredAcks: ps.parent.conf.Producer.RequiredAcks, Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond), From 3a54e972f6615d8bcfdfbddaf3b13318fb2ae999 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 21:56:08 -0400 Subject: [PATCH 20/26] Remove dead parameter --- async_producer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index 3a91b7f68..6d97ddd97 100644 --- a/async_producer.go +++ b/async_producer.go @@ -585,7 +585,7 @@ func (bp *brokerProducer) run() { continue } - if bp.buffer.readyToFlush(msg) { + if bp.buffer.readyToFlush() { output = bp.output } else if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) @@ -879,7 +879,7 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { } } -func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool { +func (ps *produceSet) 
readyToFlush() bool { switch { // If all three config values are 0, we always flush as-fast-as-possible case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: From 8a9748bae28d3b7cd1f5ff0b26f5640eeadc3806 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sat, 26 Sep 2015 22:04:54 -0400 Subject: [PATCH 21/26] Misc. fixes and cleanup --- async_producer.go | 58 +++++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/async_producer.go b/async_producer.go index 6d97ddd97..5ce52a071 100644 --- a/async_producer.go +++ b/async_producer.go @@ -570,7 +570,7 @@ func (bp *brokerProducer) run() { } if reason := bp.retryReason(msg); reason != nil { - bp.parent.retryMessages([]*ProducerMessage{msg}, reason) + bp.parent.retryMessage(msg, reason) continue } @@ -620,26 +620,27 @@ func (bp *brokerProducer) retryReason(msg *ProducerMessage) error { if bp.currentRetries[msg.Topic] != nil { err := bp.currentRetries[msg.Topic][msg.Partition] if err != nil && msg.flags&chaser == chaser { - // we're currently retrying this partition so we need to filter out this message - // but now we can start processing future messages again + // we were retrying this partition but we can start processing again Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", bp.broker.ID(), msg.Topic, msg.Partition) delete(bp.currentRetries[msg.Topic], msg.Partition) } return err } + return nil } func (bp *brokerProducer) wait() { - if bp.pending != nil { - select { - case response := <-bp.responses: - bp.handleResponse(response) - case err := <-bp.errors: - bp.handleError(err) - } + if bp.pending == nil { + return + } + select { + case response := <-bp.responses: + bp.handleResponse(response) + case err := <-bp.errors: + bp.handleError(err) } } @@ -674,12 +675,8 @@ func (bp *brokerProducer) handleResponse(response *ProduceResponse) { switch block.Err { // Success case ErrNoError: - i := 0 - for _, msg := range msgs { - if msg != nil { - msg.Offset = block.Offset + int64(i) - i++ - } + for i, msg := range msgs { + msg.Offset = block.Offset + int64(i) } bp.parent.returnSuccesses(msgs) // Retriable errors @@ -692,11 +689,12 @@ func (bp *brokerProducer) handleResponse(response *ProduceResponse) { } bp.currentRetries[topic][partition] = block.Err bp.parent.retryMessages(msgs, block.Err) - // Other non-retriable errors + // Other non-retriable errors default: bp.parent.returnErrors(msgs, block.Err) } }) + bp.pending = nil } @@ -715,6 +713,8 @@ func (bp *brokerProducer) handleError(err error) { bp.parent.retryMessages(msgs, err) }) } + + bp.pending = nil } // takes a set at a time from the brokerProducer and sends to the broker @@ -776,8 +776,8 @@ type partitionSet struct { } type produceSet struct { - msgs map[string]map[int32]*partitionSet parent *asyncProducer + msgs map[string]map[int32]*partitionSet bufferBytes int bufferCount int @@ -845,7 +845,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest { Logger.Println(err) // if this happens, it's basically our fault. 
panic(err) } - req.AddMessage(topic, partition, &Message{Codec: ps.parent.conf.Producer.Compression, Key: nil, Value: valBytes}) + req.AddMessage(topic, partition, &Message{ + Codec: ps.parent.conf.Producer.Compression, + Key: nil, + Value: valBytes, + }) } } } @@ -948,14 +952,18 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) { } } +func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) { + if msg.retries >= p.conf.Producer.Retry.Max { + p.returnError(msg, err) + } else { + msg.retries++ + p.retries <- msg + } +} + func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) { for _, msg := range batch { - if msg.retries >= p.conf.Producer.Retry.Max { - p.returnError(msg, err) - } else { - msg.retries++ - p.retries <- msg - } + p.retryMessage(msg, err) } } From 7d7bf29d0ae05182321bfb517b106c792b251a62 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sun, 27 Sep 2015 09:11:10 -0400 Subject: [PATCH 22/26] Comment to explain kafka's weird compression handling --- async_producer.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/async_producer.go b/async_producer.go index 5ce52a071..17a035cfc 100644 --- a/async_producer.go +++ b/async_producer.go @@ -840,7 +840,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest { if ps.parent.conf.Producer.Compression == CompressionNone { req.AddSet(topic, partition, set.setToSend) } else { - valBytes, err := encode(set.setToSend) + // When compression is enabled, the entire set for each partition is compressed + // and sent as the payload of a single fake "message" with the appropriate codec + // set and no key. When the server sees a message with a compression codec, it + // decompresses the payload and treats the result as its message set. + payload, err := encode(set.setToSend) if err != nil { Logger.Println(err) // if this happens, it's basically our fault. panic(err) @@ -848,7 +852,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest { req.AddMessage(topic, partition, &Message{ Codec: ps.parent.conf.Producer.Compression, Key: nil, - Value: valBytes, + Value: payload, }) } } From 5363d82a9687675466643e06962aaeafd383704f Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sun, 27 Sep 2015 09:25:20 -0400 Subject: [PATCH 23/26] Get rid of `pending` in the brokerProducer That was a bad pattern because it could easily have gotten out-of-sync with what was actually sent to the flusher due to subtle bugs. Instead, have the flusher pass back the set it sent alongside the response. This forces us to use a struct, which lets us combine the `responses` and `errors` channels too. 
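As an aside, a minimal self-contained sketch of the shape this moves to (toy names, not the sarama types in the diff below): the worker hands back exactly the set it was asked to send, paired with either the result or the error on a single channel, so the caller never keeps a separate "pending" copy that could drift out of sync.

package main

import (
	"errors"
	"fmt"
)

type request struct{ payload string }

// success and failure travel on the same channel, always alongside
// the original request they belong to
type response struct {
	req *request
	err error
	res string
}

func worker(in <-chan *request, out chan<- *response) {
	for req := range in {
		// stand-in for the network round-trip to the broker
		if req.payload == "boom" {
			out <- &response{req: req, err: errors.New("broker rejected it")}
			continue
		}
		out <- &response{req: req, res: "ack:" + req.payload}
	}
	close(out)
}

func main() {
	in := make(chan *request)
	out := make(chan *response)
	go worker(in, out)

	go func() {
		in <- &request{payload: "hello"}
		in <- &request{payload: "boom"}
		close(in)
	}()

	for resp := range out {
		if resp.err != nil {
			fmt.Println("would retry", resp.req.payload, "because:", resp.err)
			continue
		}
		fmt.Println(resp.res)
	}
}

The diff below applies the same shape via brokerProducerResponse, with *produceSet and *ProduceResponse in place of the toy fields.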
--- async_producer.go | 92 +++++++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 47 deletions(-) diff --git a/async_producer.go b/async_producer.go index 17a035cfc..720230ffa 100644 --- a/async_producer.go +++ b/async_producer.go @@ -507,13 +507,12 @@ func (pp *partitionProducer) updateLeader() error { }) } -// one per broker, constructs both an aggregator and a flusher +// one per broker; also constructs an associated flusher func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage { var ( input = make(chan *ProducerMessage) bridge = make(chan *produceSet) - responses = make(chan *ProduceResponse) - errors = make(chan error) + responses = make(chan *brokerProducerResponse) ) a := &brokerProducer{ @@ -522,7 +521,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag input: input, output: bridge, responses: responses, - errors: errors, buffer: newProduceSet(p), currentRetries: make(map[string]map[int32]error), } @@ -533,26 +531,30 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag broker: broker, input: bridge, responses: responses, - errors: errors, } go withRecover(f.run) return input } +type brokerProducerResponse struct { + set *produceSet + err error + res *ProduceResponse +} + // groups messages together into appropriately-sized batches for sending to the broker +// handles state related to retries etc type brokerProducer struct { parent *asyncProducer broker *Broker - input <-chan *ProducerMessage + input <-chan *ProducerMessage output chan<- *produceSet - responses <-chan *ProduceResponse - errors <-chan error + responses <-chan *brokerProducerResponse - buffer *produceSet - pending *produceSet - timer <-chan time.Time + buffer *produceSet + timer <-chan time.Time closing error currentRetries map[string]map[int32]error @@ -597,8 +599,6 @@ func (bp *brokerProducer) run() { output = nil case response := <-bp.responses: bp.handleResponse(response) - case err := <-bp.errors: - bp.handleError(err) } } @@ -606,8 +606,10 @@ shutdown: if !bp.buffer.empty() { bp.flush() } - bp.wait() close(bp.output) + for response := range bp.responses { + bp.handleResponse(response) + } Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } @@ -631,35 +633,35 @@ func (bp *brokerProducer) retryReason(msg *ProducerMessage) error { return nil } -func (bp *brokerProducer) wait() { - if bp.pending == nil { - return - } - - select { - case response := <-bp.responses: - bp.handleResponse(response) - case err := <-bp.errors: - bp.handleError(err) - } -} - func (bp *brokerProducer) flush() { - bp.wait() - bp.output <- bp.buffer - bp.rollOver() + for { + select { + case response := <-bp.responses: + bp.handleResponse(response) + case bp.output <- bp.buffer: + bp.rollOver() + return + } + } } func (bp *brokerProducer) rollOver() { bp.timer = nil - bp.pending = bp.buffer bp.buffer = newProduceSet(bp.parent) } -func (bp *brokerProducer) handleResponse(response *ProduceResponse) { +func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) { + if response.err != nil { + bp.handleError(response.set, response.err) + } else { + bp.handleSuccess(response.set, response.res) + } +} + +func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) { // we iterate through the blocks in the request set, not the response, so that we notice // if the response is missing a block completely - bp.pending.eachPartition(func(topic string, partition int32, msgs 
[]*ProducerMessage) { + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { if response == nil { // this only happens when RequiredAcks is NoResponse, so we have to assume success bp.parent.returnSuccesses(msgs) @@ -694,14 +696,12 @@ func (bp *brokerProducer) handleResponse(response *ProduceResponse) { bp.parent.returnErrors(msgs, block.Err) } }) - - bp.pending = nil } -func (bp *brokerProducer) handleError(err error) { +func (bp *brokerProducer) handleError(sent *produceSet, err error) { switch err.(type) { case PacketEncodingError: - bp.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.returnErrors(msgs, err) }) default: @@ -709,21 +709,18 @@ func (bp *brokerProducer) handleError(err error) { bp.parent.abandonBrokerConnection(bp.broker) _ = bp.broker.Close() bp.closing = err - bp.pending.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.retryMessages(msgs, err) }) } - - bp.pending = nil } -// takes a set at a time from the brokerProducer and sends to the broker +// very minimal, takes a set at a time from the brokerProducer and sends to the broker type flusher struct { parent *asyncProducer broker *Broker input <-chan *produceSet - responses chan<- *ProduceResponse - errors chan<- error + responses chan<- *brokerProducerResponse } func (f *flusher) run() { @@ -732,12 +729,13 @@ func (f *flusher) run() { response, err := f.broker.Produce(request) - if err != nil { - f.errors <- err - } else { - f.responses <- response + f.responses <- &brokerProducerResponse{ + set: set, + err: err, + res: response, } } + close(f.responses) } // singleton From 6516b87910c0a8b53b9bdd8b59501688b45654ea Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sun, 27 Sep 2015 09:59:45 -0400 Subject: [PATCH 24/26] This refactor just got a lot harder to swallow The problem is that receiving a response can change the retry state of a partition (or the entire broker). This potentially requires us to retry any messages we'd had buffered for that partition, which changes the size of the produceSet we'd accumulated, potentially even making it empty again. Things are complicated further by the fact that we must be prepared to handle responses while forcing blocking flushes, midway through the message processing. This does a quick fix, but I'm not really happy with it: - Flush the buffer in the appropriate response/error cases. - Move the blocking flush to the beginning of the message processing so any state changes it causes are picked up correctly for that message. - When we handle a response which leaves our buffer empty, don't try and send it. This isn't perfect in that a response which shrinks our buffer below the flush threshold may still end up getting flushed, but at least we won't flush a buffer that's entirely empty. 
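To make the "buffer can shrink, even to empty" point concrete, here is a toy, self-contained sketch (hypothetical buffer type, not the real produceSet) of why dropping a retried partition must also adjust the aggregate counters that the empty check depends on:

package main

import "fmt"

// toy buffer keyed by partition; dropping a partition has to shrink the
// aggregate count so that an "is it empty now?" check after handling a
// response stays accurate
type buffer struct {
	msgs  map[int32][]string
	count int
}

func newBuffer() *buffer {
	return &buffer{msgs: make(map[int32][]string)}
}

func (b *buffer) add(partition int32, msg string) {
	b.msgs[partition] = append(b.msgs[partition], msg)
	b.count++
}

// returns the dropped messages so the caller can hand them to the retry path
func (b *buffer) dropPartition(partition int32) []string {
	dropped := b.msgs[partition]
	b.count -= len(dropped)
	delete(b.msgs, partition)
	return dropped
}

func (b *buffer) empty() bool { return b.count == 0 }

func main() {
	b := newBuffer()
	b.add(0, "a")
	b.add(0, "b")

	// pretend the broker just told us partition 0 needs to be retried
	toRetry := b.dropPartition(0)
	fmt.Println("messages to retry:", toRetry)
	fmt.Println("buffer empty again:", b.empty())
}

The diff's dropPartition does the same bookkeeping against bufferBytes and bufferCount.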
--- async_producer.go | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/async_producer.go b/async_producer.go index 720230ffa..247796da6 100644 --- a/async_producer.go +++ b/async_producer.go @@ -571,17 +571,17 @@ func (bp *brokerProducer) run() { goto shutdown } - if reason := bp.retryReason(msg); reason != nil { - bp.parent.retryMessage(msg, reason) - continue - } - if bp.buffer.wouldOverflow(msg) { Logger.Printf("producer/broker/%d maximum request accumulated, forcing blocking flush\n", bp.broker.ID()) bp.flush() output = nil } + if reason := bp.retryReason(msg); reason != nil { + bp.parent.retryMessage(msg, reason) + continue + } + if err := bp.buffer.add(msg); err != nil { bp.parent.returnError(msg, err) continue @@ -599,6 +599,11 @@ func (bp *brokerProducer) run() { output = nil case response := <-bp.responses: bp.handleResponse(response) + if bp.buffer.empty() { + // this can happen if the response was an error + output = nil + bp.timer = nil + } } } @@ -638,6 +643,9 @@ func (bp *brokerProducer) flush() { select { case response := <-bp.responses: bp.handleResponse(response) + if bp.buffer.empty() { + return // this can happen if the response was an error + } case bp.output <- bp.buffer: bp.rollOver() return @@ -691,6 +699,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo } bp.currentRetries[topic][partition] = block.Err bp.parent.retryMessages(msgs, block.Err) + bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err) // Other non-retriable errors default: bp.parent.returnErrors(msgs, block.Err) @@ -712,6 +721,10 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { bp.parent.retryMessages(msgs, err) }) + bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) { + bp.parent.retryMessages(msgs, err) + }) + bp.rollOver() } } @@ -867,6 +880,20 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs } } +func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage { + if ps.msgs[topic] == nil { + return nil + } + set := ps.msgs[topic][partition] + if set == nil { + return nil + } + ps.bufferBytes -= set.bufferBytes + ps.bufferCount -= len(set.msgs) + delete(ps.msgs[topic], partition) + return set.msgs +} + func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { switch { // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. From 5fc13db29166455b2cdf283c8d96e0451a95413d Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sun, 27 Sep 2015 11:14:32 -0400 Subject: [PATCH 25/26] Refactor brokerProducer again Turn the "if wouldOverflow then flush" logic into a more complete `waitForSpace` method which handles all of that logic including re-checking error states and buffer sizes as necessary. Extract the "when should we be setting `output`" logic into the end of every single iteration. This requires tracking an additional `timerFired` instance variable on the `brokerProducer`, but is much easier to follow and allows us to be more precise when e.g. a response flushes part but not all of our buffer. Extract the "if this is a chaser, change state" logic out of `retryReason` and rename it to `needsRetry`. This makes it a pure function and avoids unnecessary work in `waitForSpace`. 
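The `output` juggling described above leans on a standard Go idiom: a send on a nil channel is never ready, so assigning nil to the channel variable disables that case of the select. A self-contained toy (illustrative only, not the producer code in the diff below) that re-decides at the end of every iteration whether the flush case is enabled:

package main

import (
	"fmt"
	"sync"
)

func main() {
	input := make(chan int, 4)
	flushes := make(chan []int)
	var output chan<- []int // nil disables the flush case of the select below
	var buffer []int

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // consumer of flushed batches
		defer wg.Done()
		for batch := range flushes {
			fmt.Println("flushed:", batch)
		}
	}()

	for i := 1; i <= 4; i++ {
		input <- i
	}
	close(input)

	for {
		select {
		case n, ok := <-input:
			if !ok {
				if len(buffer) > 0 {
					flushes <- buffer // final blocking flush on shutdown
				}
				close(flushes)
				wg.Wait()
				return
			}
			buffer = append(buffer, n)
		case output <- buffer:
			buffer = nil
		}

		// decide once, at the end of every iteration, whether flushing is allowed
		if len(buffer) >= 2 {
			output = flushes
		} else {
			output = nil
		}
	}
}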
--- async_producer.go | 91 ++++++++++++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/async_producer.go b/async_producer.go index 247796da6..017de5d75 100644 --- a/async_producer.go +++ b/async_producer.go @@ -553,8 +553,9 @@ type brokerProducer struct { output chan<- *produceSet responses <-chan *brokerProducerResponse - buffer *produceSet - timer <-chan time.Time + buffer *produceSet + timer <-chan time.Time + timerFired bool closing error currentRetries map[string]map[int32]error @@ -571,45 +572,57 @@ func (bp *brokerProducer) run() { goto shutdown } - if bp.buffer.wouldOverflow(msg) { - Logger.Printf("producer/broker/%d maximum request accumulated, forcing blocking flush\n", bp.broker.ID()) - bp.flush() - output = nil - } - - if reason := bp.retryReason(msg); reason != nil { + if reason := bp.needsRetry(msg); reason != nil { bp.parent.retryMessage(msg, reason) + + if bp.closing == nil && msg.flags&chaser == chaser { + // we were retrying this partition but we can start processing again + delete(bp.currentRetries[msg.Topic], msg.Partition) + Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", + bp.broker.ID(), msg.Topic, msg.Partition) + } + continue } + if bp.buffer.wouldOverflow(msg) { + if err := bp.waitForSpace(msg); err != nil { + bp.parent.retryMessage(msg, err) + continue + } + } + if err := bp.buffer.add(msg); err != nil { bp.parent.returnError(msg, err) continue } - if bp.buffer.readyToFlush() { - output = bp.output - } else if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { + if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil { bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency) } case <-bp.timer: - output = bp.output + bp.timerFired = true case output <- bp.buffer: bp.rollOver() - output = nil case response := <-bp.responses: bp.handleResponse(response) - if bp.buffer.empty() { - // this can happen if the response was an error - output = nil - bp.timer = nil - } + } + + if bp.timerFired || bp.buffer.readyToFlush() { + output = bp.output + } else { + output = nil } } shutdown: - if !bp.buffer.empty() { - bp.flush() + for !bp.buffer.empty() { + select { + case response := <-bp.responses: + bp.handleResponse(response) + case bp.output <- bp.buffer: + bp.rollOver() + } } close(bp.output) for response := range bp.responses { @@ -619,42 +632,41 @@ shutdown: Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID()) } -func (bp *brokerProducer) retryReason(msg *ProducerMessage) error { +func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error { if bp.closing != nil { return bp.closing } - if bp.currentRetries[msg.Topic] != nil { - err := bp.currentRetries[msg.Topic][msg.Partition] - if err != nil && msg.flags&chaser == chaser { - // we were retrying this partition but we can start processing again - Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n", - bp.broker.ID(), msg.Topic, msg.Partition) - delete(bp.currentRetries[msg.Topic], msg.Partition) - } - return err + if bp.currentRetries[msg.Topic] == nil { + return nil } - return nil + return bp.currentRetries[msg.Topic][msg.Partition] } -func (bp *brokerProducer) flush() { +func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error { + Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID()) + for { select { case response := <-bp.responses: bp.handleResponse(response) - if bp.buffer.empty() { - return // this can happen if the response 
was an error + // handling a response can change our state, so re-check some things + if reason := bp.needsRetry(msg); reason != nil { + return reason + } else if !bp.buffer.wouldOverflow(msg) { + return nil } case bp.output <- bp.buffer: bp.rollOver() - return + return nil } } } func (bp *brokerProducer) rollOver() { bp.timer = nil + bp.timerFired = false bp.buffer = newProduceSet(bp.parent) } @@ -664,6 +676,10 @@ func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) { } else { bp.handleSuccess(response.set, response.res) } + + if bp.buffer.empty() { + bp.rollOver() // this can happen if the response invalidated our buffer + } } func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) { @@ -914,6 +930,9 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { func (ps *produceSet) readyToFlush() bool { switch { + // If we don't have any messages, nothing else matters + case ps.empty(): + return false // If all three config values are 0, we always flush as-fast-as-possible case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: return true From 78bec97cb4e05894b4ef8f66c5434243fa301b45 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Sun, 27 Sep 2015 15:31:07 -0400 Subject: [PATCH 26/26] Destructure the flusher, it does nothing complex --- async_producer.go | 45 +++++++++++++++------------------------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/async_producer.go b/async_producer.go index 017de5d75..0aef9b2dd 100644 --- a/async_producer.go +++ b/async_producer.go @@ -526,13 +526,21 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag } go withRecover(a.run) - f := &flusher{ - parent: p, - broker: broker, - input: bridge, - responses: responses, - } - go withRecover(f.run) + // minimal bridge to make the network response `select`able + go withRecover(func() { + for set := range bridge { + request := set.buildRequest() + + response, err := broker.Produce(request) + + responses <- &brokerProducerResponse{ + set: set, + err: err, + res: response, + } + } + close(responses) + }) return input } @@ -744,29 +752,6 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) { } } -// very minimal, takes a set at a time from the brokerProducer and sends to the broker -type flusher struct { - parent *asyncProducer - broker *Broker - input <-chan *produceSet - responses chan<- *brokerProducerResponse -} - -func (f *flusher) run() { - for set := range f.input { - request := set.buildRequest() - - response, err := f.broker.Produce(request) - - f.responses <- &brokerProducerResponse{ - set: set, - err: err, - res: response, - } - } - close(f.responses) -} - // singleton // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel