Skip to content

Commit

Permalink
feat: add methods to pause/resume consumer's consumption (#2005)
Browse files Browse the repository at this point in the history
* feat: add methods to pause/resume consumers

It aims to allow consumption control, providing some methods to pause
and resume consumer consumption.

When your data destination is offline it becomes pointless to continue
to consume new messages from the broker once it certainly will result in
an error. The Java library already provides something thing similar to
it.

Note that the consumption state is not preserved between the rebalance
process, so it's the user responsibility to manage it using the
callbacks.
  • Loading branch information
raulnegreiros authored Jan 22, 2022
1 parent f1ea13a commit 31d757b
Show file tree
Hide file tree
Showing 8 changed files with 472 additions and 21 deletions.
111 changes: 110 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,26 @@ type Consumer interface {
// Close shuts down the consumer. It must be called after all child
// PartitionConsumers have already been closed.
Close() error

// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
Pause(topicPartitions map[string][]int32)

// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
Resume(topicPartitions map[string][]int32)

// Pause suspends fetching from all partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
PauseAll()

// Resume resumes all partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
ResumeAll()
}

type consumer struct {
Expand Down Expand Up @@ -245,6 +265,62 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
delete(c.brokerConsumers, brokerWorker.broker)
}

// Pause implements Consumer.
func (c *consumer) Pause(topicPartitions map[string][]int32) {
c.lock.Lock()
defer c.lock.Unlock()

for topic, partitions := range topicPartitions {
for _, partition := range partitions {
if topicConsumers, ok := c.children[topic]; ok {
if partitionConsumer, ok := topicConsumers[partition]; ok {
partitionConsumer.Pause()
}
}
}
}
}

// Resume implements Consumer.
func (c *consumer) Resume(topicPartitions map[string][]int32) {
c.lock.Lock()
defer c.lock.Unlock()

for topic, partitions := range topicPartitions {
for _, partition := range partitions {
if topicConsumers, ok := c.children[topic]; ok {
if partitionConsumer, ok := topicConsumers[partition]; ok {
partitionConsumer.Resume()
}
}
}
}
}

// PauseAll implements Consumer.
func (c *consumer) PauseAll() {
c.lock.Lock()
defer c.lock.Unlock()

for _, partitions := range c.children {
for _, partitionConsumer := range partitions {
partitionConsumer.Pause()
}
}
}

// ResumeAll implements Consumer.
func (c *consumer) ResumeAll() {
c.lock.Lock()
defer c.lock.Unlock()

for _, partitions := range c.children {
for _, partitionConsumer := range partitions {
partitionConsumer.Resume()
}
}
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
Expand Down Expand Up @@ -292,6 +368,20 @@ type PartitionConsumer interface {
// i.e. the offset that will be used for the next message that will be produced.
// You can use this to determine how far behind the processing is.
HighWaterMarkOffset() int64

// Pause suspends fetching from this partition. Future calls to the broker will not return
// any records from these partition until it have been resumed using Resume().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
Pause()

// Resume resumes this partition which have been paused with Pause().
// New calls to the broker will return records from these partitions if there are any to be fetched.
// If the partition was not previously paused, this method is a no-op.
Resume()

// IsPaused indicates if this partition consumer is paused or not
IsPaused() bool
}

type partitionConsumer struct {
Expand All @@ -314,6 +404,8 @@ type partitionConsumer struct {
fetchSize int32
offset int64
retries int32

paused int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
Expand Down Expand Up @@ -737,6 +829,21 @@ func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
}
}

// Pause implements PartitionConsumer.
func (child *partitionConsumer) Pause() {
atomic.StoreInt32(&child.paused, 1)
}

// Resume implements PartitionConsumer.
func (child *partitionConsumer) Resume() {
atomic.StoreInt32(&child.paused, 0)
}

// IsPaused implements PartitionConsumer.
func (child *partitionConsumer) IsPaused() bool {
return atomic.LoadInt32(&child.paused) == 1
}

type brokerConsumer struct {
consumer *consumer
broker *Broker
Expand Down Expand Up @@ -962,7 +1069,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
if !child.IsPaused() {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
}
}

return bc.broker.Fetch(request)
Expand Down
40 changes: 40 additions & 0 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,26 @@ type ConsumerGroup interface {
// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
// this function before the object passes out of scope, as it will otherwise leak memory.
Close() error

// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
Pause(partitions map[string][]int32)

// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
Resume(partitions map[string][]int32)

// Pause suspends fetching from all partitions. Future calls to the broker will not return any
// records from these partitions until they have been resumed using Resume()/ResumeAll().
// Note that this method does not affect partition subscription.
// In particular, it does not cause a group rebalance when automatic assignment is used.
PauseAll()

// Resume resumes all partitions which have been paused with Pause()/PauseAll().
// New calls to the broker will return records from these partitions if there are any to be fetched.
ResumeAll()
}

type consumerGroup struct {
Expand Down Expand Up @@ -188,6 +208,26 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
return sess.release(true)
}

// Pause implements ConsumerGroup.
func (c *consumerGroup) Pause(partitions map[string][]int32) {
c.consumer.Pause(partitions)
}

// Resume implements ConsumerGroup.
func (c *consumerGroup) Resume(partitions map[string][]int32) {
c.consumer.Resume(partitions)
}

// PauseAll implements ConsumerGroup.
func (c *consumerGroup) PauseAll() {
c.consumer.PauseAll()
}

// ResumeAll implements ConsumerGroup.
func (c *consumerGroup) ResumeAll() {
c.consumer.ResumeAll()
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
select {
case <-c.closed:
Expand Down
81 changes: 81 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,87 @@ func TestConsumerOffsetManual(t *testing.T) {
broker0.Close()
}

func TestPauseResumeConsumption(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 0)

const newestOffsetBroker = 1233
const maxOffsetBroker = newestOffsetBroker + 10
offsetBroker := newestOffsetBroker
offsetClient := offsetBroker

mockFetchResponse := NewMockFetchResponse(t, 1)
mockFetchResponse.SetMessage("my_topic", 0, int64(newestOffsetBroker), testMsg)
offsetBroker++

brokerResponses := map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetOffset("my_topic", 0, OffsetOldest, 0).
SetOffset("my_topic", 0, OffsetNewest, int64(newestOffsetBroker)),
"FetchRequest": mockFetchResponse,
}

broker0.SetHandlerByMap(brokerResponses)

// When
master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
t.Fatal(err)
}

// pause the consumption
consumer.Pause()

// set more msgs on broker
for ; offsetBroker < maxOffsetBroker; offsetBroker++ {
mockFetchResponse = mockFetchResponse.SetMessage("my_topic", 0, int64(offsetBroker), testMsg)
}
brokerResponses["FetchRequest"] = mockFetchResponse
broker0.SetHandlerByMap(brokerResponses)

keepConsuming := true
for keepConsuming {
select {
case message := <-consumer.Messages():
// only the first msg is expected to be consumed
offsetClient++
assertMessageOffset(t, message, int64(newestOffsetBroker))
case err := <-consumer.Errors():
t.Fatal(err)
case <-time.After(time.Second):
// is expected to timedout once the consumption is pauses
keepConsuming = false
}
}

// lets resume the consumption in order to consume the new msgs
consumer.Resume()

for offsetClient < maxOffsetBroker {
select {
case message := <-consumer.Messages():
assertMessageOffset(t, message, int64(offsetClient))
offsetClient += 1
case err := <-consumer.Errors():
t.Fatal("Error: ", err)
case <-time.After(time.Second * 10):
t.Fatal("consumer timed out . Offset: ", offsetClient)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that broker claims to be the
// newest in its metadata response.
Expand Down
4 changes: 3 additions & 1 deletion examples/consumergroup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ This example shows you how to use the Sarama consumer group consumer. The exampl

```bash
$ go run main.go -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
```
```

You can also toggle (pause/resume) the consumption by sending SIGUSR1
35 changes: 30 additions & 5 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package main

// SIGUSR1 toggle the pause/resume consumption
import (
"context"
"flag"
Expand Down Expand Up @@ -48,6 +49,7 @@ func init() {
}

func main() {
keepRunning := true
log.Println("Starting a new Sarama consumer")

if verbose {
Expand Down Expand Up @@ -94,6 +96,7 @@ func main() {
log.Panicf("Error creating consumer group client: %v", err)
}

consumptionIsPaused := false
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
Expand All @@ -116,13 +119,23 @@ func main() {
<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")

sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
case <-sigterm:
log.Println("terminating: via signal")

for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
case <-sigusr1:
toggleConsumptionFlow(client, &consumptionIsPaused)
}
}
cancel()
wg.Wait()
Expand All @@ -131,6 +144,18 @@ func main() {
}
}

func toggleConsumptionFlow(client sarama.ConsumerGroup, isPaused *bool) {
if *isPaused {
client.ResumeAll()
log.Println("Resuming consumption")
} else {
client.PauseAll()
log.Println("Pausing consumption")
}

*isPaused = !*isPaused
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
Expand Down
Loading

0 comments on commit 31d757b

Please sign in to comment.