From 2058930213b6d987542dc19a1bbd65878066dcaa Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Sun, 15 Aug 2021 20:47:10 -0700 Subject: [PATCH] Track if a GoodJob::Job has been subsequently retried --- .../migrations/create_good_jobs.rb.erb | 1 + ...03_add_retried_good_job_id_to_good_jobs.rb | 14 +++ lib/good_job/current_execution.rb | 20 +++-- lib/good_job/job.rb | 85 +++++++++++-------- spec/lib/good_job/current_execution_spec.rb | 14 ++- spec/lib/good_job/job_spec.rb | 26 +++++- spec/support/reset_good_job.rb | 1 + ...35_add_retried_good_job_id_to_good_jobs.rb | 14 +++ spec/test_app/db/schema.rb | 3 +- 9 files changed, 126 insertions(+), 52 deletions(-) create mode 100644 lib/generators/good_job/templates/update/migrations/03_add_retried_good_job_id_to_good_jobs.rb create mode 100644 spec/test_app/db/migrate/20210815160735_add_retried_good_job_id_to_good_jobs.rb diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index 6b988b2ca..82b0fe1a7 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -16,6 +16,7 @@ class CreateGoodJobs < ActiveRecord::Migration[5.2] t.uuid :active_job_id t.text :concurrency_key t.text :cron_key + t.uuid :retried_good_job_id end add_index :good_jobs, :scheduled_at, where: "(finished_at IS NULL)", name: "index_good_jobs_on_scheduled_at" diff --git a/lib/generators/good_job/templates/update/migrations/03_add_retried_good_job_id_to_good_jobs.rb b/lib/generators/good_job/templates/update/migrations/03_add_retried_good_job_id_to_good_jobs.rb new file mode 100644 index 000000000..3d7f4cca4 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/03_add_retried_good_job_id_to_good_jobs.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true +class AddRetriedGoodJobIdToGoodJobs < ActiveRecord::Migration[5.2] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :retried_good_job_id) + end + end + + add_column :good_jobs, :retried_good_job_id, :uuid + end +end diff --git a/lib/good_job/current_execution.rb b/lib/good_job/current_execution.rb index 7a40d9c1d..3237d0ac5 100644 --- a/lib/good_job/current_execution.rb +++ b/lib/good_job/current_execution.rb @@ -5,12 +5,6 @@ module GoodJob # Thread-local attributes for passing values from Instrumentation. # (Cannot use ActiveSupport::CurrentAttributes because ActiveJob resets it) module CurrentExecution - # @!attribute [rw] active_job_id - # @!scope class - # ActiveJob ID - # @return [String, nil] - thread_mattr_accessor :active_job_id - # @!attribute [rw] cron_key # @!scope class # Cron Key @@ -29,14 +23,26 @@ module CurrentExecution # @return [Exception, nil] thread_mattr_accessor :error_on_retry + # @!attribute [rw] good_job + # @!scope class + # Cron Key + # @return [GoodJob::Job, nil] + thread_mattr_accessor :good_job + # Resets attributes # @return [void] def self.reset - self.active_job_id = nil + self.cron_key = nil + self.good_job = nil self.error_on_discard = nil self.error_on_retry = nil end + # @return [String] UUID of the currently executing GoodJob::Job + def self.active_job_id + good_job&.active_job_id + end + # @return [Integer] Current process ID def self.process_id Process.pid diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb index 12dd5983f..f2a2d101f 100644 --- a/lib/good_job/job.rb +++ b/lib/good_job/job.rb @@ -52,6 +52,20 @@ def self.queue_parser(string) end end + def self._migration_pending_warning + ActiveSupport::Deprecation.warn(<<~DEPRECATION) + GoodJob has pending database migrations. To create the migration files, run: + + rails generate good_job:update + + To apply the migration files, run: + + rails db:migrate + + DEPRECATION + nil + end + # Get Jobs with given class name # @!method with_job_class # @!scope class @@ -110,6 +124,18 @@ def self.queue_parser(string) # @return [ActiveRecord::Relation] scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) } + # Get Jobs that do not have subsequent retries + # @!method running + # @!scope class + # @return [ActiveRecord::Relation] + scope :head, -> { where(retried_good_job_id: nil) } + + # Get Jobs have errored that will not be retried further + # @!method running + # @!scope class + # @return [ActiveRecord::Relation] + scope :dead, -> { head.where.not(error: nil) } + # Get Jobs on queues that match the given queue string. # @!method queue_string(string) # @!scope class @@ -199,7 +225,6 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| good_job_args = { - cron_key: CurrentExecution.cron_key, queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, @@ -210,31 +235,23 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false if column_names.include?('active_job_id') good_job_args[:active_job_id] = active_job.job_id else - ActiveSupport::Deprecation.warn(<<~DEPRECATION) - GoodJob has pending database migrations. To create the migration files, run: - - rails generate good_job:update - - To apply the migration files, run: - - rails db:migrate - - DEPRECATION + _migration_pending_warning end if column_names.include?('concurrency_key') good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) else - ActiveSupport::Deprecation.warn(<<~DEPRECATION) - GoodJob has pending database migrations. To create the migration files, run: - - rails generate good_job:update - - To apply the migration files, run: - - rails db:migrate + _migration_pending_warning + end - DEPRECATION + if column_names.include?('cron_key') + if CurrentExecution.cron_key + good_job_args[:cron_key] = CurrentExecution.cron_key + elsif CurrentExecution.active_job_id == active_job.job_id + good_job_args[:cron_key] = CurrentExecution.good_job.cron_key + end + else + _migration_pending_warning end good_job = GoodJob::Job.new(**good_job_args) @@ -244,6 +261,12 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false good_job.save! active_job.provider_job_id = good_job.id + if column_names.include?('retried_good_job_id') + CurrentExecution.good_job.retried_good_job_id = good_job.id if CurrentExecution.good_job && CurrentExecution.good_job.active_job_id == active_job.job_id + else + _migration_pending_warning + end + good_job end end @@ -283,24 +306,19 @@ def executable? end def active_job_id - super || serialized_params['job_id'] + if self.class.column_names.include?('active_job_id') + super + else + self.class._migration_pending_warning + serialized_params['job_id'] + end end def cron_key if self.class.column_names.include?('cron_key') super else - ActiveSupport::Deprecation.warn(<<~DEPRECATION) - GoodJob has pending database migrations. To create the migration files, run: - - rails generate good_job:update - - To apply the migration files, run: - - rails db:migrate - - DEPRECATION - + self.class._migration_pending_warning nil end end @@ -314,8 +332,7 @@ def execute ) GoodJob::CurrentExecution.reset - GoodJob::CurrentExecution.active_job_id = active_job_id - GoodJob::CurrentExecution.cron_key = cron_key + GoodJob::CurrentExecution.good_job = self ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do value = ActiveJob::Base.execute(params) diff --git a/spec/lib/good_job/current_execution_spec.rb b/spec/lib/good_job/current_execution_spec.rb index 061d0401f..fa5f01746 100644 --- a/spec/lib/good_job/current_execution_spec.rb +++ b/spec/lib/good_job/current_execution_spec.rb @@ -55,17 +55,13 @@ end describe '.active_job_id' do - it 'is assignable, thread-safe, and resettable' do - described_class.active_job_id = 'duck' + let!(:good_job) { GoodJob::Job.create! active_job_id: SecureRandom.uuid } - Thread.new do - described_class.active_job_id = 'bear' - end.join + it 'delegates to good_job' do + expect(described_class.active_job_id).to be_nil - expect(described_class.active_job_id).to eq 'duck' - - described_class.reset - expect(described_class.active_job_id).to eq nil + described_class.good_job = good_job + expect(described_class.active_job_id).to eq good_job.active_job_id end end end diff --git a/spec/lib/good_job/job_spec.rb b/spec/lib/good_job/job_spec.rb index 810221bb9..74d8e1b27 100644 --- a/spec/lib/good_job/job_spec.rb +++ b/spec/lib/good_job/job_spec.rb @@ -217,7 +217,12 @@ def perform(result_value = nil, raise_error: false) context 'when there is an error' do let(:active_job) { TestJob.new("whoops", raise_error: true) } - let!(:good_job) { described_class.enqueue(active_job) } + let!(:good_job) do + GoodJob::CurrentExecution.cron_key = "test_key" + good_job = described_class.enqueue(active_job) + GoodJob::CurrentExecution.cron_key = nil + good_job + end it 'returns the error' do result = good_job.perform @@ -226,6 +231,25 @@ def perform(result_value = nil, raise_error: false) expect(result.unhandled_error).to be_an_instance_of TestJob::ExpectedError end + context 'when there is a retry handler' do + before do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) + allow(GoodJob).to receive(:preserve_job_records).and_return(true) + TestJob.retry_on(TestJob::ExpectedError, attempts: 2) + end + + it 'copies job info, including the cron key to the new record' do + new_record = described_class.order(created_at: :asc).last + expect(new_record.active_job_id).to eq good_job.active_job_id + expect(new_record.cron_key).to eq "test_key" + end + + it 'records the new job UUID on the executing record' do + good_job.perform + expect(good_job.reload.retried_good_job_id).to be_present + end + end + context 'when there is an retry handler with exhausted attempts' do before do TestJob.retry_on(TestJob::ExpectedError, attempts: 1) diff --git a/spec/support/reset_good_job.rb b/spec/support/reset_good_job.rb index 407352a2a..a107603c2 100644 --- a/spec/support/reset_good_job.rb +++ b/spec/support/reset_good_job.rb @@ -3,6 +3,7 @@ RSpec.configure do |config| config.before do + GoodJob::CurrentExecution.reset GoodJob.preserve_job_records = false PgLock.advisory_lock.owns.all?(&:unlock) if PgLock.advisory_lock.owns.count > 0 diff --git a/spec/test_app/db/migrate/20210815160735_add_retried_good_job_id_to_good_jobs.rb b/spec/test_app/db/migrate/20210815160735_add_retried_good_job_id_to_good_jobs.rb new file mode 100644 index 000000000..3d7f4cca4 --- /dev/null +++ b/spec/test_app/db/migrate/20210815160735_add_retried_good_job_id_to_good_jobs.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true +class AddRetriedGoodJobIdToGoodJobs < ActiveRecord::Migration[5.2] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :retried_good_job_id) + end + end + + add_column :good_jobs, :retried_good_job_id, :uuid + end +end diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index 8b394e6c8..840f3f947 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2021_06_23_192051) do +ActiveRecord::Schema.define(version: 2021_08_15_160735) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" @@ -29,6 +29,7 @@ t.uuid "active_job_id" t.text "concurrency_key" t.text "cron_key" + t.uuid "retried_good_job_id" t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["concurrency_key"], name: "index_good_jobs_on_concurrency_key_when_unfinished", where: "(finished_at IS NULL)" t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at"