Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow schedulers to be restarted; separate unit tests from integration tests #66

Merged
merged 1 commit into from
Aug 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ Rails/ApplicationRecord:
RSpec/AnyInstance:
Enabled: false

RSpec/DescribeClass:
Exclude:
- spec/integration/*

RSpec/ExampleLength:
Enabled: false

Expand Down
10 changes: 4 additions & 6 deletions lib/good_job/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,21 @@ def start
ActiveRecord::Base.connection_pool.size
).to_i

queue_names = (
queue_string = (
options[:queues] ||
ENV['GOOD_JOB_QUEUES'] ||
'*'
).split(',').map(&:strip)
)

poll_interval = (
options[:poll_interval] ||
ENV['GOOD_JOB_POLL_INTERVAL']
).to_i

job_query = GoodJob::Job.all.priority_ordered
queue_names_without_all = queue_names.reject { |q| q == '*' }
job_query = job_query.where(queue_name: queue_names_without_all) unless queue_names_without_all.size.zero?
job_query = GoodJob::Job.queue_string(queue_string)
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock)

$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_names.join(',')}"
$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_string}"

timer_options = {}
timer_options[:execution_interval] = poll_interval if poll_interval.positive?
Expand Down
8 changes: 6 additions & 2 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ class Job < ActiveRecord::Base
end
end)
scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) }
scope :priority_ordered, -> { order(priority: :desc) }
scope :priority_ordered, -> { order('priority DESC NULLS LAST') }
scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) }
scope :queue_string, (lambda do |string|
queue_names_without_all = (string.presence || '*').split(',').map(&:strip).reject { |q| q == '*' }
where(queue_name: queue_names_without_all) unless queue_names_without_all.size.zero?
end)

def self.perform_with_advisory_lock
good_job = nil
result = nil
error = nil

unfinished.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
break unless good_job

Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class LogSubscriber < ActiveSupport::LogSubscriber
def create(event)
good_job = event.payload[:good_job]

info do
debug do
"Created GoodJob resource with id #{good_job.id}"
end
end
Expand Down
31 changes: 21 additions & 10 deletions lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,23 @@ def initialize(performer, timer_options: {}, pool_options: {})
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
@pool = ThreadPoolExecutor.new(DEFAULT_POOL_OPTIONS.merge(pool_options))
@timer = Concurrent::TimerTask.new(DEFAULT_TIMER_OPTIONS.merge(timer_options)) do
create_thread
end
@timer.add_observer(self, :timer_observer)
@timer.execute
end
@pool_options = DEFAULT_POOL_OPTIONS.merge(pool_options)
@timer_options = DEFAULT_TIMER_OPTIONS.merge(timer_options)

def execute
create_pools
end

def shutdown(wait: true)
@_shutdown = true

ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait })
ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do
if @timer.running?
if @timer&.running?
@timer.shutdown
@timer.wait_for_termination if wait
end

if @pool.running?
if @pool&.running?
@pool.shutdown
@pool.wait_for_termination if wait
end
Expand All @@ -56,6 +51,11 @@ def shutdown?
@_shutdown
end

def restart(wait: true)
shutdown(wait: wait) unless shutdown?
create_pools
end

def create_thread
return false unless @pool.ready_worker_count.positive?

Expand Down Expand Up @@ -88,5 +88,16 @@ def ready_worker_count
end
end
end

private

def create_pools
@pool = ThreadPoolExecutor.new(@pool_options)
return unless @timer_options[:execution_interval].positive?

@timer = Concurrent::TimerTask.new(@timer_options) { create_thread }
@timer.add_observer(self, :timer_observer)
@timer.execute
end
end
end
3 changes: 3 additions & 0 deletions spec/dummy/config/application.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ class Application < Rails::Application
# Application configuration can go into files in config/initializers
# -- all .rb files in that directory are automatically loaded after loading
# the framework and any gems in your application.
#

config.log_level = :debug
end
end

65 changes: 31 additions & 34 deletions spec/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,50 @@

RSpec.describe GoodJob::Adapter do
let(:adapter) { described_class.new }
let(:active_job) { instance_double(ApplicationJob) }
let(:good_job) { instance_double(GoodJob::Job) }

before do
stub_const 'ExampleJob', (Class.new(ApplicationJob) do
self.queue_name = 'test'
self.priority = 50

def perform(*args, **kwargs)
end
end)
end

around do |example|
original_adapter = ActiveJob::Base.queue_adapter
ActiveJob::Base.queue_adapter = adapter
example.run
ActiveJob::Base.queue_adapter = original_adapter
describe '#initialize' do
it 'guards against improper execution modes' do
expect do
described_class.new(execution_mode: :blarg)
end.to raise_error ArgumentError
end
end

describe '#enqueue' do
it 'performs the job directly' do
ExampleJob.perform_later('first', 'second', keyword_arg: 'keyword_arg')
it 'calls GoodJob::Job.enqueue with parameters' do
allow(GoodJob::Job).to receive(:enqueue).and_return(:good_job)

good_job = GoodJob::Job.last
expect(good_job.queue_name).to eq 'test'
expect(good_job.priority).to eq 50
adapter.enqueue(active_job)

expect(GoodJob::Job).to have_received(:enqueue).with(
active_job,
create_with_advisory_lock: false,
scheduled_at: nil
)
end
end

describe '#enqueue_at' do
it 'assigns parameters' do
expect do
ExampleJob.set(wait: 1.minute).perform_later('first', 'second', keyword_arg: 'keyword_arg')
end.to change(GoodJob::Job, :count).by(1)
it 'calls GoodJob::Job.enqueue with parameters' do
allow(GoodJob::Job).to receive(:enqueue).and_return(:good_job)

scheduled_at = 1.minute.from_now

good_job = GoodJob::Job.last
expect(good_job.queue_name).to eq 'test'
expect(good_job.priority).to eq 50
expect(good_job.scheduled_at).to be_within(1.second).of 1.minute.from_now
adapter.enqueue_at(active_job, scheduled_at.to_i)

expect(GoodJob::Job).to have_received(:enqueue).with(
active_job,
create_with_advisory_lock: false,
scheduled_at: scheduled_at.change(usec: 0)
)
end
end

describe '#provider_job_id' do
it 'is assigned at creation' do
enqueued_job = ExampleJob.perform_later
good_job = GoodJob::Job.find(enqueued_job.provider_job_id)

expect(enqueued_job.provider_job_id).to eq good_job.id
describe '#shutdown' do
it 'is callable' do
adapter.shutdown
end
end
end
2 changes: 1 addition & 1 deletion spec/good_job/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
expect(GoodJob::Scheduler).to have_received(:new).with(a_kind_of(GoodJob::Performer), a_kind_of(Hash))

performer_query = performer.instance_variable_get(:@target)
expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).priority_ordered.to_sql
expect(performer_query.to_sql).to eq GoodJob::Job.where(queue_name: %w[mice elephant]).to_sql
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def perform(result_value = nil, raise_error: false)
end)
end

it_behaves_like 'promotable'
it_behaves_like 'lockable'

describe '.enqueue' do
let(:active_job) { ExampleJob.new }
Expand Down
88 changes: 11 additions & 77 deletions spec/good_job/scheduler_spec.rb
Original file line number Diff line number Diff line change
@@ -1,94 +1,28 @@
require 'rails_helper'

RSpec.describe GoodJob::Scheduler do
before do
ActiveJob::Base.queue_adapter = adapter
let(:performer) { instance_double(GoodJob::Performer, next: nil) }

stub_const "RUN_JOBS", Concurrent::Array.new
stub_const "THREAD_JOBS", Concurrent::Hash.new(Concurrent::Array.new)

stub_const 'ExampleJob', (Class.new(ApplicationJob) do
self.queue_name = 'test'
self.priority = 50

def perform(*_args, **_kwargs)
thread_name = Thread.current.name || Thread.current.object_id

RUN_JOBS << provider_job_id
THREAD_JOBS[thread_name] << provider_job_id
end
end)

stub_const 'RetryableError', Class.new(StandardError)
stub_const 'ErrorJob', (Class.new(ApplicationJob) do
self.queue_name = 'test'
self.priority = 50
retry_on(RetryableError, wait: 0, attempts: 3) do |job, error|
# puts "FAILED"
end

def perform(*args, **kwargs)
thread_name = Thread.current.name || Thread.current.object_id

RUN_JOBS << { args: args, kwargs: kwargs }
THREAD_JOBS[thread_name] << provider_job_id

raise RetryableError
end
end), transfer_nested_constants: true
end

let(:adapter) { GoodJob::Adapter.new }

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 250 }

let!(:good_jobs) do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
end
end

it 'pops items off of the queue and runs them' do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
describe '#shutdown' do
it 'shuts down the theadpools' do
scheduler = described_class.new(performer)

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }

if RUN_JOBS.size != number_of_jobs
jobs = THREAD_JOBS.values.flatten

jobs_tally = jobs.each_with_object(Hash.new(0)) do |job_id, hash|
hash[job_id] += 1
end

rerun_jobs = jobs_tally.select { |_key, value| value > 1 }

rerun_jobs.each do |job_id, tally|
rerun_threads = THREAD_JOBS.select { |_thread, thread_jobs| thread_jobs.include? job_id }.keys

puts "Ran job id #{job_id} for #{tally} times on threads #{rerun_threads}"
end
end

scheduler.shutdown

expect(GoodJob::Job.count).to eq(0), -> { "Unworked jobs are #{GoodJob::Job.all.map(&:id)}" }
expect(rerun_jobs).to be_nil
expect(RUN_JOBS.size).to eq number_of_jobs
expect(scheduler.instance_variable_get(:@timer).running?).to be false
expect(scheduler.instance_variable_get(:@pool).running?).to be false
end
end

context 'when job has errors' do
let!(:jobs) { ErrorJob.perform_later }

it "handles and retries jobs with errors" do
performer = GoodJob::Performer.new(GoodJob::Job.all, :perform_with_advisory_lock)
describe '#restart' do
it 'restarts the threadpools' do
scheduler = described_class.new(performer)
scheduler.shutdown

sleep_until(max: 5, increments_of: 0.5) { GoodJob::Job.count == 0 }
scheduler.restart

scheduler.shutdown
expect(scheduler.instance_variable_get(:@timer).running?).to be true
expect(scheduler.instance_variable_get(:@pool).running?).to be true
end
end
end
Loading