diff --git a/CHANGELOG.md b/CHANGELOG.md index f133a68641..37b8757e3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ Bug Fixes: - Fix the producer's internal reference counting in certain unusual scenarios ([#367](https://github.com/Shopify/sarama/pull/367)). + - Fix the consumer's internal reference counting in certain unusual scenarios + ([#369](https://github.com/Shopify/sarama/pull/369)). #### Version 1.0.0 (2015-03-17) diff --git a/consumer.go b/consumer.go index 48f22c137a..98e656d462 100644 --- a/consumer.go +++ b/consumer.go @@ -115,10 +115,10 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) return nil, err } - if leader, err := c.client.Leader(child.topic, child.partition); err != nil { + var leader *Broker + var err error + if leader, err = c.client.Leader(child.topic, child.partition); err != nil { return nil, err - } else { - child.broker = leader } if err := c.addChild(child); err != nil { @@ -127,8 +127,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) go withRecover(child.dispatcher) - brokerWorker := c.refBrokerConsumer(child.broker) - brokerWorker.input <- child + child.broker = c.refBrokerConsumer(leader) + child.broker.input <- child return child, nil } @@ -171,31 +171,39 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer { newSubscriptions: make(chan []*partitionConsumer), wait: make(chan none), subscriptions: make(map[*partitionConsumer]none), - refs: 1, + refs: 0, } go withRecover(brokerWorker.subscriptionManager) go withRecover(brokerWorker.subscriptionConsumer) c.brokerConsumers[broker] = brokerWorker - } else { - brokerWorker.refs++ } + brokerWorker.refs++ + return brokerWorker } -func (c *consumer) unrefBrokerConsumer(broker *Broker) { +func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) { c.lock.Lock() defer c.lock.Unlock() - brokerWorker := c.brokerConsumers[broker] brokerWorker.refs-- if brokerWorker.refs == 0 { close(brokerWorker.input) - delete(c.brokerConsumers, broker) + if c.brokerConsumers[brokerWorker.broker] == brokerWorker { + delete(c.brokerConsumers, brokerWorker.broker) + } } } +func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) { + c.lock.Lock() + defer c.lock.Unlock() + + delete(c.brokerConsumers, brokerWorker.broker) +} + // PartitionConsumer // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close() @@ -237,7 +245,7 @@ type partitionConsumer struct { topic string partition int32 - broker *Broker + broker *brokerConsumer messages chan *ConsumerMessage errors chan *ConsumerError trigger, dying chan none @@ -291,15 +299,15 @@ func (child *partitionConsumer) dispatch() error { return err } - if leader, err := child.consumer.client.Leader(child.topic, child.partition); err != nil { + var leader *Broker + var err error + if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil { return err - } else { - child.broker = leader } - brokerWorker := child.consumer.refBrokerConsumer(child.broker) + child.broker = child.consumer.refBrokerConsumer(leader) - brokerWorker.input <- child + child.broker.input <- child return nil } @@ -463,6 +471,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo } func (w *brokerConsumer) abort(err error) { + w.consumer.abandonBrokerConsumer(w) _ = w.broker.Close() // we don't care about the error this might return, we already have one for child := range w.subscriptions { diff --git a/consumer_test.go b/consumer_test.go index bf491491f8..42d32741fd 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -297,6 +297,95 @@ func TestConsumerInterleavedClose(t *testing.T) { seedBroker.Close() } +func TestConsumerBounceWithReferenceOpen(t *testing.T) { + seedBroker := newMockBroker(t, 1) + leader := newMockBroker(t, 2) + leaderAddr := leader.Addr() + + metadataResponse := new(MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError) + metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError) + seedBroker.Returns(metadataResponse) + + config := NewConfig() + config.Consumer.Return.Errors = true + config.Consumer.Retry.Backoff = 0 + config.ChannelBufferSize = 0 + master, err := NewConsumer([]string{seedBroker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + c0, err := master.ConsumePartition("my_topic", 0, 0) + if err != nil { + t.Fatal(err) + } + + c1, err := master.ConsumePartition("my_topic", 1, 0) + if err != nil { + t.Fatal(err) + } + + fetchResponse := new(FetchResponse) + fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) + fetchResponse.AddError("my_topic", 1, ErrNoError) + leader.Returns(fetchResponse) + <-c0.Messages() + + fetchResponse = new(FetchResponse) + fetchResponse.AddError("my_topic", 0, ErrNoError) + fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0)) + leader.Returns(fetchResponse) + <-c1.Messages() + + leader.Close() + leader = newMockBrokerAddr(t, 2, leaderAddr) + + // unblock one of the two (it doesn't matter which) + select { + case <-c0.Errors(): + case <-c1.Errors(): + } + // send it back to the same broker + seedBroker.Returns(metadataResponse) + + fetchResponse = new(FetchResponse) + fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) + fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1)) + leader.Returns(fetchResponse) + + time.Sleep(5 * time.Millisecond) + + // unblock the other one + select { + case <-c0.Errors(): + case <-c1.Errors(): + } + // send it back to the same broker + seedBroker.Returns(metadataResponse) + + select { + case <-c0.Messages(): + case <-c1.Messages(): + } + + leader.Close() + seedBroker.Close() + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + _ = c0.Close() + wg.Done() + }() + go func() { + _ = c1.Close() + wg.Done() + }() + wg.Wait() + safeClose(t, master) +} + // This example has the simplest use case of the consumer. It simply // iterates over the messages channel using a for/range loop. Because // a producer never stopsunless requested, a signal handler is registered