From d934264042f25a08b3cd71a56ba3791bb09ba2b9 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Tue, 16 Jul 2024 09:57:21 -0700 Subject: [PATCH] Replace references to "Discrete" executions with simply Executions; deprecate `GoodJob::DiscreteExecution` (#1427) --- .../good_job/performance_index_chart.rb | 2 +- app/charts/good_job/performance_show_chart.rb | 2 +- .../good_job/performance_controller.rb | 2 +- app/models/good_job/base_execution.rb | 605 ------------------ app/models/good_job/discrete_execution.rb | 65 +- app/models/good_job/execution.rb | 63 +- app/models/good_job/job.rb | 574 ++++++++++++++++- lib/good_job.rb | 10 +- .../active_job_extensions/concurrency.rb | 6 +- spec/app/jobs/example_job_spec.rb | 18 +- spec/app/models/good_job/job_spec.rb | 44 +- spec/integration/adapter_spec.rb | 2 +- spec/integration/batch_spec.rb | 6 +- spec/integration/capsule_spec.rb | 2 +- spec/integration/jobs_spec.rb | 14 +- spec/integration/scheduler_spec.rb | 2 +- .../active_job_extensions/concurrency_spec.rb | 6 +- .../interrupt_errors_spec.rb | 62 +- .../active_job_extensions/labels_spec.rb | 2 +- .../notify_options_spec.rb | 2 +- spec/lib/good_job_spec.rb | 8 +- .../requests/good_job/jobs_controller_spec.rb | 4 +- spec/support/example_app_helper.rb | 3 +- 23 files changed, 727 insertions(+), 777 deletions(-) delete mode 100644 app/models/good_job/base_execution.rb diff --git a/app/charts/good_job/performance_index_chart.rb b/app/charts/good_job/performance_index_chart.rb index a35a2ba47..df50ab9c7 100644 --- a/app/charts/good_job/performance_index_chart.rb +++ b/app/charts/good_job/performance_index_chart.rb @@ -3,7 +3,7 @@ module GoodJob class PerformanceIndexChart < BaseChart def data - table_name = GoodJob::DiscreteExecution.table_name + table_name = GoodJob::Execution.table_name sum_query = Arel.sql(GoodJob::Job.pg_or_jdbc_query(<<~SQL.squish)) SELECT * diff --git a/app/charts/good_job/performance_show_chart.rb b/app/charts/good_job/performance_show_chart.rb index 703270376..08c8f82e8 100644 --- a/app/charts/good_job/performance_show_chart.rb +++ b/app/charts/good_job/performance_show_chart.rb @@ -18,7 +18,7 @@ def initialize(job_class) end def data - table_name = GoodJob::DiscreteExecution.table_name + table_name = GoodJob::Execution.table_name interval_entries = BUCKET_INTERVALS.map { "interval '#{_1}'" }.join(",") sum_query = Arel.sql(GoodJob::Job.pg_or_jdbc_query(<<~SQL.squish)) diff --git a/app/controllers/good_job/performance_controller.rb b/app/controllers/good_job/performance_controller.rb index 2151e21a1..dd03b4990 100644 --- a/app/controllers/good_job/performance_controller.rb +++ b/app/controllers/good_job/performance_controller.rb @@ -3,7 +3,7 @@ module GoodJob class PerformanceController < ApplicationController def index - @performances = GoodJob::DiscreteExecution + @performances = GoodJob::Execution .where.not(job_class: nil) .group(:job_class) .select(" diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb deleted file mode 100644 index f387d56db..000000000 --- a/app/models/good_job/base_execution.rb +++ /dev/null @@ -1,605 +0,0 @@ -# frozen_string_literal: true - -module GoodJob - # Active Record model to share behavior between {Job} and {Execution} models - # which both read out of the same table. - class BaseExecution < BaseRecord - self.abstract_class = true - - include AdvisoryLockable - include ErrorEvents - include Filterable - include Reportable - - # Raised if something attempts to execute a previously completed Execution again. - PreviouslyPerformedError = Class.new(StandardError) - - # String separating Error Class from Error Message - ERROR_MESSAGE_SEPARATOR = ": " - - # ActiveJob jobs without a +queue_name+ attribute are placed on this queue. - DEFAULT_QUEUE_NAME = 'default' - # ActiveJob jobs without a +priority+ attribute are given this priority. - DEFAULT_PRIORITY = 0 - - self.advisory_lockable_column = 'active_job_id' - self.implicit_order_column = 'created_at' - - self.ignored_columns += ["is_discrete"] - - define_model_callbacks :perform - define_model_callbacks :perform_unlocked, only: :after - - set_callback :perform, :around, :reset_batch_values - set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch - - # Parse a string representing a group of queues into a more readable data - # structure. - # @param string [String] Queue string - # @return [Hash] - # How to match a given queue. It can have the following keys and values: - # - +{ all: true }+ indicates that all queues match. - # - +{ exclude: Array }+ indicates the listed queue names should - # not match. - # - +{ include: Array }+ indicates the listed queue names should - # match. - # - +{ include: Array, ordered_queues: true }+ indicates the listed - # queue names should match, and dequeue should respect queue order. - # @example - # GoodJob::Execution.queue_parser('-queue1,queue2') - # => { exclude: [ 'queue1', 'queue2' ] } - def self.queue_parser(string) - string = string.strip.presence || '*' - - case string.first - when '-' - exclude_queues = true - string = string[1..] - when '+' - ordered_queues = true - string = string[1..] - end - - queues = string.split(',').map(&:strip) - - if queues.include?('*') - { all: true } - elsif exclude_queues - { exclude: queues } - elsif ordered_queues - { - include: queues, - ordered_queues: true, - } - else - { include: queues } - end - end - - # With a given class name - # @!method job_class(name) - # @!scope class - # @param name [String] Job class name - # @return [ActiveRecord::Relation] - scope :job_class, ->(name) { where(params_job_class.eq(name)) } - - # Get jobs with given ActiveJob ID - # @!method active_job_id(active_job_id) - # @!scope class - # @param active_job_id [String] - # ActiveJob ID - # @return [ActiveRecord::Relation] - scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) } - - # Get jobs that have not yet finished (succeeded or discarded). - # @!method unfinished - # @!scope class - # @return [ActiveRecord::Relation] - scope :unfinished, -> { where(finished_at: nil) } - - # Get jobs that are not scheduled for a later time than now (i.e. jobs that - # are not scheduled or scheduled for earlier than the current time). - # @!method only_scheduled - # @!scope class - # @return [ActiveRecord::Relation] - scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) } - - # Order jobs by priority (highest priority first). - # @!method priority_ordered - # @!scope class - # @return [ActiveRecord::Relation] - scope :priority_ordered, (lambda do - if GoodJob.configuration.smaller_number_is_higher_priority - order('priority ASC NULLS LAST') - else - order('priority DESC NULLS LAST') - end - end) - - # Order jobs by created_at, for first-in first-out - # @!method creation_ordered - # @!scope class - # @return [ActiveRecord:Relation] - scope :creation_ordered, -> { order(created_at: :asc) } - - # Order jobs for de-queueing - # @!method dequeueing_ordered(parsed_queues) - # @!scope class - # @param parsed_queues [Hash] - # optional output of .queue_parser, parsed queues, will be used for - # ordered queues. - # @return [ActiveRecord::Relation] - scope :dequeueing_ordered, (lambda do |parsed_queues| - relation = self - relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include] - relation = relation.priority_ordered.creation_ordered - - relation - end) - - # Order jobs in order of queues in array param - # @!method queue_ordered(queues) - # @!scope class - # @param queues [Array { order(coalesce_scheduled_at_created_at.asc) } - - # Get completed jobs before the given timestamp. If no timestamp is - # provided, get *all* completed jobs. By default, GoodJob - # destroys jobs after they're completed, meaning this returns no jobs. - # However, if you have changed {GoodJob.preserve_job_records}, this may - # find completed Jobs. - # @!method finished(timestamp = nil) - # @!scope class - # @param timestamp (Float) - # Get jobs that finished before this time (in epoch time). - # @return [ActiveRecord::Relation] - scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) : where.not(finished_at: nil) } - - # Get Jobs that started but not finished yet. - # @!method running - # @!scope class - # @return [ActiveRecord::Relation] - scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) } - - # Get Jobs on queues that match the given queue string. - # @!method queue_string(string) - # @!scope class - # @param string [String] - # A string expression describing what queues to select. See - # {Job.queue_parser} or - # {file:README.md#optimize-queues-threads-and-processes} for more details - # on the format of the string. Note this only handles individual - # semicolon-separated segments of that string format. - # @return [ActiveRecord::Relation] - scope :queue_string, (lambda do |string| - parsed = queue_parser(string) - - if parsed[:all] - all - elsif parsed[:exclude] - where.not(queue_name: parsed[:exclude]).or where(queue_name: nil) - elsif parsed[:include] - where(queue_name: parsed[:include]) - end - end) - - class << self - def json_string(json, attr) - Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr))) - end - - def params_job_class - json_string(arel_table['serialized_params'], 'job_class') - end - - def params_execution_count - Arel::Nodes::InfixOperation.new( - '::', - json_string(arel_table['serialized_params'], 'executions'), - Arel.sql('integer') - ) - end - - def coalesce_scheduled_at_created_at - arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at']) - end - end - - # Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`. - # - # @param ignore_deserialization_errors [Boolean] - # Whether to ignore ActiveJob::DeserializationError and NameError when deserializing the arguments. - # This is most useful if you aren't planning to use the arguments directly. - def active_job(ignore_deserialization_errors: false) - ActiveJob::Base.deserialize(active_job_data).tap do |aj| - aj.send(:deserialize_arguments_if_needed) - rescue ActiveJob::DeserializationError - raise unless ignore_deserialization_errors - end - rescue NameError - raise unless ignore_deserialization_errors - end - - def self.build_for_enqueue(active_job, scheduled_at: nil) - new(**enqueue_args(active_job, scheduled_at: scheduled_at)) - end - - # Construct arguments for GoodJob::Execution from an ActiveJob instance. - def self.enqueue_args(active_job, scheduled_at: nil) - execution_args = { - id: active_job.job_id, - active_job_id: active_job.job_id, - job_class: active_job.class.name, - queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, - priority: active_job.priority || DEFAULT_PRIORITY, - serialized_params: active_job.serialize, - created_at: Time.current, - } - - execution_args[:scheduled_at] = if scheduled_at - scheduled_at - elsif active_job.scheduled_at - Time.zone.at(active_job.scheduled_at) - else - execution_args[:created_at] - end - - execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) - - if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? - labels = active_job.good_job_labels.dup - labels.map! { |label| label.to_s.strip.presence } - labels.tap(&:compact!).tap(&:uniq!) - execution_args[:labels] = labels - end - - reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id - current_job = CurrentThread.job - - if reenqueued_current_job - execution_args[:batch_id] = current_job.batch_id - execution_args[:batch_callback_id] = current_job.batch_callback_id - execution_args[:cron_key] = current_job.cron_key - else - execution_args[:batch_id] = GoodJob::Batch.current_batch_id - execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id - execution_args[:cron_key] = CurrentThread.cron_key - execution_args[:cron_at] = CurrentThread.cron_at - end - - execution_args - end - - # Finds the next eligible Execution, acquire an advisory lock related to it, and - # executes the job. - # @yield [Execution, nil] The next eligible Execution, or +nil+ if none found, before it is performed. - # @return [ExecutionResult, nil] - # If a job was executed, returns an array with the {Execution} record, the - # return value for the job's +#perform+ method, and the exception the job - # raised, if any (if the job raised, then the second array entry will be - # +nil+). If there were no jobs to execute, returns +nil+. - def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) - job = nil - result = nil - - unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs| - job = jobs.first - - if job&.executable? - yield(job) if block_given? - - result = job.perform(lock_id: lock_id) - else - job = nil - yield(nil) if block_given? - end - end - - job&.run_callbacks(:perform_unlocked) - result - end - - # Fetches the scheduled execution time of the next eligible Execution(s). - # @param after [DateTime] - # @param limit [Integer] - # @param now_limit [Integer, nil] - # @return [Array] - def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) - query = advisory_unlocked.unfinished.schedule_ordered - - after ||= Time.current - after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime) - after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind)) - after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } - - if now_limit&.positive? - now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil) - now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } - end - - Array(now_at) + after_at - end - - # Places an ActiveJob job on a queue by creating a new {Execution} record. - # @param active_job [ActiveJob::Base] - # The job to enqueue. - # @param scheduled_at [Float] - # Epoch timestamp when the job should be executed, if blank will delegate to the ActiveJob instance - # @param create_with_advisory_lock [Boolean] - # Whether to establish a lock on the {Execution} record after it is created. - # @return [Execution] - # The new {Execution} instance representing the queued ActiveJob job. - def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) - ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| - current_job = CurrentThread.job - - retried = current_job && current_job.active_job_id == active_job.job_id - if retried - job = current_job - job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at)) - job.scheduled_at ||= Time.current - # TODO: these values ideally shouldn't be persisted until the current_job is finished - # which will require handling `retry_job` being called from outside the job context. - job.performed_at = nil - job.finished_at = nil - else - job = build_for_enqueue(active_job, scheduled_at: scheduled_at) - end - - if create_with_advisory_lock - if job.persisted? - job.advisory_lock - else - job.create_with_advisory_lock = true - end - end - - instrument_payload[:job] = job - job.save! - - CurrentThread.retried_job = job if retried - - active_job.provider_job_id = job.id - raise "These should be equal" if active_job.provider_job_id != active_job.job_id - - job - end - end - - def self.format_error(error) - raise ArgumentError unless error.is_a?(Exception) - - [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join - end - - # Execute the ActiveJob job this {Execution} represents. - # @return [ExecutionResult] - # An array of the return value of the job's +#perform+ method and the - # exception raised by the job, if any. If the job completed successfully, - # the second array entry (the exception) will be +nil+ and vice versa. - def perform(lock_id:) - run_callbacks(:perform) do - raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - - job_performed_at = Time.current - monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - discrete_execution = nil - result = GoodJob::CurrentThread.within do |current_thread| - current_thread.reset - current_thread.job = self - - existing_performed_at = performed_at - if existing_performed_at - current_thread.execution_interrupted = existing_performed_at - - interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'")) - self.error = interrupt_error_string - self.error_event = :interrupted - monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds - - discrete_execution_attrs = { - error: interrupt_error_string, - finished_at: job_performed_at, - error_event: :interrupted, - duration: monotonic_duration, - } - discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all(discrete_execution_attrs) # rubocop:disable Rails/SkipsModelValidations - end - - transaction do - discrete_execution_attrs = { - job_class: job_class, - queue_name: queue_name, - serialized_params: serialized_params, - scheduled_at: (scheduled_at || created_at), - created_at: job_performed_at, - process_id: lock_id, - } - job_attrs = { - performed_at: job_performed_at, - executions_count: ((executions_count || 0) + 1), - locked_by_id: lock_id, - locked_at: Time.current, - } - - discrete_execution = discrete_executions.create!(discrete_execution_attrs) - update!(job_attrs) - end - - ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: discrete_execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| - value = ActiveJob::Base.execute(active_job_data) - - if value.is_a?(Exception) - handled_error = value - value = nil - end - handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard - - error_event = if handled_error == current_thread.error_on_discard - :discarded - elsif handled_error == current_thread.error_on_retry - :retried - elsif handled_error == current_thread.error_on_retry_stopped - :retry_stopped - elsif handled_error - :handled - end - - instrument_payload.merge!( - value: value, - handled_error: handled_error, - retried: current_thread.retried_job.present?, - error_event: error_event - ) - ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job) - rescue StandardError => e - error_event = if e.is_a?(GoodJob::InterruptError) - :interrupted - elsif e == current_thread.error_on_retry_stopped - :retry_stopped - else - :unhandled - end - - instrument_payload[:unhandled_error] = e - ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) - end - end - - job_attributes = { locked_by_id: nil, locked_at: nil } - - job_error = result.handled_error || result.unhandled_error - if job_error - error_string = self.class.format_error(job_error) - - job_attributes[:error] = error_string - job_attributes[:error_event] = result.error_event - - discrete_execution.error = error_string - discrete_execution.error_event = result.error_event - discrete_execution.error_backtrace = job_error.backtrace - else - job_attributes[:error] = nil - job_attributes[:error_event] = nil - end - - job_finished_at = Time.current - monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds - job_attributes[:finished_at] = job_finished_at - - discrete_execution.finished_at = job_finished_at - discrete_execution.duration = monotonic_duration - - retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error - reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error - if reenqueued - job_attributes[:performed_at] = nil - job_attributes[:finished_at] = nil - end - - assign_attributes(job_attributes) - preserve_unhandled = (result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)) - if finished_at.blank? || GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present? - transaction do - discrete_execution.save! - save! - end - else - destroy! - end - - result - end - end - - # Tests whether this job is safe to be executed by this thread. - # @return [Boolean] - def executable? - reload.finished_at.blank? - rescue ActiveRecord::RecordNotFound - false - end - - # Return formatted serialized_params for display in the dashboard - # @return [Hash] - def display_serialized_params - serialized_params.merge({ - _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'), - }) - end - - def running? - if has_attribute?(:locktype) - self['locktype'].present? - else - advisory_locked? - end - end - - def number - serialized_params.fetch('executions', 0) + 1 - end - - # Time between when this job was expected to run and when it started running - def queue_latency - now = Time.zone.now - expected_start = scheduled_at || created_at - actual_start = performed_at || finished_at || now - - actual_start - expected_start unless expected_start >= now - end - - # Time between when this job started and finished - def runtime_latency - (finished_at || Time.zone.now) - performed_at if performed_at - end - - # Destroys this execution and all executions within the same job - def destroy_job - @_destroy_job = true - destroy! - ensure - @_destroy_job = false - end - - def job_state - state = { queue_name: queue_name } - state[:scheduled_at] = scheduled_at if scheduled_at - state - end - - private - - def reset_batch_values(&block) - GoodJob::Batch.within_thread(batch_id: nil, batch_callback_id: nil, &block) - end - - def continue_discard_or_finish_batch - batch._continue_discard_or_finish(self) if batch.present? - end - - def active_job_data - serialized_params.deep_dup - .tap do |job_data| - job_data["provider_job_id"] = id - job_data["good_job_concurrency_key"] = concurrency_key if concurrency_key - job_data["good_job_labels"] = Array(labels) if labels.present? - end - end - end -end diff --git a/app/models/good_job/discrete_execution.rb b/app/models/good_job/discrete_execution.rb index 4d951c424..242d99f23 100644 --- a/app/models/good_job/discrete_execution.rb +++ b/app/models/good_job/discrete_execution.rb @@ -1,63 +1,10 @@ # frozen_string_literal: true -module GoodJob # :nodoc: - class DiscreteExecution < BaseRecord - include ErrorEvents - - self.table_name = 'good_job_executions' - self.implicit_order_column = 'created_at' - - belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true - - scope :finished, -> { where.not(finished_at: nil) } - - alias_attribute :performed_at, :created_at - - # TODO: Remove when support for Rails 6.1 is dropped - attribute :duration, :interval if ActiveJob.version.canonical_segments.take(2) == [6, 1] - - def number - serialized_params.fetch('executions', 0) + 1 - end - - # Time between when this job was expected to run and when it started running - def queue_latency - created_at - scheduled_at - end - - # Monotonic time between when this job started and finished - def runtime_latency - duration - end - - def last_status_at - finished_at || created_at - end - - def status - if finished_at.present? - if error.present? && job.finished_at.present? - :discarded - elsif error.present? - :retried - else - :succeeded - end - else - :running - end - end - - def display_serialized_params - serialized_params.merge({ - _good_job_execution: attributes.except('serialized_params'), - }) - end - - def filtered_error_backtrace - Rails.backtrace_cleaner.clean(error_backtrace || []) - end +module GoodJob + # Deprecated, use +Execution+ instead. + class DiscreteExecution < Execution end -end -ActiveSupport.run_load_hooks(:good_job_execution, GoodJob::DiscreteExecution) + include ActiveSupport::Deprecation::DeprecatedConstantAccessor + deprecate_constant :DiscreteExecution, 'Execution', deprecator: GoodJob.deprecator +end diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index aa2142c57..8774c3ed6 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -1,8 +1,63 @@ # frozen_string_literal: true -module GoodJob - # Created at the time a Job begins executing. - # Behavior from +DiscreteExecution+ will be merged into this class. - class Execution < DiscreteExecution +module GoodJob # :nodoc: + class Execution < BaseRecord + include ErrorEvents + + self.table_name = 'good_job_executions' + self.implicit_order_column = 'created_at' + + belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'id', inverse_of: :executions, optional: true + + scope :finished, -> { where.not(finished_at: nil) } + + alias_attribute :performed_at, :created_at + + # TODO: Remove when support for Rails 6.1 is dropped + attribute :duration, :interval if ActiveJob.version.canonical_segments.take(2) == [6, 1] + + def number + serialized_params.fetch('executions', 0) + 1 + end + + # Time between when this job was expected to run and when it started running + def queue_latency + created_at - scheduled_at + end + + # Monotonic time between when this job started and finished + def runtime_latency + duration + end + + def last_status_at + finished_at || created_at + end + + def status + if finished_at.present? + if error.present? && job.finished_at.present? + :discarded + elsif error.present? + :retried + else + :succeeded + end + else + :running + end + end + + def display_serialized_params + serialized_params.merge({ + _good_job_execution: attributes.except('serialized_params'), + }) + end + + def filtered_error_backtrace + Rails.backtrace_cleaner.clean(error_backtrace || []) + end end end + +ActiveSupport.run_load_hooks(:good_job_execution, GoodJob::Execution) diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index ba006d683..cc21233ff 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -2,7 +2,23 @@ module GoodJob # Active Record model that represents an +ActiveJob+ job. - class Job < BaseExecution + class Job < BaseRecord + include AdvisoryLockable + include ErrorEvents + include Filterable + include Reportable + + # Raised if something attempts to execute a previously completed Execution again. + PreviouslyPerformedError = Class.new(StandardError) + + # String separating Error Class from Error Message + ERROR_MESSAGE_SEPARATOR = ": " + + # ActiveJob jobs without a +queue_name+ attribute are placed on this queue. + DEFAULT_QUEUE_NAME = 'default' + # ActiveJob jobs without a +priority+ attribute are given this priority. + DEFAULT_PRIORITY = 0 + # Raised when an inappropriate action is applied to a Job based on its state. ActionForStateMismatchError = Class.new(StandardError) # Raised when GoodJob is not configured as the Active Job Queue Adapter @@ -15,12 +31,17 @@ class Job < BaseExecution self.table_name = 'good_jobs' self.advisory_lockable_column = 'id' self.implicit_order_column = 'created_at' + self.ignored_columns += ["is_discrete"] + + define_model_callbacks :perform + define_model_callbacks :perform_unlocked, only: :after + + set_callback :perform, :around, :reset_batch_values + set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true - has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job, dependent: :delete_all - # TODO: rename callers of discrete_execution to executions, but after v4 has some time to bake for cleaner diffs/patches - has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job, dependent: :delete_all + has_many :executions, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: "id", inverse_of: :job, dependent: :delete_all before_create -> { self.id = active_job_id }, if: -> { active_job_id.present? } @@ -46,6 +67,345 @@ class Job < BaseExecution # Errored but will not be retried scope :discarded, -> { finished.where.not(error: nil) } + # With a given class name + # @!method job_class(name) + # @!scope class + # @param name [String] Job class name + # @return [ActiveRecord::Relation] + scope :job_class, ->(name) { where(params_job_class.eq(name)) } + + # Get jobs with given ActiveJob ID + # @!method active_job_id(active_job_id) + # @!scope class + # @param active_job_id [String] + # ActiveJob ID + # @return [ActiveRecord::Relation] + scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) } + + # Get jobs that have not yet finished (succeeded or discarded). + # @!method unfinished + # @!scope class + # @return [ActiveRecord::Relation] + scope :unfinished, -> { where(finished_at: nil) } + + # Get jobs that are not scheduled for a later time than now (i.e. jobs that + # are not scheduled or scheduled for earlier than the current time). + # @!method only_scheduled + # @!scope class + # @return [ActiveRecord::Relation] + scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) } + + # Order jobs by priority (highest priority first). + # @!method priority_ordered + # @!scope class + # @return [ActiveRecord::Relation] + scope :priority_ordered, (lambda do + if GoodJob.configuration.smaller_number_is_higher_priority + order('priority ASC NULLS LAST') + else + order('priority DESC NULLS LAST') + end + end) + + # Order jobs by created_at, for first-in first-out + # @!method creation_ordered + # @!scope class + # @return [ActiveRecord:Relation] + scope :creation_ordered, -> { order(created_at: :asc) } + + # Order jobs for de-queueing + # @!method dequeueing_ordered(parsed_queues) + # @!scope class + # @param parsed_queues [Hash] + # optional output of .queue_parser, parsed queues, will be used for + # ordered queues. + # @return [ActiveRecord::Relation] + scope :dequeueing_ordered, (lambda do |parsed_queues| + relation = self + relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include] + relation = relation.priority_ordered.creation_ordered + + relation + end) + + # Order jobs in order of queues in array param + # @!method queue_ordered(queues) + # @!scope class + # @param queues [Array { order(coalesce_scheduled_at_created_at.asc) } + + # Get completed jobs before the given timestamp. If no timestamp is + # provided, get *all* completed jobs. By default, GoodJob + # destroys jobs after they're completed, meaning this returns no jobs. + # However, if you have changed {GoodJob.preserve_job_records}, this may + # find completed Jobs. + # @!method finished(timestamp = nil) + # @!scope class + # @param timestamp (Float) + # Get jobs that finished before this time (in epoch time). + # @return [ActiveRecord::Relation] + scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) : where.not(finished_at: nil) } + + # Get Jobs that started but not finished yet. + # @!method running + # @!scope class + # @return [ActiveRecord::Relation] + scope :running, -> { where.not(performed_at: nil).where(finished_at: nil) } + + # Get Jobs on queues that match the given queue string. + # @!method queue_string(string) + # @!scope class + # @param string [String] + # A string expression describing what queues to select. See + # {Job.queue_parser} or + # {file:README.md#optimize-queues-threads-and-processes} for more details + # on the format of the string. Note this only handles individual + # semicolon-separated segments of that string format. + # @return [ActiveRecord::Relation] + scope :queue_string, (lambda do |string| + parsed = queue_parser(string) + + if parsed[:all] + all + elsif parsed[:exclude] + where.not(queue_name: parsed[:exclude]).or where(queue_name: nil) + elsif parsed[:include] + where(queue_name: parsed[:include]) + end + end) + + class << self + # Parse a string representing a group of queues into a more readable data + # structure. + # @param string [String] Queue string + # @return [Hash] + # How to match a given queue. It can have the following keys and values: + # - +{ all: true }+ indicates that all queues match. + # - +{ exclude: Array }+ indicates the listed queue names should + # not match. + # - +{ include: Array }+ indicates the listed queue names should + # match. + # - +{ include: Array, ordered_queues: true }+ indicates the listed + # queue names should match, and dequeue should respect queue order. + # @example + # GoodJob::Execution.queue_parser('-queue1,queue2') + # => { exclude: [ 'queue1', 'queue2' ] } + def queue_parser(string) + string = string.strip.presence || '*' + + case string.first + when '-' + exclude_queues = true + string = string[1..] + when '+' + ordered_queues = true + string = string[1..] + end + + queues = string.split(',').map(&:strip) + + if queues.include?('*') + { all: true } + elsif exclude_queues + { exclude: queues } + elsif ordered_queues + { + include: queues, + ordered_queues: true, + } + else + { include: queues } + end + end + + def json_string(json, attr) + Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr))) + end + + def params_job_class + json_string(arel_table['serialized_params'], 'job_class') + end + + def params_execution_count + Arel::Nodes::InfixOperation.new( + '::', + json_string(arel_table['serialized_params'], 'executions'), + Arel.sql('integer') + ) + end + + def coalesce_scheduled_at_created_at + arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at']) + end + end + + def self.build_for_enqueue(active_job, scheduled_at: nil) + new(**enqueue_args(active_job, scheduled_at: scheduled_at)) + end + + # Construct arguments for GoodJob::Execution from an ActiveJob instance. + def self.enqueue_args(active_job, scheduled_at: nil) + execution_args = { + id: active_job.job_id, + active_job_id: active_job.job_id, + job_class: active_job.class.name, + queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, + priority: active_job.priority || DEFAULT_PRIORITY, + serialized_params: active_job.serialize, + created_at: Time.current, + } + + execution_args[:scheduled_at] = if scheduled_at + scheduled_at + elsif active_job.scheduled_at + Time.zone.at(active_job.scheduled_at) + else + execution_args[:created_at] + end + + execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) + + if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? + labels = active_job.good_job_labels.dup + labels.map! { |label| label.to_s.strip.presence } + labels.tap(&:compact!).tap(&:uniq!) + execution_args[:labels] = labels + end + + reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id + current_job = CurrentThread.job + + if reenqueued_current_job + execution_args[:batch_id] = current_job.batch_id + execution_args[:batch_callback_id] = current_job.batch_callback_id + execution_args[:cron_key] = current_job.cron_key + else + execution_args[:batch_id] = GoodJob::Batch.current_batch_id + execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id + execution_args[:cron_key] = CurrentThread.cron_key + execution_args[:cron_at] = CurrentThread.cron_at + end + + execution_args + end + + # Finds the next eligible Execution, acquire an advisory lock related to it, and + # executes the job. + # @yield [Execution, nil] The next eligible Execution, or +nil+ if none found, before it is performed. + # @return [ExecutionResult, nil] + # If a job was executed, returns an array with the {Execution} record, the + # return value for the job's +#perform+ method, and the exception the job + # raised, if any (if the job raised, then the second array entry will be + # +nil+). If there were no jobs to execute, returns +nil+. + def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) + job = nil + result = nil + + unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs| + job = jobs.first + + if job&.executable? + yield(job) if block_given? + + result = job.perform(lock_id: lock_id) + else + job = nil + yield(nil) if block_given? + end + end + + job&.run_callbacks(:perform_unlocked) + result + end + + # Fetches the scheduled execution time of the next eligible Execution(s). + # @param after [DateTime] + # @param limit [Integer] + # @param now_limit [Integer, nil] + # @return [Array] + def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) + query = advisory_unlocked.unfinished.schedule_ordered + + after ||= Time.current + after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime) + after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind)) + after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } + + if now_limit&.positive? + now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil) + now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first } + end + + Array(now_at) + after_at + end + + # Places an ActiveJob job on a queue by creating a new {Execution} record. + # @param active_job [ActiveJob::Base] + # The job to enqueue. + # @param scheduled_at [Float] + # Epoch timestamp when the job should be executed, if blank will delegate to the ActiveJob instance + # @param create_with_advisory_lock [Boolean] + # Whether to establish a lock on the {Execution} record after it is created. + # @return [Execution] + # The new {Execution} instance representing the queued ActiveJob job. + def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) + ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| + current_job = CurrentThread.job + + retried = current_job && current_job.active_job_id == active_job.job_id + if retried + job = current_job + job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at)) + job.scheduled_at ||= Time.current + # TODO: these values ideally shouldn't be persisted until the current_job is finished + # which will require handling `retry_job` being called from outside the job context. + job.performed_at = nil + job.finished_at = nil + else + job = build_for_enqueue(active_job, scheduled_at: scheduled_at) + end + + if create_with_advisory_lock + if job.persisted? + job.advisory_lock + else + job.create_with_advisory_lock = true + end + end + + instrument_payload[:job] = job + job.save! + + CurrentThread.retried_job = job if retried + + active_job.provider_job_id = job.id + raise "These should be equal" if active_job.provider_job_id != active_job.job_id + + job + end + end + + def self.format_error(error) + raise ArgumentError unless error.is_a?(Exception) + + [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join + end + # TODO: it would be nice to enforce these values at the model # validates :active_job_id, presence: true # validates :scheduled_at, presence: true @@ -193,10 +553,191 @@ def destroy_job end end - # Utility method to determine which execution record is used to represent this job - # @return [String] - def _execution_id - attributes['id'] + # Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`. + # + # @param ignore_deserialization_errors [Boolean] + # Whether to ignore ActiveJob::DeserializationError and NameError when deserializing the arguments. + # This is most useful if you aren't planning to use the arguments directly. + def active_job(ignore_deserialization_errors: false) + ActiveJob::Base.deserialize(active_job_data).tap do |aj| + aj.send(:deserialize_arguments_if_needed) + rescue ActiveJob::DeserializationError + raise unless ignore_deserialization_errors + end + rescue NameError + raise unless ignore_deserialization_errors + end + + # Execute the ActiveJob job this {Execution} represents. + # @return [ExecutionResult] + # An array of the return value of the job's +#perform+ method and the + # exception raised by the job, if any. If the job completed successfully, + # the second array entry (the exception) will be +nil+ and vice versa. + def perform(lock_id:) + run_callbacks(:perform) do + raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + + job_performed_at = Time.current + monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + execution = nil + result = GoodJob::CurrentThread.within do |current_thread| + current_thread.reset + current_thread.job = self + + existing_performed_at = performed_at + if existing_performed_at + current_thread.execution_interrupted = existing_performed_at + + interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'")) + self.error = interrupt_error_string + self.error_event = :interrupted + monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds + + execution_attrs = { + error: interrupt_error_string, + finished_at: job_performed_at, + error_event: :interrupted, + duration: monotonic_duration, + } + executions.where(finished_at: nil).where.not(performed_at: nil).update_all(execution_attrs) # rubocop:disable Rails/SkipsModelValidations + end + + transaction do + execution_attrs = { + job_class: job_class, + queue_name: queue_name, + serialized_params: serialized_params, + scheduled_at: (scheduled_at || created_at), + created_at: job_performed_at, + process_id: lock_id, + } + job_attrs = { + performed_at: job_performed_at, + executions_count: ((executions_count || 0) + 1), + locked_by_id: lock_id, + locked_at: Time.current, + } + + execution = executions.create!(execution_attrs) + update!(job_attrs) + end + + ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| + value = ActiveJob::Base.execute(active_job_data) + + if value.is_a?(Exception) + handled_error = value + value = nil + end + handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard + + error_event = if handled_error == current_thread.error_on_discard + :discarded + elsif handled_error == current_thread.error_on_retry + :retried + elsif handled_error == current_thread.error_on_retry_stopped + :retry_stopped + elsif handled_error + :handled + end + + instrument_payload.merge!( + value: value, + handled_error: handled_error, + retried: current_thread.retried_job.present?, + error_event: error_event + ) + ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job) + rescue StandardError => e + error_event = if e.is_a?(GoodJob::InterruptError) + :interrupted + elsif e == current_thread.error_on_retry_stopped + :retry_stopped + else + :unhandled + end + + instrument_payload[:unhandled_error] = e + ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) + end + end + + job_attributes = { locked_by_id: nil, locked_at: nil } + + job_error = result.handled_error || result.unhandled_error + if job_error + error_string = self.class.format_error(job_error) + + job_attributes[:error] = error_string + job_attributes[:error_event] = result.error_event + + execution.error = error_string + execution.error_event = result.error_event + execution.error_backtrace = job_error.backtrace + else + job_attributes[:error] = nil + job_attributes[:error_event] = nil + end + + job_finished_at = Time.current + monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds + job_attributes[:finished_at] = job_finished_at + + execution.finished_at = job_finished_at + execution.duration = monotonic_duration + + retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error + reenqueued = result.retried? || retried_good_job_id.present? || retry_unhandled_error + if reenqueued + job_attributes[:performed_at] = nil + job_attributes[:finished_at] = nil + end + + assign_attributes(job_attributes) + preserve_unhandled = (result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)) + if finished_at.blank? || GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present? + transaction do + execution.save! + save! + end + else + destroy! + end + + result + end + end + + # Tests whether this job is safe to be executed by this thread. + # @return [Boolean] + def executable? + reload.finished_at.blank? + rescue ActiveRecord::RecordNotFound + false + end + + def number + serialized_params.fetch('executions', 0) + 1 + end + + # Time between when this job was expected to run and when it started running + def queue_latency + now = Time.zone.now + expected_start = scheduled_at || created_at + actual_start = performed_at || finished_at || now + + actual_start - expected_start unless expected_start >= now + end + + # Time between when this job started and finished + def runtime_latency + (finished_at || Time.zone.now) - performed_at if performed_at + end + + def job_state + state = { queue_name: queue_name } + state[:scheduled_at] = scheduled_at if scheduled_at + state end private @@ -222,6 +763,23 @@ def _discard_job(message) update_record.call end end + + def reset_batch_values(&block) + GoodJob::Batch.within_thread(batch_id: nil, batch_callback_id: nil, &block) + end + + def continue_discard_or_finish_batch + batch._continue_discard_or_finish(self) if batch.present? + end + + def active_job_data + serialized_params.deep_dup + .tap do |job_data| + job_data["provider_job_id"] = id + job_data["good_job_concurrency_key"] = concurrency_key if concurrency_key + job_data["good_job_labels"] = Array(labels) if labels.present? + end + end end end diff --git a/lib/good_job.rb b/lib/good_job.rb index 01cc76780..7558cd062 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -211,7 +211,7 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000) ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { older_than: older_than, timestamp: timestamp }) do |payload| deleted_jobs_count = 0 deleted_batches_count = 0 - deleted_discrete_executions_count = 0 + deleted_executions_count = 0 jobs_query = GoodJob::Job.finished_before(timestamp).order(finished_at: :asc).limit(in_batches_of) jobs_query = jobs_query.succeeded unless include_discarded @@ -219,8 +219,8 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000) active_job_ids = jobs_query.pluck(:active_job_id) break if active_job_ids.empty? - deleted_discrete_executions = GoodJob::DiscreteExecution.where(active_job_id: active_job_ids).delete_all - deleted_discrete_executions_count += deleted_discrete_executions + deleted_executions = GoodJob::Execution.where(active_job_id: active_job_ids).delete_all + deleted_executions_count += deleted_executions deleted_jobs = GoodJob::Job.where(active_job_id: active_job_ids).delete_all deleted_jobs_count += deleted_jobs @@ -236,10 +236,10 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000) end payload[:destroyed_batches_count] = deleted_batches_count - payload[:destroyed_discrete_executions_count] = deleted_discrete_executions_count + payload[:destroyed_executions_count] = deleted_executions_count payload[:destroyed_jobs_count] = deleted_jobs_count - destroyed_records_count = deleted_batches_count + deleted_discrete_executions_count + deleted_jobs_count + destroyed_records_count = deleted_batches_count + deleted_executions_count + deleted_jobs_count payload[:destroyed_records_count] = destroyed_records_count destroyed_records_count diff --git a/lib/good_job/active_job_extensions/concurrency.rb b/lib/good_job/active_job_extensions/concurrency.rb index 45625d052..147622216 100644 --- a/lib/good_job/active_job_extensions/concurrency.rb +++ b/lib/good_job/active_job_extensions/concurrency.rb @@ -94,9 +94,9 @@ def deserialize(job_data) throttle_limit = throttle[0] throttle_period = throttle[1] - query = DiscreteExecution.joins(:job) - .where(GoodJob::Job.table_name => { concurrency_key: key }) - .where(DiscreteExecution.arel_table[:created_at].gt(DiscreteExecution.bind_value('created_at', throttle_period.ago, ActiveRecord::Type::DateTime))) + query = Execution.joins(:job) + .where(GoodJob::Job.table_name => { concurrency_key: key }) + .where(Execution.arel_table[:created_at].gt(Execution.bind_value('created_at', throttle_period.ago, ActiveRecord::Type::DateTime))) allowed_active_job_ids = query.where(error: nil).or(query.where.not(error: "GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError: GoodJob::ActiveJobExtensions::Concurrency::ThrottleExceededError")) .order(created_at: :asc) .limit(throttle_limit) diff --git a/spec/app/jobs/example_job_spec.rb b/spec/app/jobs/example_job_spec.rb index ed4a43293..b9745d14f 100644 --- a/spec/app/jobs/example_job_spec.rb +++ b/spec/app/jobs/example_job_spec.rb @@ -27,8 +27,8 @@ Timecop.return good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) - expect(good_job.discrete_executions.count).to eq 2 - expect(good_job.discrete_executions.last.error).to be_nil + expect(good_job.executions.count).to eq 2 + expect(good_job.executions.last.error).to be_nil end end @@ -43,8 +43,8 @@ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) - expect(good_job.discrete_executions.count).to eq 6 - expect(good_job.discrete_executions.last.error).to be_nil + expect(good_job.executions.count).to eq 6 + expect(good_job.executions.last.error).to be_nil end end @@ -58,11 +58,11 @@ Timecop.return good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) - expect(good_job.discrete_executions.count).to eq 3 - discrete_execution = good_job.discrete_executions.last - expect(discrete_execution.error).to be_present - expect(discrete_execution.error_backtrace.count).to be > 100 - expect(discrete_execution.filtered_error_backtrace).to eq(["app/jobs/example_job.rb:41:in `perform'"]) + expect(good_job.executions.count).to eq 3 + execution = good_job.executions.last + expect(execution.error).to be_present + expect(execution.error_backtrace.count).to be > 100 + expect(execution.filtered_error_backtrace).to eq(["app/jobs/example_job.rb:41:in `perform'"]) end end diff --git a/spec/app/models/good_job/job_spec.rb b/spec/app/models/good_job/job_spec.rb index 911ec7c66..4220466d1 100644 --- a/spec/app/models/good_job/job_spec.rb +++ b/spec/app/models/good_job/job_spec.rb @@ -23,7 +23,7 @@ 'arguments' => ['cat', { 'canine' => 'dog' }], } ).tap do |job| - job.discrete_executions.create!( + job.executions.create!( scheduled_at: 1.minute.ago, created_at: 1.minute.ago, finished_at: 1.minute.ago, @@ -166,7 +166,7 @@ def perform(*) expect(job).to be_finished - executions = job.discrete_executions.order(created_at: :asc).to_a + executions = job.executions.order(created_at: :asc).to_a expect(executions.size).to eq 3 # initial execution isn't created in test expect(executions.map(&:error)).to eq ["TestJob::Error: TestJob::Error", "TestJob::Error: TestJob::Error", nil] expect(executions[0].finished_at).to be < executions[1].finished_at @@ -341,7 +341,7 @@ def perform(*) job.destroy_job expect { job.reload }.to raise_error ActiveRecord::RecordNotFound - expect(GoodJob::DiscreteExecution.count).to eq 0 + expect(GoodJob::Execution.count).to eq 0 end context 'when a job is not finished' do @@ -390,18 +390,16 @@ def perform(result_value = nil, raise_error: false) describe '.enqueue' do let(:active_job) { TestJob.new } - context 'when discrete' do - it 'assigns is discrete, id, scheduled_at' do - expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) + it 'assigns id, scheduled_at' do + expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) - job = described_class.last - expect(job).to have_attributes( - id: active_job.job_id, - active_job_id: active_job.job_id, - created_at: within(1.second).of(Time.current), - scheduled_at: job.created_at - ) - end + job = described_class.last + expect(job).to have_attributes( + id: active_job.job_id, + active_job_id: active_job.job_id, + created_at: within(1.second).of(Time.current), + scheduled_at: job.created_at + ) end it 'creates a new GoodJob record' do @@ -951,7 +949,7 @@ def job_params ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) end - it 'updates the Execution record and creates a DiscreteExecution record' do + it 'updates the Job record and creates a Execution record' do good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( @@ -959,9 +957,9 @@ def job_params finished_at: within(1.second).of(Time.current) ) - dexecution = good_job.discrete_executions.first - expect(dexecution).to be_present - expect(dexecution).to have_attributes( + execution = good_job.executions.first + expect(execution).to be_present + expect(execution).to have_attributes( active_job_id: good_job.active_job_id, job_class: good_job.job_class, queue_name: good_job.queue_name, @@ -997,9 +995,9 @@ def job_params finished_at: nil, scheduled_at: within(10.minutes).of(1.hour.from_now) # interval because of retry jitter ) - expect(GoodJob::DiscreteExecution.count).to eq(1) - discrete_execution = good_job.discrete_executions.first - expect(discrete_execution).to have_attributes( + expect(GoodJob::Execution.count).to eq(1) + execution = good_job.executions.first + expect(execution).to have_attributes( active_job_id: good_job.active_job_id, error: "TestJob::ExpectedError: Raised expected error", created_at: within(1.second).of(Time.current), @@ -1026,8 +1024,8 @@ def job_params scheduled_at: within(0.5).of(1.second.from_now) ) - expect(good_job.discrete_executions.size).to eq(1) - expect(good_job.discrete_executions.first).to have_attributes( + expect(good_job.executions.size).to eq(1) + expect(good_job.executions.first).to have_attributes( performed_at: within(1.second).of(Time.current), finished_at: within(1.second).of(Time.current), duration: be_a(ActiveSupport::Duration) diff --git a/spec/integration/adapter_spec.rb b/spec/integration/adapter_spec.rb index f42d804ba..42065e4c9 100644 --- a/spec/integration/adapter_spec.rb +++ b/spec/integration/adapter_spec.rb @@ -167,7 +167,7 @@ def perform job = GoodJob::Job.first expect(job.status).to eq :succeeded - expect(job.discrete_executions.count).to eq 3 + expect(job.executions.count).to eq 3 end it 'retries immediately when bulk enqueued' do diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb index 73ae9b3cd..aa95d980c 100644 --- a/spec/integration/batch_spec.rb +++ b/spec/integration/batch_spec.rb @@ -216,7 +216,7 @@ def perform(*_args, **_kwargs) GoodJob.perform_inline expect(GoodJob::Job.count).to eq 3 - expect(GoodJob::DiscreteExecution.count).to eq 5 + expect(GoodJob::Execution.count).to eq 5 expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1 expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2 @@ -231,7 +231,7 @@ def perform(*_args, **_kwargs) GoodJob.perform_inline expect(GoodJob::Job.count).to eq 3 - expect(GoodJob::DiscreteExecution.count).to eq 5 + expect(GoodJob::Execution.count).to eq 5 expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1 expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2 @@ -315,7 +315,7 @@ def perform(_batch, _params) expect(job.performed_at).to be_present expect(job.finished_at).to be_present - job_executions = job.discrete_executions.order(:created_at).to_a + job_executions = job.executions.order(:created_at).to_a expect(job_executions.size).to eq 2 expect(job_executions.first.status).to eq :discarded expect(job_executions.last.status).to eq :succeeded diff --git a/spec/integration/capsule_spec.rb b/spec/integration/capsule_spec.rb index 934e3b088..6e232e07b 100644 --- a/spec/integration/capsule_spec.rb +++ b/spec/integration/capsule_spec.rb @@ -27,6 +27,6 @@ def perform perform_capsule.shutdown enqueue_capsule.shutdown - expect(GoodJob::DiscreteExecution.count).to eq(total_jobs) + expect(GoodJob::Execution.count).to eq(total_jobs) end end diff --git a/spec/integration/jobs_spec.rb b/spec/integration/jobs_spec.rb index 98b2addaa..80d67b5db 100644 --- a/spec/integration/jobs_spec.rb +++ b/spec/integration/jobs_spec.rb @@ -110,8 +110,8 @@ def perform error: "StandardError: error", error_event: "discarded" ) - expect(good_job.discrete_executions.size).to eq 1 - expect(good_job.discrete_executions.last).to have_attributes( + expect(good_job.executions.size).to eq 1 + expect(good_job.executions.last).to have_attributes( error: "StandardError: error", error_event: "discarded" ) @@ -142,8 +142,8 @@ def perform error: nil, error_event: nil ) - expect(good_job.discrete_executions.size).to eq 2 - expect(good_job.discrete_executions.order(created_at: :asc).to_a).to contain_exactly(have_attributes(error: "StandardError: error", error_event: "handled"), have_attributes(error: nil, error_event: nil)) + expect(good_job.executions.size).to eq 2 + expect(good_job.executions.order(created_at: :asc).to_a).to contain_exactly(have_attributes(error: "StandardError: error", error_event: "handled"), have_attributes(error: nil, error_event: nil)) end end @@ -171,8 +171,8 @@ def perform error: "TestError: error", error_event: "retry_stopped" ) - expect(good_job.discrete_executions.size).to eq 2 - expect(good_job.discrete_executions.order(created_at: :asc).to_a).to contain_exactly(have_attributes(error: "TestError: error", error_event: "retried"), have_attributes(error: "TestError: error", error_event: "retry_stopped")) + expect(good_job.executions.size).to eq 2 + expect(good_job.executions.order(created_at: :asc).to_a).to contain_exactly(have_attributes(error: "TestError: error", error_event: "retried"), have_attributes(error: "TestError: error", error_event: "retry_stopped")) expect(THREAD_ERRORS.size).to eq 1 THREAD_ERRORS.clear @@ -197,7 +197,7 @@ def perform expect(GoodJob::Job.count).to eq 1 job = GoodJob::Job.order(:created_at).last - executions = job.discrete_executions.order(:created_at).to_a + executions = job.executions.order(:created_at).to_a expect(job.status).to eq :succeeded expect(job.performed_at).to be_present diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index 4683891fc..218ccc03f 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -92,7 +92,7 @@ def perform(*args, **kwargs) "Expected run jobs(#{RUN_JOBS.size}) to equal number of jobs (#{number_of_jobs}). Instead ran jobs multiple times:\n#{rerun_jobs.join("\n")}" } - expect(GoodJob::DiscreteExecution.count).to eq number_of_jobs + expect(GoodJob::Execution.count).to eq number_of_jobs end end diff --git a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb index e42de273d..b6a316cd8 100644 --- a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb +++ b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb @@ -137,14 +137,14 @@ def perform(name:) 5.times { scheduler.create_thread } sleep_until(max: 10, increments_of: 0.5) do - GoodJob::DiscreteExecution.where(active_job_id: active_job.job_id).finished.count >= 1 + GoodJob::Execution.where(active_job_id: active_job.job_id).finished.count >= 1 end scheduler.shutdown expect(GoodJob::Job.find_by(active_job_id: active_job.job_id).concurrency_key).to eq "Alice" - expect(GoodJob::DiscreteExecution.count).to be >= 1 - expect(GoodJob::DiscreteExecution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present + expect(GoodJob::Execution.count).to be >= 1 + expect(GoodJob::Execution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present end it 'is ignored with the job is executed via perform_now' do diff --git a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb index bd7902a4a..1fc7d3685 100644 --- a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb +++ b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb @@ -19,7 +19,7 @@ def perform active_job = TestJob.perform_later good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) good_job.update!(performed_at: Time.current, finished_at: nil) - good_job.discrete_executions.create!(performed_at: Time.current, finished_at: nil) + good_job.executions.create!(performed_at: Time.current, finished_at: nil) end it 'raises a GoodJob::InterruptError' do @@ -30,41 +30,39 @@ def perform ) end - context 'when discrete execution is enabled' do - it 'does not create a new execution' do - TestJob.retry_on GoodJob::InterruptError + it 'does not create a new execution' do + TestJob.retry_on GoodJob::InterruptError - expect { GoodJob.perform_inline }.not_to raise_error - expect(GoodJob::Job.count).to eq(1) - expect(GoodJob::DiscreteExecution.count).to eq(2) + expect { GoodJob.perform_inline }.not_to raise_error + expect(GoodJob::Job.count).to eq(1) + expect(GoodJob::Execution.count).to eq(2) - job = GoodJob::Job.first - expect(job.discrete_executions.count).to eq(2) - expect(job).to have_attributes( - performed_at: be_blank, - finished_at: be_blank, - error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'), - error_event: "retried" - ) + job = GoodJob::Job.first + expect(job.executions.count).to eq(2) + expect(job).to have_attributes( + performed_at: be_blank, + finished_at: be_blank, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'), + error_event: "retried" + ) - initial_discrete_execution = job.discrete_executions.first - expect(initial_discrete_execution).to have_attributes( - performed_at: be_present, - finished_at: be_present, - duration: be_present, - error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'), - error_event: "interrupted" - ) + initial_execution = job.executions.first + expect(initial_execution).to have_attributes( + performed_at: be_present, + finished_at: be_present, + duration: be_present, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'), + error_event: "interrupted" + ) - retried_discrete_execution = job.discrete_executions.last - expect(retried_discrete_execution).to have_attributes( - performed_at: be_present, - finished_at: be_present, - duration: be_present, - error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'), - error_event: "retried" - ) - end + retried_execution = job.executions.last + expect(retried_execution).to have_attributes( + performed_at: be_present, + finished_at: be_present, + duration: be_present, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at'), + error_event: "retried" + ) end end diff --git a/spec/lib/good_job/active_job_extensions/labels_spec.rb b/spec/lib/good_job/active_job_extensions/labels_spec.rb index 889129097..181f09ec8 100644 --- a/spec/lib/good_job/active_job_extensions/labels_spec.rb +++ b/spec/lib/good_job/active_job_extensions/labels_spec.rb @@ -110,7 +110,7 @@ def perform TestJob.set(good_job_labels: ["buffalo"]).perform_later GoodJob.perform_inline - expect(GoodJob::DiscreteExecution.count).to eq 3 + expect(GoodJob::Execution.count).to eq 3 expect(GoodJob::Job.first).to have_attributes(labels: %w[buffalo gopher]) end end diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb index bebb5c528..1ef5fe7ed 100644 --- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb +++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb @@ -86,7 +86,7 @@ def perform scheduler = GoodJob::Scheduler.new(performer, max_threads: 5) scheduler.create_thread - sleep_until(max: 5, increments_of: 0.5) { GoodJob::DiscreteExecution.count >= 2 } + sleep_until(max: 5, increments_of: 0.5) { GoodJob::Execution.count >= 2 } scheduler.shutdown expect(GoodJob::Notifier).not_to have_received(:notify) diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index 0005de2ee..3b1f83f8b 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -56,7 +56,7 @@ let!(:recent_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 12.hours.ago) } let!(:old_unfinished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, scheduled_at: 15.days.ago, finished_at: nil) } let!(:old_finished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) } - let!(:old_finished_job_discrete_execution) { GoodJob::DiscreteExecution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } + let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } let!(:old_discarded_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") } let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) } @@ -68,7 +68,7 @@ expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound - expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end @@ -81,7 +81,7 @@ expect { recent_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound - expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end @@ -106,7 +106,7 @@ expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound - expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.not_to raise_error expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end diff --git a/spec/requests/good_job/jobs_controller_spec.rb b/spec/requests/good_job/jobs_controller_spec.rb index 97d9a4a33..452aa328d 100644 --- a/spec/requests/good_job/jobs_controller_spec.rb +++ b/spec/requests/good_job/jobs_controller_spec.rb @@ -93,7 +93,7 @@ describe 'mass_action=retry' do before do job.update(error: "Error message") - job.discrete_executions.first.update(error: "Error message") + job.executions.first.update(error: "Error message") end it 'retries the job' do @@ -108,7 +108,7 @@ expect(flash[:notice]).to eq('Successfully retried 1 job') job.reload - expect(job.discrete_executions.count).to eq 2 + expect(job.executions.count).to eq 2 end context 'when Job is not deserializable' do diff --git a/spec/support/example_app_helper.rb b/spec/support/example_app_helper.rb index f2f419025..b5f4c6771 100644 --- a/spec/support/example_app_helper.rb +++ b/spec/support/example_app_helper.rb @@ -55,9 +55,8 @@ def within_example_app ] models = [ GoodJob::Job, - GoodJob::BatchRecord, GoodJob::Execution, - GoodJob::DiscreteExecution, + GoodJob::BatchRecord, GoodJob::Process, GoodJob::Setting, ]