Skip to content

Commit

Permalink
Consumer: reduce timer allocations
Browse files Browse the repository at this point in the history
Rather than allocating a new timer with `time.After` on every message we
consume, allocate one for the `responseFeeder` and just keep resetting it.
Thanks to @Tevic for suggesting this approach.

Fixes #707.
  • Loading branch information
eapache committed Jul 26, 2016
1 parent 146ec3d commit 9d159b4
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ Improvements:
([#634](https://github.com/Shopify/sarama/pull/634)).
- Pre-allocate decoding errors, greatly reducing heap usage and GC time against
misbehaving brokers ([#690](https://github.com/Shopify/sarama/pull/690)).
- Re-use consumer expiry timers, removing one allocation per consumed message
([#707](https://github.com/Shopify/sarama/pull/707)).

Bug Fixes:
- Actually default the client ID to "sarama" like we say we do
Expand Down
5 changes: 4 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,18 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {

func (child *partitionConsumer) responseFeeder() {
var msgs []*ConsumerMessage
expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)

feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)

for i, msg := range msgs {
expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)

select {
case child.messages <- msg:
case <-time.After(child.conf.Consumer.MaxProcessingTime):
case <-expiryTimer.C:
child.responseResult = errTimedOut
child.broker.acks.Done()
for _, msg = range msgs[i:] {
Expand Down

0 comments on commit 9d159b4

Please sign in to comment.