Skip to content

Commit

Permalink
Do not use advisory lock on heartbeat in production (#1451)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Aug 8, 2024
1 parent 4201534 commit a19bd1d
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ Available configuration options are:
- `inline_execution_respects_schedule` (boolean) Opt-in to future behavior of inline execution respecting scheduled jobs. Defaults to `false`.
- `logger` ([Rails Logger](https://api.rubyonrails.org/classes/ActiveSupport/Logger.html)) lets you set a custom logger for GoodJob. It should be an instance of a Rails `Logger` (Default: `Rails.logger`).
- `preserve_job_records` (boolean) keeps job records in your database even after jobs are completed. (Default: `true`)
- `advisory_lock_heartbeat` (boolean) whether to use an advisory lock for the purpose of determining whether an execeution process is active. (Default `true` in Development; `false` in other environments)
- `retry_on_unhandled_error` (boolean) causes jobs to be re-queued and retried if they raise an instance of `StandardError`. Be advised this may lead to jobs being repeated infinitely ([see below for more on retries](#retries)). Instances of `Exception`, like SIGINT, will *always* be retried, regardless of this attribute’s value. (Default: `false`)
- `on_thread_error` (proc, lambda, or callable) will be called when there is an Exception. It can be useful for logging errors to bug tracking services, like Sentry or Airbrake. Example:
Expand Down
13 changes: 12 additions & 1 deletion app/models/good_job/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def self.cleanup
end
end

def self.create_record(id:, with_advisory_lock: false)
def self.find_or_create_record(id:, with_advisory_lock: false)
attributes = {
id: id,
state: process_state,
Expand All @@ -66,6 +66,17 @@ def self.create_record(id:, with_advisory_lock: false)
attributes[:lock_type] = :advisory
end
create!(attributes)
rescue ActiveRecord::RecordNotUnique
find_by(id: id).tap do |existing_record|
next unless existing_record

if with_advisory_lock
existing_record.advisory_lock!
existing_record.update(lock_type: :advisory, state: process_state, updated_at: Time.current)
else
existing_record.update(lock_type: nil, state: process_state, updated_at: Time.current)
end
end
end

def self.process_state
Expand Down
4 changes: 2 additions & 2 deletions lib/good_job/capsule_tracker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def id_for_lock
if @record
@record.refresh_if_stale
else
@record = GoodJob::Process.create_record(id: @record_id)
@record = GoodJob::Process.find_or_create_record(id: @record_id)
create_refresh_task
end
value = @record&.id
Expand Down Expand Up @@ -89,7 +89,7 @@ def register(with_advisory_lock: false)
@advisory_locked_connection = WeakRef.new(@record.class.connection)
end
else
@record = GoodJob::Process.create_record(id: @record_id, with_advisory_lock: true)
@record = GoodJob::Process.find_or_create_record(id: @record_id, with_advisory_lock: true)
@advisory_locked_connection = WeakRef.new(@record.class.connection)
create_refresh_task
end
Expand Down
10 changes: 10 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,16 @@ def in_webserver?
end || false
end

# Whether to take an advisory lock on the process record in the notifier reactor.
# @return [Boolean]
def advisory_lock_heartbeat
return options[:advisory_lock_heartbeat] unless options[:advisory_lock_heartbeat].nil?
return rails_config[:advisory_lock_heartbeat] unless rails_config[:advisory_lock_heartbeat].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT']) unless env['GOOD_JOB_ADVISORY_LOCK_HEARTBEAT'].nil?

Rails.env.development?
end

private

def rails_config
Expand Down
5 changes: 3 additions & 2 deletions lib/good_job/notifier/process_heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ module ProcessHeartbeat

# Registers the current process.
def register_process
@advisory_lock_heartbeat = GoodJob.configuration.advisory_lock_heartbeat
GoodJob::Process.override_connection(connection) do
GoodJob::Process.cleanup
@capsule.tracker.register(with_advisory_lock: true)
@capsule.tracker.register(with_advisory_lock: @advisory_lock_heartbeat)
end
end

Expand All @@ -33,7 +34,7 @@ def refresh_process
# Deregisters the current process.
def deregister_process
GoodJob::Process.override_connection(connection) do
@capsule.tracker.unregister(with_advisory_lock: true)
@capsule.tracker.unregister(with_advisory_lock: @advisory_lock_heartbeat)
end
end
end
Expand Down
17 changes: 17 additions & 0 deletions spec/app/models/good_job/process_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@
end
end

describe 'find_or_create_record' do
let(:record_id) { '67160140-1bec-4c3b-bc34-1a8b36f87b21' }

it 'creates a new record' do
record = described_class.find_or_create_record(id: record_id)
expect(record).to be_a(described_class)
end

it 'updates an existing record' do
record = described_class.find_or_create_record(id: record_id)
record.update!(updated_at: 1.day.ago)
updated_record = described_class.find_or_create_record(id: record_id)
expect(updated_record).to eq(record)
expect(updated_record.updated_at).to be_within(1.second).of(Time.current)
end
end

describe '#basename' do
let(:process) { described_class.new state: {} }

Expand Down
31 changes: 31 additions & 0 deletions spec/lib/good_job/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,35 @@
expect(configuration.dashboard_live_poll_enabled).to eq true
end
end

describe '#advisory_lock_heartbeat' do
it 'defaults to true in development' do
allow(Rails).to receive(:env) { "development".inquiry }
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be true
end

it 'defaults to false in other environments' do
allow(Rails).to receive(:env) { "production".inquiry }
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be false
end

it 'can be overridden by options' do
configuration = described_class.new({ advisory_lock_heartbeat: true })
expect(configuration.advisory_lock_heartbeat).to be true
end

it 'can be overridden by rails config' do
allow(Rails.application.config).to receive(:good_job).and_return({ advisory_lock_heartbeat: true })
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be true
end

it 'can be overridden by environment variable' do
stub_const 'ENV', ENV.to_hash.merge({ 'GOOD_JOB_ADVISORY_LOCK_HEARTBEAT' => 'true' })
configuration = described_class.new({})
expect(configuration.advisory_lock_heartbeat).to be true
end
end
end
28 changes: 25 additions & 3 deletions spec/lib/good_job/notifier_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

expect(notifier.connected?(timeout: 5)).to be true
expect(notifier.listening?(timeout: 1)).to be false

sleep 1
notifier.shutdown

Expand All @@ -85,6 +86,7 @@
expect(notifier).to be_listening(timeout: 2)
described_class.notify(true)

wait_until { expect(GoodJob.capsule.tracker.id_for_lock).to be_present }
wait_until(max: 5) { expect(refreshes.value).to be > 0 }

notifier.shutdown
Expand Down Expand Up @@ -195,14 +197,34 @@
it 'creates and destroys a new Process record' do
notifier = described_class.new(enable_listening: true)

wait_until { expect(GoodJob::Process.count).to eq 1 }
wait_until { expect(GoodJob.capsule.tracker.locks).to eq 1 }

# Process record won't be created until the first lock is acquired when not advisory locked
id_for_lock = GoodJob.capsule.tracker.id_for_lock
process = GoodJob::Process.first
expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock
expect(process).to be_advisory_locked
expect(process.id).to eq id_for_lock
expect(process).not_to be_advisory_locked

notifier.shutdown
expect { process.reload }.to raise_error ActiveRecord::RecordNotFound
end

context 'when advisory_lock_heartbeat is true' do
before do
allow(GoodJob.configuration).to receive(:advisory_lock_heartbeat).and_return(true)
end

it 'takes an advisory lock on the process record' do
notifier = described_class.new(enable_listening: true)

wait_until { expect(GoodJob::Process.count).to eq 1 }

process = GoodJob::Process.first
expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock

notifier.shutdown
expect { process.reload }.to raise_error ActiveRecord::RecordNotFound
end
end
end
end

0 comments on commit a19bd1d

Please sign in to comment.