From 071d28e68413fd35a710f5e05056a7031e29e954 Mon Sep 17 00:00:00 2001
From: Evan Huus
Date: Tue, 7 Jul 2015 16:31:36 -0400
Subject: [PATCH] Refactor producer goroutines somewhat

- rename topicDispatcher to dispatcher
- make a topicProducer struct to handle the old partitionDispatcher
- make a partitionProducer struct to handle the old leaderDispatcher
- move and rename methods for a much more readable structure

Part 1 of #300.
---
 async_producer.go | 338 ++++++++++++++++++++++++++--------------------
 1 file changed, 188 insertions(+), 150 deletions(-)

diff --git a/async_producer.go b/async_producer.go
index 770e9f1be..b6e3eef0f 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -96,7 +96,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 	}
 
 	// launch our singleton dispatchers
-	go withRecover(p.topicDispatcher)
+	go withRecover(p.dispatcher)
 	go withRecover(p.retryHandler)
 
 	return p, nil
@@ -200,16 +200,9 @@ func (p *asyncProducer) AsyncClose() {
 	go withRecover(p.shutdown)
 }
 
-///////////////////////////////////////////
-// In normal processing, a message flows through the following functions from top to bottom,
-// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
-// (which sends the message to the broker). In cases where a message must be retried, it goes
-// through retryHandler before being returned to the top of the flow.
-///////////////////////////////////////////
-
 // singleton
 // dispatches messages by topic
-func (p *asyncProducer) topicDispatcher() {
+func (p *asyncProducer) dispatcher() {
 	handlers := make(map[string]chan *ProducerMessage)
 	shuttingDown := false
 
@@ -247,10 +240,8 @@ func (p *asyncProducer) topicDispatcher() {
 		handler := handlers[msg.Topic]
 		if handler == nil {
-			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
-			topic := msg.Topic // block local because go's closure semantics suck
-			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
-			handler = newHandler
+			handler = make(chan *ProducerMessage, p.conf.ChannelBufferSize)
+			p.newTopicProducer(msg.Topic, handler)
 			handlers[msg.Topic] = handler
 		}
@@ -264,133 +255,158 @@ func (p *asyncProducer) topicDispatcher() {
 
 // one per topic
 // partitions messages, then dispatches them by partition
-func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) {
-	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.conf.Producer.Partitioner(topic)
-	breaker := breaker.New(3, 1, 10*time.Second)
+type topicProducer struct {
+	parent *asyncProducer
+	topic  string
+	input  <-chan *ProducerMessage
+
+	breaker     *breaker.Breaker
+	handlers    map[int32]chan *ProducerMessage
+	partitioner Partitioner
+}
+
+func (p *asyncProducer) newTopicProducer(topic string, input <-chan *ProducerMessage) *topicProducer {
+	tp := &topicProducer{
+		parent:      p,
+		topic:       topic,
+		input:       input,
+		breaker:     breaker.New(3, 1, 10*time.Second),
+		handlers:    make(map[int32]chan *ProducerMessage),
+		partitioner: p.conf.Producer.Partitioner(topic),
+	}
+	go withRecover(tp.dispatch)
+	return tp
+}
 
-	for msg := range input {
+func (tp *topicProducer) dispatch() {
+	for msg := range tp.input {
 		if msg.retries == 0 {
-			if err := p.assignPartition(breaker, partitioner, msg); err != nil {
-				p.returnError(msg, err)
+			if err := tp.partitionMessage(msg); err != nil {
+				tp.parent.returnError(msg, err)
 				continue
 			}
 		}
 
-		handler := handlers[msg.Partition]
+		handler := tp.handlers[msg.Partition]
 		if handler == nil {
-			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
-			topic := msg.Topic // block local because go's closure semantics suck
-			partition := msg.Partition // block local because go's closure semantics suck
-			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
-			handler = newHandler
-			handlers[msg.Partition] = handler
+			handler = make(chan *ProducerMessage, tp.parent.conf.ChannelBufferSize)
+			tp.parent.newPartitionProducer(msg.Topic, msg.Partition, handler)
+			tp.handlers[msg.Partition] = handler
 		}
 
 		handler <- msg
 	}
 
-	for _, handler := range handlers {
+	for _, handler := range tp.handlers {
 		close(handler)
 	}
 }
 
-// one per partition per topic
-// dispatches messages to the appropriate broker
-// also responsible for maintaining message order during retries
-func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) {
-	var leader *Broker
-	var output chan *ProducerMessage
+func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
+	var partitions []int32
 
-	breaker := breaker.New(3, 1, 10*time.Second)
-	doUpdate := func() (err error) {
-		if err = p.client.RefreshMetadata(topic); err != nil {
-			return err
+	err := tp.breaker.Run(func() (err error) {
+		if tp.partitioner.RequiresConsistency() {
+			partitions, err = tp.parent.client.Partitions(msg.Topic)
+		} else {
+			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
 		}
+		return
+	})
 
-		if leader, err = p.client.Leader(topic, partition); err != nil {
-			return err
-		}
+	if err != nil {
+		return err
+	}
 
-		output = p.getBrokerProducer(leader)
-		return nil
+	numPartitions := int32(len(partitions))
+
+	if numPartitions == 0 {
+		return ErrLeaderNotAvailable
 	}
 
-	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
-	// on the first message
-	leader, _ = p.client.Leader(topic, partition)
-	if leader != nil {
-		output = p.getBrokerProducer(leader)
+	choice, err := tp.partitioner.Partition(msg, numPartitions)
+
+	if err != nil {
+		return err
+	} else if choice < 0 || choice >= numPartitions {
+		return ErrInvalidPartition
 	}
 
+	msg.Partition = partitions[choice]
+
+	return nil
+}
+
+// one per partition per topic
+// dispatches messages to the appropriate broker
+// also responsible for maintaining message order during retries
+type partitionProducer struct {
+	parent    *asyncProducer
+	topic     string
+	partition int32
+	input     <-chan *ProducerMessage
+
+	leader  *Broker
+	breaker *breaker.Breaker
+	output  chan *ProducerMessage
+
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
 	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
 	// therefore whether our buffer is complete and safe to flush)
-	highWatermark := 0
-	retryState := make([]struct {
-		buf          []*ProducerMessage
-		expectChaser bool
-	}, p.conf.Producer.Retry.Max+1)
-
-	for msg := range input {
-		if msg.retries > highWatermark {
-			// new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
-			// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
-			highWatermark = msg.retries
-			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
-			retryState[msg.retries].expectChaser = true
-			p.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
-			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
-			Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
-			p.unrefBrokerProducer(leader, output)
-			output = nil
-			time.Sleep(p.conf.Producer.Retry.Backoff)
-		} else if highWatermark > 0 {
+	highWatermark int
+	retryState    []partitionRetryState
+}
+
+type partitionRetryState struct {
+	buf          []*ProducerMessage
+	expectChaser bool
+}
+
+func (p *asyncProducer) newPartitionProducer(topic string, partition int32, input <-chan *ProducerMessage) *partitionProducer {
+	pp := &partitionProducer{
+		parent:    p,
+		topic:     topic,
+		partition: partition,
+		input:     input,
+
+		breaker:    breaker.New(3, 1, 10*time.Second),
+		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
+	}
+	go withRecover(pp.dispatch)
+	return pp
+}
+
+func (pp *partitionProducer) dispatch() {
+	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
+	// on the first message
+	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
+	if pp.leader != nil {
+		pp.output = pp.parent.getBrokerProducer(pp.leader)
+	}
+
+	for msg := range pp.input {
+		if msg.retries > pp.highWatermark {
+			// a new, higher, retry level; handle it and then back off
+			pp.newHighWatermark(msg.retries)
+			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
-			if msg.retries < highWatermark {
+			if msg.retries < pp.highWatermark {
 				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a chaser)
 				if msg.flags&chaser == chaser {
-					retryState[msg.retries].expectChaser = false
-					p.inFlight.Done() // this chaser is now handled and will be garbage collected
+					pp.retryState[msg.retries].expectChaser = false
+					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
 				} else {
-					retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
+					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 				}
 				continue
 			} else if msg.flags&chaser == chaser {
 				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
 				// meaning this retry level is done and we can go down (at least) one level and flush that
-				retryState[highWatermark].expectChaser = false
-				Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", topic, partition, highWatermark)
-				for {
-					highWatermark--
-
-					if output == nil {
-						if err := breaker.Run(doUpdate); err != nil {
-							p.returnErrors(retryState[highWatermark].buf, err)
-							goto flushDone
-						}
-						Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
-					}
-
-					for _, msg := range retryState[highWatermark].buf {
-						output <- msg
-					}
-
-				flushDone:
-					retryState[highWatermark].buf = nil
-					if retryState[highWatermark].expectChaser {
-						Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
-						break
-					} else {
-						if highWatermark == 0 {
-							Logger.Printf("producer/leader/%s/%d state change to [normal]\n", topic, partition)
-							break
-						}
-					}
-
-				}
-				p.inFlight.Done() // this chaser is now handled and will be garbage collected
+				pp.retryState[pp.highWatermark].expectChaser = false
+				pp.flushRetryBuffers()
+				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
 				continue
 			}
 		}
@@ -398,23 +414,83 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-
 		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
 		// without breaking any of our ordering guarantees
-		if output == nil {
-			if err := breaker.Run(doUpdate); err != nil {
-				p.returnError(msg, err)
-				time.Sleep(p.conf.Producer.Retry.Backoff)
+		if pp.output == nil {
+			if err := pp.updateLeader(); err != nil {
+				pp.parent.returnError(msg, err)
+				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
 				continue
 			}
-			Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
-		output <- msg
+		pp.output <- msg
 	}
 
-	if output != nil {
-		p.unrefBrokerProducer(leader, output)
+	if pp.output != nil {
+		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
 	}
 }
 
+func (pp *partitionProducer) newHighWatermark(hwm int) {
+	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
+	pp.highWatermark = hwm
+
+	// send off a chaser so that we know when everything "in between" has made it
+	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
+	pp.retryState[pp.highWatermark].expectChaser = true
+	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
+	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
+
+	// a new HWM means that our current broker selection is out of date
+	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
+	pp.output = nil
+}
+
+func (pp *partitionProducer) flushRetryBuffers() {
+	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
+	for {
+		pp.highWatermark--
+
+		if pp.output == nil {
+			if err := pp.updateLeader(); err != nil {
+				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
+				goto flushDone
+			}
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+		}
+
+		for _, msg := range pp.retryState[pp.highWatermark].buf {
+			pp.output <- msg
+		}
+
+	flushDone:
+		pp.retryState[pp.highWatermark].buf = nil
+		if pp.retryState[pp.highWatermark].expectChaser {
+			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
+			break
+		} else if pp.highWatermark == 0 {
+			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
+			break
+		}
+	}
+}
+
+func (pp *partitionProducer) updateLeader() error {
+	return pp.breaker.Run(func() (err error) {
+		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
+			return err
+		}
+
+		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
+			return err
+		}
+
+		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		return nil
+	})
+}
+
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
@@ -582,7 +658,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 }
 
 // singleton
-// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
+// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
 func (p *asyncProducer) retryHandler() {
 	var msg *ProducerMessage
@@ -608,9 +684,6 @@ func (p *asyncProducer) retryHandler() {
 	}
 }
 
-///////////////////////////////////////////
-///////////////////////////////////////////
-
 // utility functions
 
 func (p *asyncProducer) shutdown() {
@@ -633,41 +706,6 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 }
 
-func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error {
-	var partitions []int32
-
-	err := breaker.Run(func() (err error) {
-		if partitioner.RequiresConsistency() {
-			partitions, err = p.client.Partitions(msg.Topic)
-		} else {
-			partitions, err = p.client.WritablePartitions(msg.Topic)
-		}
-		return
-	})
-
-	if err != nil {
-		return err
-	}
-
-	numPartitions := int32(len(partitions))
-
-	if numPartitions == 0 {
-		return ErrLeaderNotAvailable
-	}
-
-	choice, err := partitioner.Partition(msg, numPartitions)
-
-	if err != nil {
-		return err
-	} else if choice < 0 || choice >= numPartitions {
-		return ErrInvalidPartition
-	}
-
-	msg.Partition = partitions[choice]
-
-	return nil
-}
-
 func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
 	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
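
For reviewers who want to see where this pipeline starts, here is a minimal usage sketch (illustrative only, not part of the patch): the caller writes to Input(), the singleton dispatcher hands the message to a topicProducer, which partitions it and passes it to a partitionProducer, which selects the leader broker and forwards it to the flusher. The broker address and topic name are placeholders.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholder broker address; a nil config falls back to sarama.NewConfig() defaults.
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatalln(err)
	}

	// Drain the error channel so a failed send cannot block the pipeline.
	go func() {
		for perr := range producer.Errors() {
			log.Println("failed to produce:", perr)
		}
	}()

	// This write feeds the singleton dispatcher, which routes the message to a
	// per-topic goroutine (topicProducer) and then to a per-partition goroutine
	// (partitionProducer) that preserves ordering per partition.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "my_topic", // placeholder topic
		Value: sarama.StringEncoder("hello"),
	}

	// Close flushes in-flight messages and shuts the goroutine chain down.
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}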