Skip to content

Commit

Permalink
Allow schedulers to be restarted; separate unit tests from integratio…
Browse files Browse the repository at this point in the history
…n tests
  • Loading branch information
bensheldon committed Aug 3, 2020
1 parent c254589 commit 35bf224
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 134 deletions.
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ Rails/ApplicationRecord:
RSpec/AnyInstance:
Enabled: false

RSpec/DescribeClass:
Exclude:
- spec/integration/*

RSpec/ExampleLength:
Enabled: false

Expand Down
10 changes: 4 additions & 6 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,21 @@ def start
ActiveRecord::Base.connection_pool.size
).to_i

queue_names = (
queue_string = (
options[:queues] ||
ENV['GOOD_JOB_QUEUES'] ||
'*'
).split(',').map(&:strip)
)

poll_interval = (
options[:poll_interval] ||
ENV['GOOD_JOB_POLL_INTERVAL']
).to_i

job_query = GoodJob::Job.all.priority_ordered
queue_names_without_all = queue_names.reject { |q| q == '*' }
job_query = job_query.where(queue_name: queue_names_without_all) unless queue_names_without_all.size.zero?
job_query = GoodJob::Job.queue_string(queue_string)
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock)

$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_names.join(',')}"
$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_string}"

timer_options = {}
timer_options[:execution_interval] = poll_interval if poll_interval.positive?
Expand Down
8 changes: 6 additions & 2 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ class Job < ActiveRecord::Base
end
end)
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) }
scope :priority_ordered, -> { order(priority: :desc) }
scope :priority_ordered, -> { order('priority DESC NULLS LAST') }
scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) }
scope :queue_string, (lambda do |string|
queue_names_without_all = (string.presence || '*').split(',').map(&:strip).reject { |q| q == '*' }
where(queue_name: queue_names_without_all) unless queue_names_without_all.size.zero?
end)

def self.perform_with_advisory_lock
good_job = nil
result = nil
error = nil

unfinished.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class LogSubscriber < ActiveSupport::LogSubscriber
def create(event)
good_job = event.payload[:good_job]

info do
debug do
"Created GoodJob resource with id #{good_job.id}"
end
end
Expand Down
31 changes: 21 additions & 10 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,23 @@ def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
@pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
create_thread
end
@timer.add_observer(self, :timer_observer)
@timer.execute
end
@pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
@timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)

def execute
create_pools
end

def shutdown(wait: true)
@_shutdown = true

ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait })
ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do
if @timer.running?
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
end

if @pool.running?
if @pool&.running?
@pool.shutdown
@pool.wait_for_termination if wait
end
Expand All @@ -56,6 +51,11 @@ def shutdown?
@_shutdown
end

def restart(wait: true)
shutdown(wait: wait) unless shutdown?
create_pools
end

def create_thread
return false unless @pool.ready_worker_count.positive?

Expand Down Expand Up @@ -88,5 +88,16 @@ def ready_worker_count
end
end
end

private

def create_pools
@pool = ThreadPoolExecutor.new(@pool_options)
return unless @timer_options[:execution_interval].positive?

@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
end
3 changes: 3 additions & 0 deletions spec/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class Application < Rails::Application
# Application configuration can go into files in config/initializers
# -- all .rb files in that directory are automatically loaded after loading
# the framework and any gems in your application.
#

config.log_level = :debug
end
end

65 changes: 31 additions & 34 deletions spec/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,50 @@

RSpec.describe GoodJob::Adapter do
let(:adapter) { described_class.new }
let(:active_job) { instance_double(ApplicationJob) }
let(:good_job) { instance_double(GoodJob::Job) }

before do
stub_const 'ExampleJob', (Class.new(ApplicationJob) do
self.queue_name = 'test'
self.priority = 50

def perform(*args, **kwargs)
end
end)
end

around do |example|
original_adapter = ActiveJob::Base.queue_adapter
ActiveJob::Base.queue_adapter = adapter
example.run
ActiveJob::Base.queue_adapter = original_adapter
describe '#initialize' do
it 'guards against improper execution modes' do
expect do
described_class.new(execution_mode: :blarg)
end.to raise_error ArgumentError
end
end

describe '#enqueue' do
it 'performs the job directly' do
ExampleJob.perform_later('first', 'second', keyword_arg: 'keyword_arg')
it 'calls GoodJob::Job.enqueue with parameters' do
allow(GoodJob::Job).to receive(:enqueue).and_return(:good_job)

good_job = GoodJob::Job.last
expect(good_job.queue_name).to eq 'test'
expect(good_job.priority).to eq 50
adapter.enqueue(active_job)

expect(GoodJob::Job).to have_received(:enqueue).with(
active_job,
create_with_advisory_lock: false,
scheduled_at: nil
)
end
end

describe '#enqueue_at' do
it 'assigns parameters' do
expect do
ExampleJob.set(wait: 1.minute).perform_later('first', 'second', keyword_arg: 'keyword_arg')
end.to change(GoodJob::Job, :count).by(1)
it 'calls GoodJob::Job.enqueue with parameters' do
allow(GoodJob::Job).to receive(:enqueue).and_return(:good_job)

scheduled_at = 1.minute.from_now

good_job = GoodJob::Job.last
expect(good_job.queue_name).to eq 'test'
expect(good_job.priority).to eq 50
expect(good_job.scheduled_at).to be_within(1.second).of 1.minute.from_now
adapter.enqueue_at(active_job, scheduled_at.to_i)

expect(GoodJob::Job).to have_received(:enqueue).with(
active_job,
create_with_advisory_lock: false,
scheduled_at: scheduled_at.change(usec: 0)
)
end
end

describe '#provider_job_id' do
it 'is assigned at creation' do
enqueued_job = ExampleJob.perform_later
good_job = GoodJob::Job.find(enqueued_job.provider_job_id)

expect(enqueued_job.provider_job_id).to eq good_job.id
describe '#shutdown' do
it 'is callable' do
adapter.shutdown
end
end
end
2 changes: 1 addition & 1 deletion spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), a_kind_of(Hash))

performer_query = performer.instance_variable_get(:@target)
expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).priority_ordered.to_sql
expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).to_sql
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def perform(result_value = nil, raise_error: false)
end)
end

it_behaves_like 'promotable'
it_behaves_like 'lockable'

describe '.enqueue' do
let(:active_job) { ExampleJob.new }
Expand Down
88 changes: 11 additions & 77 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
@@ -1,94 +1,28 @@
require 'rails_helper'

RSpec.describe GoodJob::Scheduler do
before do
ActiveJob::Base.queue_adapter = adapter
let(:performer) { instance_double(GoodJob::Performer, next: nil) }

stub_const "RUN_JOBS", Concurrent::Array.new
stub_const "THREAD_JOBS", Concurrent::Hash.new(Concurrent::Array.new)

stub_const 'ExampleJob', (Class.new(ApplicationJob) do
self.queue_name = 'test'
self.priority = 50

def perform(*_args, **_kwargs)
thread_name = Thread.current.name || Thread.current.object_id

RUN_JOBS << provider_job_id
THREAD_JOBS[thread_name] << provider_job_id
end
end)

stub_const 'RetryableError', Class.new(StandardError)
stub_const 'ErrorJob', (Class.new(ApplicationJob) do
self.queue_name = 'test'
self.priority = 50
retry_on(RetryableError, wait: 0, attempts: 3) do |job, error|
# puts "FAILED"
end

def perform(*args, **kwargs)
thread_name = Thread.current.name || Thread.current.object_id

RUN_JOBS << { args: args, kwargs: kwargs }
THREAD_JOBS[thread_name] << provider_job_id

raise RetryableError
end
end), transfer_nested_constants: true
end

let(:adapter) { GoodJob::Adapter.new }

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 250 }

let!(:good_jobs) do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
end
end

it 'pops items off of the queue and runs them' do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
describe '#shutdown' do
it 'shuts down the theadpools' do
scheduler = described_class.new(performer)

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

if RUN_JOBS.size != number_of_jobs
jobs = THREAD_JOBS.values.flatten

jobs_tally = jobs.each_with_object(Hash.new(0)) do |job_id, hash|
hash[job_id] += 1
end

rerun_jobs = jobs_tally.select { |_key, value| value > 1 }

rerun_jobs.each do |job_id, tally|
rerun_threads = THREAD_JOBS.select { |_thread, thread_jobs| thread_jobs.include? job_id }.keys

puts "Ran job id #{job_id} for #{tally} times on threads #{rerun_threads}"
end
end

scheduler.shutdown

expect(GoodJob::Job.count).to eq(0), -> { "Unworked jobs are #{GoodJob::Job.all.map(&:id)}" }
expect(rerun_jobs).to be_nil
expect(RUN_JOBS.size).to eq number_of_jobs
expect(scheduler.instance_variable_get(:@timer).running?).to be false
expect(scheduler.instance_variable_get(:@pool).running?).to be false
end
end

context 'when job has errors' do
let!(:jobs) { ErrorJob.perform_later }

it "handles and retries jobs with errors" do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
describe '#restart' do
it 'restarts the threadpools' do
scheduler = described_class.new(performer)
scheduler.shutdown

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }
scheduler.restart

scheduler.shutdown
expect(scheduler.instance_variable_get(:@timer).running?).to be true
expect(scheduler.instance_variable_get(:@pool).running?).to be true
end
end
end
Loading

0 comments on commit 35bf224

Please sign in to comment.