producer: bugfix for broker flushers getting stuck
In certain unusual circumstances, the producer could add new references to a
flusher that was already shutting down, preventing the shutdown from completing
and causing the flusher to try to produce on a network connection that had
already been closed.

Track "current" and "active" flushers separately - remove flushers from the
"current" set immediately when they begin shutdown so that nothing else tries to
take a reference, but leave them in "active" so that they can be cleaned up
properly when their reference count hits 0.

Add a test which fails without this fix in place.
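Below is a minimal, self-contained sketch of the "current"/"active" split described in the message above. It is not sarama's code: the names (refCountedFlushers, Acquire, Release, Abandon) are illustrative only, but the two maps mirror the brokers/brokerRefs pair introduced in async_producer.go further down.

```go
package main

import (
	"fmt"
	"sync"
)

// broker stands in for sarama's *Broker; flusherChan for the per-broker
// input channel owned by a flusher goroutine.
type broker struct{ id int32 }
type flusherChan chan string

// refCountedFlushers is a toy version of the "current"/"active" split:
// `current` is what dispatchers look up to take new references; `refs`
// keeps every live flusher until its reference count reaches zero.
type refCountedFlushers struct {
	mu      sync.Mutex
	current map[*broker]flusherChan
	refs    map[flusherChan]int
}

func newRefCountedFlushers() *refCountedFlushers {
	return &refCountedFlushers{
		current: make(map[*broker]flusherChan),
		refs:    make(map[flusherChan]int),
	}
}

// Acquire returns the broker's current flusher channel (creating one if
// needed) and takes a reference on it.
func (r *refCountedFlushers) Acquire(b *broker) flusherChan {
	r.mu.Lock()
	defer r.mu.Unlock()
	f := r.current[b]
	if f == nil {
		f = make(flusherChan, 16)
		r.current[b] = f
		r.refs[f] = 0
	}
	r.refs[f]++
	return f
}

// Release drops a reference; the final release closes the channel and
// removes it from both maps.
func (r *refCountedFlushers) Release(b *broker, f flusherChan) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.refs[f]--
	if r.refs[f] == 0 {
		close(f)
		delete(r.refs, f)
		if r.current[b] == f {
			delete(r.current, b)
		}
	}
}

// Abandon removes the broker from "current" so nothing new references its
// dying flusher; existing holders keep it alive until they Release.
func (r *refCountedFlushers) Abandon(b *broker) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.current, b)
}

func main() {
	r := newRefCountedFlushers()
	b := &broker{id: 2}

	f1 := r.Acquire(b) // a dispatcher starts using broker 2's flusher
	r.Abandon(b)       // the connection dies: stop handing out f1
	f2 := r.Acquire(b) // a retrying dispatcher gets a fresh flusher instead

	fmt.Println(f1 == f2) // false: the dying flusher never gains new references
	r.Release(b, f1)      // last references drop; each flusher is cleaned up
	r.Release(b, f2)
}
```

The key invariant: Abandon only hides the flusher from new lookups; the last Release is still what closes and deletes it.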
eapache committed Mar 18, 2015
1 parent 4f41ea6 commit 7214846
Showing 3 changed files with 114 additions and 34 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

#### Unreleased

Bug Fixes:
- Fix the producer's internal reference counting in certain unusual scenarios
([#367](https://github.com/Shopify/sarama/pull/367/files)).

#### Version 1.0.0 (2015-03-17)

Version 1.0.0 is the first tagged version, and is almost a complete rewrite. The primary differences with previous untagged versions are:
73 changes: 39 additions & 34 deletions async_producer.go
@@ -54,7 +54,8 @@ type asyncProducer struct {
errors chan *ProducerError
input, successes, retries chan *ProducerMessage

brokers map[*Broker]*brokerProducer
brokers map[*Broker]chan *ProducerMessage
brokerRefs map[chan *ProducerMessage]int
brokerLock sync.Mutex
}

@@ -82,13 +83,14 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
}

p := &asyncProducer{
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]*brokerProducer),
client: client,
conf: client.Config(),
errors: make(chan *ProducerError),
input: make(chan *ProducerMessage),
successes: make(chan *ProducerMessage),
retries: make(chan *ProducerMessage),
brokers: make(map[*Broker]chan *ProducerMessage),
brokerRefs: make(map[chan *ProducerMessage]int),
}

// launch our singleton dispatchers
@@ -340,7 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
retryState[msg.retries].expectChaser = true
output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
p.unrefBrokerProducer(leader)
p.unrefBrokerProducer(leader, output)
output = nil
time.Sleep(p.conf.Producer.Retry.Backoff)
} else if highWatermark > 0 {
@@ -406,7 +408,9 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
output <- msg
}

p.unrefBrokerProducer(leader)
if output != nil {
p.unrefBrokerProducer(leader, output)
}
p.retries <- &ProducerMessage{flags: unref}
}

@@ -529,9 +533,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
continue
default:
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
closing = err
_ = broker.Close()
p.abandonBrokerConnection(broker)
p.retryMessages(batch, err)
_ = broker.Close()
closing = err
continue
}

@@ -769,43 +774,43 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
}
}

type brokerProducer struct {
input chan *ProducerMessage
refs int
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

producer := p.brokers[broker]
bp := p.brokers[broker]

if producer == nil {
if bp == nil {
p.retries <- &ProducerMessage{flags: ref}
producer = &brokerProducer{
refs: 1,
input: make(chan *ProducerMessage),
}
p.brokers[broker] = producer
go withRecover(func() { p.messageAggregator(broker, producer.input) })
} else {
producer.refs++
bp = make(chan *ProducerMessage)
p.brokers[broker] = bp
p.brokerRefs[bp] = 0
go withRecover(func() { p.messageAggregator(broker, bp) })
}

return producer.input
p.brokerRefs[bp]++

return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker) {
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

producer := p.brokers[broker]
p.brokerRefs[bp]--
if p.brokerRefs[bp] == 0 {
close(bp)
delete(p.brokerRefs, bp)

if producer != nil {
producer.refs--
if producer.refs == 0 {
close(producer.input)
if p.brokers[broker] == bp {
delete(p.brokers, broker)
}
}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

delete(p.brokers, broker)
}
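Because the rendered diff interleaves removed and added lines, here is how the three broker-producer helpers read after this change, reconstructed from the hunks above (they live inside async_producer.go and rely on that package's types, so this is not a standalone snippet):

```go
// getBrokerProducer hands out the broker's input channel and bumps its
// reference count; the channel is created (and its aggregator started)
// on first use.
func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		p.retries <- &ProducerMessage{flags: ref}
		bp = make(chan *ProducerMessage)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
		go withRecover(func() { p.messageAggregator(broker, bp) })
	}

	p.brokerRefs[bp]++

	return bp
}

// unrefBrokerProducer drops a reference; the last unref closes the channel
// and removes it from the "current" map only if it is still the current one.
func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

// abandonBrokerConnection removes the broker from the "current" map so no new
// references are taken, while existing holders drain via unrefBrokerProducer.
func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}
```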
69 changes: 69 additions & 0 deletions async_producer_test.go
@@ -492,6 +492,75 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
safeClose(t, producer)
}

func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)

config := NewConfig()
config.Producer.Return.Successes = true
config.Producer.Retry.Backoff = 0
config.Producer.Retry.Max = 1
config.Producer.Partitioner = NewRoundRobinPartitioner
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

// prime partition 0
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}

// prime partition 1
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}

// reboot the broker (the producer will get EOF on its existing connection)
leader.Close()
leader = newMockBrokerAddr(t, 2, leaderAddr)

// send another message on partition 0 to trigger the EOF and retry
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}

// tell partition 0 to go to that broker again
seedBroker.Returns(metadataResponse)

// succeed this time
prodSuccess = new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.Returns(prodSuccess)
select {
case msg := <-producer.Errors():
t.Error(msg.Err)
case <-producer.Successes():
}

// shutdown
closeProducer(t, producer)
seedBroker.Close()
leader.Close()
}

// This example shows how to use the producer while simultaneously
// reading the Errors channel to know about any failures.
func ExampleAsyncProducer_select() {