Skip to content

Commit

Permalink
Merge pull request #341 from lairen/pause-bug
Browse files Browse the repository at this point in the history
Fix pause and resume bug
  • Loading branch information
dasch authored Oct 9, 2023
2 parents c44330f + 3f1c35a commit 24855a0
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 4 deletions.
11 changes: 11 additions & 0 deletions lib/racecar/consumer_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def initialize(config, logger, instrumenter = NullInstrumenter)
@previous_retries = 0

@last_poll_read_nil_message = false
@paused_tpls = Hash.new { |h, k| h[k] = {} }
end

def poll(max_wait_time_ms = @config.max_wait_time_ms)
Expand Down Expand Up @@ -65,6 +66,7 @@ def commit

def close
each_subscribed(&:close)
@paused_tpls.clear
end

def current
Expand Down Expand Up @@ -100,16 +102,25 @@ def pause(topic, partition, offset)
consumer.pause(filtered_tpl)
fake_msg = OpenStruct.new(topic: topic, partition: partition, offset: offset)
consumer.seek(fake_msg)

@paused_tpls[topic][partition] = [consumer, filtered_tpl]
end

def resume(topic, partition)
consumer, filtered_tpl = find_consumer_by(topic, partition)

if !consumer && @paused_tpls[topic][partition]
consumer, filtered_tpl = @paused_tpls[topic][partition]
end

if !consumer
@logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
return
end

consumer.resume(filtered_tpl)
@paused_tpls[topic].delete(partition)
@paused_tpls.delete(topic) if @paused_tpls[topic].empty?
end

alias :each :each_subscribed
Expand Down
89 changes: 85 additions & 4 deletions spec/consumer_set_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ def subscription(name)
Racecar::Consumer::Subscription.new(name, true, 1048576, {})
end

def tpl(subscription)
def tpl(subscription, partitions=[0])
Rdkafka::Consumer::TopicPartitionList.new.tap do |tpl|
tpl.add_topic(subscription.topic, 1)
tpl.add_topic(subscription.topic, partitions)
end
end

Expand Down Expand Up @@ -88,6 +88,16 @@ def message_generator(messages)
consumer_set.pause("greetings", 0, 123456)
end

it "#pause keeps tracked of paused tpls and consumers" do
allow(rdconsumer).to receive(:pause)
allow(rdconsumer).to receive(:seek)

expect do
consumer_set.pause("greetings", 0, 123456)
end.to change { consumer_set.instance_variable_get(:@paused_tpls) }
.to({"greetings" => {0 => [rdconsumer, tpl(subscriptions.first)]}})
end

it "#resume allows to resume known partitions" do
expect(rdconsumer).to receive(:resume) do |tpl|
expect(tpl.count).to eq 1
Expand All @@ -100,6 +110,38 @@ def message_generator(messages)
expect(rdconsumer).not_to receive(:resume)
consumer_set.resume("greetings", 1)
end

it "#resume allows to resume paused partitions that are no longer assigned to consumer" do
expect(rdconsumer).to receive(:pause)
allow(rdconsumer).to receive(:seek)
consumer_set.pause("greetings", 0, 12345)

new_tpl_assignment = tpl(subscription("greetings"), [1])
expect(rdconsumer).to receive(:assignment).and_return(new_tpl_assignment)

paused_tpl = tpl(subscription("greetings"), [0])
expect(rdconsumer).to receive(:resume) do |tpl|
expect(tpl.count).to eq 1
expect(tpl).to be_kind_of Rdkafka::Consumer::TopicPartitionList
expect(tpl).to match(paused_tpl)
end
consumer_set.resume("greetings", 0)
end

it "#resume removes topic/ partition from paused_tpls hash" do
allow(rdconsumer).to receive(:resume)
partition_0_tpl = tpl(subscriptions.first, [0])
partition_1_tpl = tpl(subscriptions.first, [1])
consumer_set.instance_variable_set(:@paused_tpls, {"greetings" => {
0 => [rdconsumer, partition_0_tpl],
1 => [rdconsumer, partition_1_tpl]
}})
expect do
consumer_set.resume("greetings", 0)
end.to change {
consumer_set.instance_variable_get(:@paused_tpls)
}.to({"greetings" => {1 => [rdconsumer, partition_1_tpl]}})
end
end

describe "#poll" do
Expand Down Expand Up @@ -243,10 +285,10 @@ def message_generator(messages)
end

describe "#store_offset" do
it "raises ErroneousStateError when RD_KAFKA_RESP_ERR__STATE(-172) is not raised" do
it "does not raise ErroneousStateError when RD_KAFKA_RESP_ERR__STATE(-172) is raised" do
allow(logger).to receive(:warn)
allow(rdconsumer).to receive(:store_offset).with(:message).and_raise(Rdkafka::RdkafkaError, -172) # state
expect {consumer_set.store_offset(:message) }.not_to raise_error(Racecar::ErroneousStateError)
expect { consumer_set.store_offset(:message) }.not_to raise_error
expect(logger).to have_received(:warn)
end

Expand All @@ -273,6 +315,16 @@ def message_generator(messages)
expect(rdconsumer).to receive(:close).once
consumer_set.close
end

it "clears paused_tpls" do
allow(rdconsumer).to receive(:close)
consumer_set.instance_variable_set(:@paused_tpls, {"topic" => {0 => []}})
expect do
consumer_set.close
end.to change {
consumer_set.instance_variable_get(:@paused_tpls)
}.to({})
end
end

describe "#current" do
Expand Down Expand Up @@ -398,6 +450,35 @@ def message_generator(messages)
expect(rdconsumer3).not_to receive(:resume)
consumer_set.resume("unknowntopic", 0)
end

it "#resume allows to resume paused partitions that are no longer assigned to consumer" do
allow(rdconsumer1).to receive(:pause)
allow(rdconsumer1).to receive(:seek)
consumer_set.pause("feature", 0, 12345)

new_tpl_assignment = tpl(subscription("feature"), [1])
expect(rdconsumer1).to receive(:assignment).and_return(new_tpl_assignment)

paused_tpl = tpl(subscription("feature"), [0])
expect(rdconsumer1).to receive(:resume) do |tpl|
expect(tpl.count).to eq 1
expect(tpl).to be_kind_of Rdkafka::Consumer::TopicPartitionList
expect(tpl).to match(paused_tpl)
end
expect(rdconsumer2).to_not receive(:resume)
expect(rdconsumer3).to_not receive(:resume)
consumer_set.resume("feature", 0)
end

it "#resume removes the topic/partition from the paused_tpls hash" do
allow(rdconsumer1).to receive(:resume)
consumer_set.instance_variable_set(:@paused_tpls, {"feature" => {0 => []}})
expect do
consumer_set.resume("feature", 0)
end.to change {
consumer_set.instance_variable_get(:@paused_tpls)
}.to({})
end
end

it "#poll retries upon max poll exceeded" do
Expand Down

0 comments on commit 24855a0

Please sign in to comment.