Skip to content

Commit

Permalink
Process results of a single fetch request in parallel
Browse files Browse the repository at this point in the history
And abandon partitions that take too long (e.g. because the client is stuck).
  • Loading branch information
eapache committed Mar 23, 2015
1 parent efedcc1 commit 8196ab1
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 31 deletions.
58 changes: 45 additions & 13 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
partition: partition,
messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
responses: make(chan *FetchResponse, 1),
results: make(chan error, 1),
trigger: make(chan none, 1),
dying: make(chan none),
fetchSize: c.conf.Consumer.Fetch.Default,
Expand All @@ -126,6 +128,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
}

go withRecover(child.dispatcher)
go withRecover(child.responseHandler)

child.broker = c.refBrokerConsumer(leader)
child.broker.input <- child
Expand Down Expand Up @@ -245,9 +248,12 @@ type partitionConsumer struct {
topic string
partition int32

broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError

responses chan *FetchResponse
results chan error
trigger, dying chan none

fetchSize int32
Expand Down Expand Up @@ -293,6 +299,7 @@ func (child *partitionConsumer) dispatcher() {
child.consumer.removeChild(child)
close(child.messages)
close(child.errors)
close(child.responses)
}

func (child *partitionConsumer) dispatch() error {
Expand Down Expand Up @@ -367,6 +374,13 @@ func (child *partitionConsumer) Close() error {
return nil
}

func (child *partitionConsumer) responseHandler() {
for response := range child.responses {
ret := child.handleResponse(response)
child.results <- ret
}
}

func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
block := response.GetBlock(child.topic, child.partition)
if block == nil {
Expand Down Expand Up @@ -508,17 +522,35 @@ func (w *brokerConsumer) subscriptionConsumer() {
}

for child := range w.subscriptions {
if err := child.handleResponse(response); err != nil {
switch err {
default:
child.sendError(err)
fallthrough
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.trigger <- none{}
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
child.responses <- response
}
for child := range w.subscriptions {
select {
case err := <-child.results:
if err != nil {
switch err {
default:
child.sendError(err)
fallthrough
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
// these three are not fatal errors, but do require redispatching
child.trigger <- none{}
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
}
}
case <-time.After(1 * time.Second):
delete(w.subscriptions, child)
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because it wasn't being consumed\n", w.broker.ID(), child.topic, child.partition)
go func(child *partitionConsumer) {
switch err := <-child.results; err {
case nil, ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
break
default:
child.sendError(err)
}
child.trigger <- none{}
}(child)
}
}
}
Expand Down
24 changes: 6 additions & 18 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func TestConsumerOffsetManual(t *testing.T) {
}
}

leader.Close()
safeClose(t, consumer)
safeClose(t, master)
leader.Close()
}

func TestConsumerLatestOffset(t *testing.T) {
Expand Down Expand Up @@ -237,26 +237,14 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
leader0.Returns(fetchResponse)

// leader0 provides last message on partition 1
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader0.Returns(fetchResponse)

// leader1 provides last message on partition 0
fetchResponse = new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
leader1.Returns(fetchResponse)

wg.Wait()
leader1.Close()
leader0.Close()
seedBroker.Close()
wg.Wait()
safeClose(t, master)
}

func TestConsumerInterleavedClose(t *testing.T) {
t.Skip("Enable once bug #325 is fixed.")

seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 2)

Expand All @@ -278,15 +266,15 @@ func TestConsumerInterleavedClose(t *testing.T) {
t.Fatal(err)
}

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

c1, err := master.ConsumePartition("my_topic", 1, 0)
if err != nil {
t.Fatal(err)
}

time.Sleep(50 * time.Millisecond)

fetchResponse := new(FetchResponse)
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
leader.Returns(fetchResponse)

Expand Down

0 comments on commit 8196ab1

Please sign in to comment.