From 9d159b4cf0e92ccf059835f875c6257e8e250e0d Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 26 Jul 2016 12:06:35 -0400 Subject: [PATCH] Consumer: reduce timer allocations Rather than allocating a new timer with `time.After` on every message we consume, allocate one for the `responseFeeder` and just keep resetting it. Thanks to @Tevic for suggesting this approach. Fixes #707. --- CHANGELOG.md | 2 ++ consumer.go | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 390d86c7a..4b4bde6ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,8 @@ Improvements: ([#634](https://github.com/Shopify/sarama/pull/634)). - Pre-allocate decoding errors, greatly reducing heap usage and GC time against misbehaving brokers ([#690](https://github.com/Shopify/sarama/pull/690)). + - Re-use consumer expiry timers, removing one allocation per consumed message + ([#707](https://github.com/Shopify/sarama/pull/707)). Bug Fixes: - Actually default the client ID to "sarama" like we say we do diff --git a/consumer.go b/consumer.go index c001794e1..c70b528f0 100644 --- a/consumer.go +++ b/consumer.go @@ -413,15 +413,18 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 { func (child *partitionConsumer) responseFeeder() { var msgs []*ConsumerMessage + expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime) feederLoop: for response := range child.feeder { msgs, child.responseResult = child.parseResponse(response) for i, msg := range msgs { + expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) + select { case child.messages <- msg: - case <-time.After(child.conf.Consumer.MaxProcessingTime): + case <-expiryTimer.C: child.responseResult = errTimedOut child.broker.acks.Done() for _, msg = range msgs[i:] {