Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strict priority polling w/ backmerge #288

Merged
merged 5 commits into from
Dec 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 118 additions & 18 deletions lib/shoryuken/polling.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,53 @@ def ==(other)
end

alias_method :eql?, :==

def to_s
if options.empty?
name
else
"#<QueueConfiguration #{name} options=#{options.inspect}>"
end
end
end

class WeightedRoundRobin
class BaseStrategy
include Util

def next_queue
fail NotImplementedError
end

def messages_found(queue, messages_found)
fail NotImplementedError
end

def active_queues
fail NotImplementedError
end

def ==(other)
case other
when Array
@queues == other
else
if other.respond_to?(:active_queues)
active_queues == other.active_queues
else
false
end
end
end

private

def delay
Shoryuken.options[:delay].to_f
end
end

class WeightedRoundRobin < BaseStrategy

def initialize(queues)
@initial_queues = queues
@queues = queues.dup.uniq
Expand Down Expand Up @@ -57,25 +99,8 @@ def active_queues
unparse_queues(@queues)
end

def ==(other)
case other
when Array
@queues == other
else
if other.respond_to?(:active_queues)
active_queues == other.active_queues
else
false
end
end
end

private

def delay
Shoryuken.options[:delay].to_f
end

def pause(queue)
return unless @queues.delete(queue)
@paused_queues << [Time.now + delay, queue]
Expand All @@ -102,5 +127,80 @@ def queue_weight(queues, queue)
queues.count { |q| q == queue }
end
end

class StrictPriority < BaseStrategy

def initialize(queues)
# Priority ordering of the queues, highest priority first
@queues = queues
.group_by { |q| q }
.sort_by { |_, qs| -qs.count }
.map(&:first)

# Pause status of the queues, default to past time (unpaused)
@paused_until = queues
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

.each_with_object(Hash.new) { |queue, h| h[queue] = Time.at(0) }

# Start queues at 0
reset_next_queue
end

def next_queue
next_queue = next_active_queue
next_queue.nil? ? nil : QueueConfiguration.new(next_queue, {})
end

def messages_found(queue, messages_found)
if messages_found == 0
pause(queue)
else
reset_next_queue
end
end

def active_queues
@queues
.reverse
.map.with_index(1)
.reject { |q, _| queue_paused?(q) }
.reverse
end

private

def next_active_queue
reset_next_queue if queues_unpaused_since?

size = @queues.length
size.times do
queue = @queues[@next_queue_index]
@next_queue_index = (@next_queue_index + 1) % size
return queue unless queue_paused?(queue)
end

return nil
end

def queues_unpaused_since?
last = @last_unpause_check
now = @last_unpause_check = Time.now

last && @paused_until.values.any? { |t| t > last && t <= now }
end

def reset_next_queue
@next_queue_index = 0
end

def queue_paused?(queue)
@paused_until[queue] > Time.now
end

def pause(queue)
return unless delay > 0
@paused_until[queue] = Time.now + delay
logger.debug "Paused '#{queue}'"
end
end
end
end
139 changes: 139 additions & 0 deletions spec/shoryuken/polling_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,142 @@
end
end
end

describe Shoryuken::Polling::StrictPriority do
let(:queue1) { 'shoryuken' }
let(:queue2) { 'uppercut' }
let(:queue3) { 'other' }
let(:queues) { Array.new }
subject { Shoryuken::Polling::StrictPriority.new(queues) }

describe '#next_queue' do
it 'cycles when declared desc' do
# [shoryuken, 2]
# [uppercut, 1]
queues << queue1
queues << queue1
queues << queue2

expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
end

it 'cycles when declared asc' do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why looks the same as previous one, while priorities are different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The priorities aren't different, the priorities are the number. They are declaration order independent.

Therefore

[shoryuken, 2]
[uppercut,  1]

and

[uppercut,  1]
[shoryuken, 2]

Have the same meaning, because they have the same priority numbers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, did not see that queues are reversed!

# [uppercut, 1]
# [shoryuken, 2]
queues << queue2
queues << queue1
queues << queue1

expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
end

it 'returns nil if there are no active queues' do
expect(subject.next_queue).to eq(nil)
end

it 'unpauses queues whose pause is expired' do
# [shoryuken, 3]
# [uppercut, 2]
# [other, 1]
queues << queue1
queues << queue1
queues << queue1
queues << queue2
queues << queue2
queues << queue3

allow(subject).to receive(:delay).and_return(10)

now = Time.now
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good practice 👍

allow(Time).to receive(:now).and_return(now)

# pause the second queue, see it loop between 1 and 3
subject.messages_found(queue2, 0)
expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue3)
expect(subject.next_queue).to eq(queue1)

now += 5
allow(Time).to receive(:now).and_return(now)

# pause the first queue, see it repeat 3
subject.messages_found(queue1, 0)
expect(subject.next_queue).to eq(queue3)
expect(subject.next_queue).to eq(queue3)

# pause the third queue, see it have nothing
subject.messages_found(queue3, 0)
expect(subject.next_queue).to eq(nil)

# unpause queue 2
now += 6
allow(Time).to receive(:now).and_return(now)
expect(subject.next_queue).to eq(queue2)

# unpause queues 1 and 3
now += 6
allow(Time).to receive(:now).and_return(now)
expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
expect(subject.next_queue).to eq(queue3)
end
end

describe '#messages_found' do
it 'pauses a queue if there are no messages found' do
# [shoryuken, 2]
# [uppercut, 1]
queues << queue1
queues << queue1
queues << queue2

expect(subject.active_queues).to eq([[queue1, 2], [queue2, 1]])
expect(subject).to receive(:pause).with(queue1).and_call_original
subject.messages_found(queue1, 0)
expect(subject.active_queues).to eq([[queue2, 1]])
end

it 'continues to queue the highest priority queue if messages are found' do
# [shoryuken, 3]
# [uppercut, 2]
# [other, 1]
queues << queue1
queues << queue1
queues << queue1
queues << queue2
queues << queue2
queues << queue3

expect(subject.next_queue).to eq(queue1)
subject.messages_found(queue1, 1)
expect(subject.next_queue).to eq(queue1)
subject.messages_found(queue1, 1)
expect(subject.next_queue).to eq(queue1)
end

it 'resets the priorities if messages are found part way' do
# [shoryuken, 3]
# [uppercut, 2]
# [other, 1]
queues << queue1
queues << queue1
queues << queue1
queues << queue2
queues << queue2
queues << queue3

expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
subject.messages_found(queue2, 1)
expect(subject.next_queue).to eq(queue1)
expect(subject.next_queue).to eq(queue2)
expect(subject.next_queue).to eq(queue3)
end
end
end