Structure the aggregator and flusher goroutines #510

Merged · 1 commit · Aug 13, 2015
async_producer.go (127 changes: 79 additions & 48 deletions)
@@ -56,8 +56,8 @@ type asyncProducer struct {
     input, successes, retries chan *ProducerMessage
     inFlight                  sync.WaitGroup

-    brokers    map[*Broker]chan *ProducerMessage
-    brokerRefs map[chan *ProducerMessage]int
+    brokers    map[*Broker]chan<- *ProducerMessage
+    brokerRefs map[chan<- *ProducerMessage]int
     brokerLock sync.Mutex
 }

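The substance of this first hunk (and the two that follow) is tightening the channel types from bidirectional chan *ProducerMessage to send-only chan<- *ProducerMessage. With the directional type, the compiler rather than convention guarantees that holders of the handle can only feed the broker pipeline, never drain it. A minimal stand-alone sketch, with illustrative names:

func produce(out chan<- string) {
    out <- "hello"
    // <-out // would not compile: receive from send-only type chan<- string
    close(out) // closing through the send-only view is allowed
}

func consume() {
    ch := make(chan string) // bidirectional where it is created
    go produce(ch)          // converts implicitly to chan<- string
    for msg := range ch {
        _ = msg // only this goroutine can ever receive
    }
}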
@@ -91,8 +91,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
     input:      make(chan *ProducerMessage),
     successes:  make(chan *ProducerMessage),
     retries:    make(chan *ProducerMessage),
-    brokers:    make(map[*Broker]chan *ProducerMessage),
-    brokerRefs: make(map[chan *ProducerMessage]int),
+    brokers:    make(map[*Broker]chan<- *ProducerMessage),
+    brokerRefs: make(map[chan<- *ProducerMessage]int),
 }

 // launch our singleton dispatchers
@@ -347,7 +347,7 @@ type partitionProducer struct {

     leader  *Broker
     breaker *breaker.Breaker
-    output  chan *ProducerMessage
+    output  chan<- *ProducerMessage

     // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
     // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
@@ -491,37 +491,63 @@ func (pp *partitionProducer) updateLeader() error {
     })
 }

+func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
+    input := make(chan *ProducerMessage)
+    bridge := make(chan []*ProducerMessage)
+
+    a := &aggregator{
+        parent: p,
+        broker: broker,
+        input:  input,
+        output: bridge,
+    }
+    go withRecover(a.run)
+
+    f := &flusher{
+        parent: p,
+        broker: broker,
+        input:  bridge,
+    }
+    go withRecover(f.run)
+
+    return input
+}
+
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
+type aggregator struct {
+    parent *asyncProducer
+    broker *Broker
+    input  <-chan *ProducerMessage
+    output chan<- []*ProducerMessage
+}
+
+func (a *aggregator) run() {
     var (
         timer            <-chan time.Time
         buffer           []*ProducerMessage
-        flushTriggered   chan []*ProducerMessage
+        flushTriggered   chan<- []*ProducerMessage
         bytesAccumulated int
         defaultFlush     bool
     )

-    if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
+    if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
         defaultFlush = true
     }

-    output := make(chan []*ProducerMessage)
-    go withRecover(func() { p.flusher(broker, output) })
-
     for {
         select {
-        case msg := <-input:
+        case msg := <-a.input:
             if msg == nil {
                 goto shutdown
             }

             if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-                (p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
-                (p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
-                Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
-                output <- buffer
+                (a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
+                (a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
+                Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
+                a.output <- buffer
                 timer = nil
                 buffer = nil
                 flushTriggered = nil
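The new newBrokerProducer constructor above replaces the old arrangement where the aggregator spawned its own flusher: the two goroutines are now wired together over a bridge channel, and only the send-only input handle escapes. A reduced sketch of the same wiring pattern, with simplified types and hypothetical names:

// newPipeline starts a batching stage and a consuming stage joined by a
// bridge channel, returning only the entry point, as newBrokerProducer does.
func newPipeline(flush func([]int)) chan<- int {
    input := make(chan int)
    bridge := make(chan []int)

    go func() { // stage 1: aggregate items into batches of 3
        var buf []int
        for v := range input {
            buf = append(buf, v)
            if len(buf) == 3 {
                bridge <- buf
                buf = nil
            }
        }
        if len(buf) > 0 {
            bridge <- buf // drain the partial batch on shutdown
        }
        close(bridge)
    }()

    go func() { // stage 2: consume batches until the bridge closes
        for batch := range bridge {
            flush(batch)
        }
    }()

    return input
}

Closing input tears the pipeline down stage by stage; Sarama signals shutdown with a nil sentinel message instead, but the shape is the same.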
@@ -533,14 +559,14 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {

             if defaultFlush ||
                 msg.flags&chaser == chaser ||
-                (p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
-                (p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
-                flushTriggered = output
-            } else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
-                timer = time.After(p.conf.Producer.Flush.Frequency)
+                (a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
+                (a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
+                flushTriggered = a.output
+            } else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
+                timer = time.After(a.parent.conf.Producer.Flush.Frequency)
             }
         case <-timer:
-            flushTriggered = output
+            flushTriggered = a.output
         case flushTriggered <- buffer:
             timer = nil
             buffer = nil
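The flushTriggered variable relies on a standard Go idiom: a send on a nil channel blocks forever, so a select case on a nil channel is effectively disabled. Assigning flushTriggered = a.output arms the case flushTriggered <- buffer branch once a flush condition is met, and resetting it to nil disarms it after the send. A self-contained illustration (names are not Sarama's):

// tryFlush reports whether the send case fired; out is assumed buffered.
func tryFlush(armed bool, out chan []int, buffer []int) bool {
    var flushTriggered chan<- []int // nil: its select case can never be chosen
    if armed {
        flushTriggered = out // arming enables the send case below
    }
    select {
    case flushTriggered <- buffer:
        return true // only reachable when flushTriggered is non-nil
    default:
        return false
    }
}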
@@ -551,21 +577,27 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {

 shutdown:
     if len(buffer) > 0 {
-        output <- buffer
+        a.output <- buffer
     }
-    close(output)
+    close(a.output)
 }

 // one per broker
 // takes a batch at a time from the messageAggregator and sends to the broker
-func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
+type flusher struct {
+    parent *asyncProducer
+    broker *Broker
+    input  <-chan []*ProducerMessage
+}
+
+func (f *flusher) run() {
     var closing error
     currentRetries := make(map[string]map[int32]error)
-    Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
+    Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())

-    for batch := range input {
+    for batch := range f.input {
         if closing != nil {
-            p.retryMessages(batch, closing)
+            f.parent.retryMessages(batch, closing)
             continue
         }

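Shutdown flows through the pipeline in-band: the aggregator exits its loop on a nil sentinel message (the goto shutdown seen earlier), pushes any remaining buffer, and closes its output, which in turn terminates the flusher's range loop above. A compact sketch of that drain-then-close handoff, with simplified types:

func drainStage(input <-chan *int, output chan<- []*int) {
    var buffer []*int
    for v := range input {
        if v == nil { // nil sentinel: begin shutdown
            break
        }
        buffer = append(buffer, v)
    }
    if len(buffer) > 0 {
        output <- buffer // flush whatever accumulated before exiting
    }
    close(output) // the downstream range loop now ends cleanly
}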
@@ -576,10 +608,10 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
             if msg.flags&chaser == chaser {
                 // we can start processing this topic/partition again
                 Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-                    broker.ID(), msg.Topic, msg.Partition)
+                    f.broker.ID(), msg.Topic, msg.Partition)
                 currentRetries[msg.Topic][msg.Partition] = nil
             }
-            p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
+            f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
             batch[i] = nil // to prevent it being returned/retried twice
             continue
         }
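This block is the per-partition retry state machine: while currentRetries[topic][partition] holds an error, ordinary messages for that partition bounce straight back to the retry path to preserve ordering, and a chaser message injected by the partitionProducer flips the state back to normal. A rough stand-alone sketch of the gating idea only, with hypothetical names:

// retryGate mirrors the shape of the chaser logic; blocked must be
// initialized with make before use.
type retryGate struct {
    blocked map[string]error // non-nil error means "bounce everything"
}

func (g *retryGate) route(key string, isChaser bool) string {
    if isChaser {
        g.blocked[key] = nil // state change back to [normal]
        return "cleared"
    }
    if err := g.blocked[key]; err != nil {
        return "bounced: " + err.Error()
    }
    return "forwarded"
}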
@@ -593,31 +625,31 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
             partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
         }

-        request := p.buildRequest(msgSets)
+        request := f.parent.buildRequest(msgSets)
         if request == nil {
             continue
         }

-        response, err := broker.Produce(request)
+        response, err := f.broker.Produce(request)

         switch err.(type) {
         case nil:
             break
         case PacketEncodingError:
-            p.returnErrors(batch, err)
+            f.parent.returnErrors(batch, err)
             continue
         default:
-            Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
-            p.abandonBrokerConnection(broker)
-            _ = broker.Close()
+            Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
+            f.parent.abandonBrokerConnection(f.broker)
+            _ = f.broker.Close()
             closing = err
-            p.retryMessages(batch, err)
+            f.parent.retryMessages(batch, err)
             continue
         }

         if response == nil {
             // this only happens when RequiredAcks is NoResponse, so we have to assume success
-            p.returnSuccesses(batch)
+            f.parent.returnSuccesses(batch)
             continue
         }

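The switch err.(type) above triages failures by error type rather than value: nil means the request round-tripped, an encoding error is the caller's problem and is returned directly, and anything else is presumed to be a broken connection, so the broker is abandoned and the whole batch retried. A minimal sketch of type-based triage; the concrete error type here is a stand-in, not Sarama's:

type encodeError struct{ msg string }

func (e encodeError) Error() string { return e.msg }

func triage(err error) string {
    switch err.(type) {
    case nil:
        return "proceed" // a nil interface matches case nil in a type switch
    case encodeError: // caller's fault, like Sarama's PacketEncodingError
        return "return to caller, do not retry"
    default:
        return "abandon connection, retry batch elsewhere"
    }
}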
@@ -628,7 +660,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {

             block := response.GetBlock(topic, partition)
             if block == nil {
-                p.returnErrors(msgs, ErrIncompleteResponse)
+                f.parent.returnErrors(msgs, ErrIncompleteResponse)
                 continue
             }

@@ -638,23 +670,23 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
                 for i := range msgs {
                     msgs[i].Offset = block.Offset + int64(i)
                 }
-                p.returnSuccesses(msgs)
+                f.parent.returnSuccesses(msgs)
             case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
                 ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
                 Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-                    broker.ID(), topic, partition, block.Err)
+                    f.broker.ID(), topic, partition, block.Err)
                 if currentRetries[topic] == nil {
                     currentRetries[topic] = make(map[int32]error)
                 }
                 currentRetries[topic][partition] = block.Err
-                p.retryMessages(msgs, block.Err)
+                f.parent.retryMessages(msgs, block.Err)
             default:
-                p.returnErrors(msgs, block.Err)
+                f.parent.returnErrors(msgs, block.Err)
             }
         }
     }
-    Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
+    Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }

 // singleton
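One small Go detail in the retry bookkeeping above: currentRetries is a map of maps, and reading through a missing inner map is safe (it yields the zero value), but writing requires allocating the inner map first, which is why the if currentRetries[topic] == nil guard exists. In isolation:

var errExample = errors.New("not leader") // assumes "errors" is imported

func nestedMapDemo() {
    currentRetries := make(map[string]map[int32]error)

    _ = currentRetries["topic"][0] // reading through a nil inner map is safe

    if currentRetries["topic"] == nil {
        currentRetries["topic"] = make(map[int32]error) // allocate before writing
    }
    currentRetries["topic"][0] = errExample // would panic without the make above
}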
@@ -814,25 +846,24 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
         }
     }

-func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
+func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
     p.brokerLock.Lock()
     defer p.brokerLock.Unlock()

     bp := p.brokers[broker]

     if bp == nil {
-        bp = make(chan *ProducerMessage)
+        bp = p.newBrokerProducer(broker)
         p.brokers[broker] = bp
         p.brokerRefs[bp] = 0
-        go withRecover(func() { p.messageAggregator(broker, bp) })
     }

     p.brokerRefs[bp]++

     return bp
 }

-func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
     p.brokerLock.Lock()
     defer p.brokerLock.Unlock()

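getBrokerProducer and unrefBrokerProducer implement mutex-guarded reference counting, so that many partitionProducers can share one pipeline per broker and the pipeline is only torn down when the last user releases it. A generic sketch of the pattern, assuming "sync" is imported (not Sarama's API):

type pipeRegistry struct {
    lock  sync.Mutex
    pipes map[string]chan<- int
    refs  map[chan<- int]int
}

func (r *pipeRegistry) acquire(key string) chan<- int {
    r.lock.Lock()
    defer r.lock.Unlock()
    p := r.pipes[key]
    if p == nil {
        p = make(chan int) // stand-in for starting a real aggregator/flusher pair
        r.pipes[key] = p
        r.refs[p] = 0
    }
    r.refs[p]++
    return p
}

func (r *pipeRegistry) release(key string, p chan<- int) {
    r.lock.Lock()
    defer r.lock.Unlock()
    r.refs[p]--
    if r.refs[p] == 0 { // last user gone: tear the pipeline down
        close(p)
        delete(r.refs, p)
        delete(r.pipes, key)
    }
}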