Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track if a GoodJob::Job has been subsequently retried #331

Merged
merged 1 commit into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class CreateGoodJobs < ActiveRecord::Migration[5.2]
t.uuid :active_job_id
t.text :concurrency_key
t.text :cron_key
t.uuid :retried_good_job_id
end

add_index :good_jobs, :scheduled_at, where: "(finished_at IS NULL)", name: "index_good_jobs_on_scheduled_at"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true
class AddRetriedGoodJobIdToGoodJobs < ActiveRecord::Migration[5.2]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :retried_good_job_id)
end
end

add_column :good_jobs, :retried_good_job_id, :uuid
end
end
20 changes: 13 additions & 7 deletions lib/good_job/current_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ module GoodJob
# Thread-local attributes for passing values from Instrumentation.
# (Cannot use ActiveSupport::CurrentAttributes because ActiveJob resets it)
module CurrentExecution
# @!attribute [rw] active_job_id
# @!scope class
# ActiveJob ID
# @return [String, nil]
thread_mattr_accessor :active_job_id

# @!attribute [rw] cron_key
# @!scope class
# Cron Key
Expand All @@ -29,14 +23,26 @@ module CurrentExecution
# @return [Exception, nil]
thread_mattr_accessor :error_on_retry

# @!attribute [rw] good_job
# @!scope class
# Cron Key
# @return [GoodJob::Job, nil]
thread_mattr_accessor :good_job

# Resets attributes
# @return [void]
def self.reset
self.active_job_id = nil
self.cron_key = nil
self.good_job = nil
self.error_on_discard = nil
self.error_on_retry = nil
end

# @return [String] UUID of the currently executing GoodJob::Job
def self.active_job_id
good_job&.active_job_id
end

# @return [Integer] Current process ID
def self.process_id
Process.pid
Expand Down
85 changes: 51 additions & 34 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ def self.queue_parser(string)
end
end

def self._migration_pending_warning
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:

rails generate good_job:update

To apply the migration files, run:

rails db:migrate

DEPRECATION
nil
end

# Get Jobs with given class name
# @!method with_job_class
# @!scope class
Expand Down Expand Up @@ -110,6 +124,18 @@ def self.queue_parser(string)
# @return [ActiveRecord::Relation]
scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) }

# Get Jobs that do not have subsequent retries
# @!method running
# @!scope class
# @return [ActiveRecord::Relation]
scope :head, -> { where(retried_good_job_id: nil) }

# Get Jobs have errored that will not be retried further
# @!method running
# @!scope class
# @return [ActiveRecord::Relation]
scope :dead, -> { head.where.not(error: nil) }

# Get Jobs on queues that match the given queue string.
# @!method queue_string(string)
# @!scope class
Expand Down Expand Up @@ -199,7 +225,6 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
good_job_args = {
cron_key: CurrentExecution.cron_key,
queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
priority: active_job.priority || DEFAULT_PRIORITY,
serialized_params: active_job.serialize,
Expand All @@ -210,31 +235,23 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
if column_names.include?('active_job_id')
good_job_args[:active_job_id] = active_job.job_id
else
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:

rails generate good_job:update

To apply the migration files, run:

rails db:migrate

DEPRECATION
_migration_pending_warning
end

if column_names.include?('concurrency_key')
good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
else
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:

rails generate good_job:update

To apply the migration files, run:

rails db:migrate
_migration_pending_warning
end

DEPRECATION
if column_names.include?('cron_key')
if CurrentExecution.cron_key
good_job_args[:cron_key] = CurrentExecution.cron_key
elsif CurrentExecution.active_job_id == active_job.job_id
good_job_args[:cron_key] = CurrentExecution.good_job.cron_key
end
else
_migration_pending_warning
end

good_job = GoodJob::Job.new(**good_job_args)
Expand All @@ -244,6 +261,12 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
good_job.save!
active_job.provider_job_id = good_job.id

if column_names.include?('retried_good_job_id')
CurrentExecution.good_job.retried_good_job_id = good_job.id if CurrentExecution.good_job && CurrentExecution.good_job.active_job_id == active_job.job_id
else
_migration_pending_warning
end

good_job
end
end
Expand Down Expand Up @@ -283,24 +306,19 @@ def executable?
end

def active_job_id
super || serialized_params['job_id']
if self.class.column_names.include?('active_job_id')
super
else
self.class._migration_pending_warning
serialized_params['job_id']
end
end

def cron_key
if self.class.column_names.include?('cron_key')
super
else
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:

rails generate good_job:update

To apply the migration files, run:

rails db:migrate

DEPRECATION

self.class._migration_pending_warning
nil
end
end
Expand All @@ -314,8 +332,7 @@ def execute
)

GoodJob::CurrentExecution.reset
GoodJob::CurrentExecution.active_job_id = active_job_id
GoodJob::CurrentExecution.cron_key = cron_key
GoodJob::CurrentExecution.good_job = self
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do
value = ActiveJob::Base.execute(params)

Expand Down
14 changes: 5 additions & 9 deletions spec/lib/good_job/current_execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,13 @@
end

describe '.active_job_id' do
it 'is assignable, thread-safe, and resettable' do
described_class.active_job_id = 'duck'
let!(:good_job) { GoodJob::Job.create! active_job_id: SecureRandom.uuid }

Thread.new do
described_class.active_job_id = 'bear'
end.join
it 'delegates to good_job' do
expect(described_class.active_job_id).to be_nil

expect(described_class.active_job_id).to eq 'duck'

described_class.reset
expect(described_class.active_job_id).to eq nil
described_class.good_job = good_job
expect(described_class.active_job_id).to eq good_job.active_job_id
end
end
end
26 changes: 25 additions & 1 deletion spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,12 @@ def perform(result_value = nil, raise_error: false)

context 'when there is an error' do
let(:active_job) { TestJob.new("whoops", raise_error: true) }
let!(:good_job) { described_class.enqueue(active_job) }
let!(:good_job) do
GoodJob::CurrentExecution.cron_key = "test_key"
good_job = described_class.enqueue(active_job)
GoodJob::CurrentExecution.cron_key = nil
good_job
end

it 'returns the error' do
result = good_job.perform
Expand All @@ -226,6 +231,25 @@ def perform(result_value = nil, raise_error: false)
expect(result.unhandled_error).to be_an_instance_of TestJob::ExpectedError
end

context 'when there is a retry handler' do
before do
ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
allow(GoodJob).to receive(:preserve_job_records).and_return(true)
TestJob.retry_on(TestJob::ExpectedError, attempts: 2)
end

it 'copies job info, including the cron key to the new record' do
new_record = described_class.order(created_at: :asc).last
expect(new_record.active_job_id).to eq good_job.active_job_id
expect(new_record.cron_key).to eq "test_key"
end

it 'records the new job UUID on the executing record' do
good_job.perform
expect(good_job.reload.retried_good_job_id).to be_present
end
end

context 'when there is an retry handler with exhausted attempts' do
before do
TestJob.retry_on(TestJob::ExpectedError, attempts: 1)
Expand Down
1 change: 1 addition & 0 deletions spec/support/reset_good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

RSpec.configure do |config|
config.before do
GoodJob::CurrentExecution.reset
GoodJob.preserve_job_records = false

PgLock.advisory_lock.owns.all?(&:unlock) if PgLock.advisory_lock.owns.count > 0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# frozen_string_literal: true
class AddRetriedGoodJobIdToGoodJobs < ActiveRecord::Migration[5.2]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :retried_good_job_id)
end
end

add_column :good_jobs, :retried_good_job_id, :uuid
end
end
3 changes: 2 additions & 1 deletion spec/test_app/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2021_06_23_192051) do
ActiveRecord::Schema.define(version: 2021_08_15_160735) do

# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
Expand All @@ -29,6 +29,7 @@
t.uuid "active_job_id"
t.text "concurrency_key"
t.text "cron_key"
t.uuid "retried_good_job_id"
t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at"
t.index ["concurrency_key"], name: "index_good_jobs_on_concurrency_key_when_unfinished", where: "(finished_at IS NULL)"
t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at"
Expand Down