From cdc9e2487dff8de80b0b43975f9d377351371e7e Mon Sep 17 00:00:00 2001
From: Ben Sheldon
Date: Sat, 3 Oct 2020 10:08:52 -0700
Subject: [PATCH] Create a Timer object that will wake up the Scheduler at specific times

---
 lib/good_job.rb                   |   1 +
 lib/good_job/adapter.rb           |   8 ++-
 lib/good_job/job.rb               |  12 ++++
 lib/good_job/performer.rb         |  15 +++-
 lib/good_job/scheduler.rb         |  30 +++++++-
 lib/good_job/timer.rb             | 114 ++++++++++++++++++++++++++++++
 spec/lib/good_job/adapter_spec.rb |   2 +-
 spec/lib/good_job/job_spec.rb     |   9 +++
 spec/lib/good_job/timer_spec.rb   |  65 +++++++++++++++++
 spec/support/reset_good_job.rb    |   1 +
 10 files changed, 250 insertions(+), 7 deletions(-)
 create mode 100644 lib/good_job/timer.rb
 create mode 100644 spec/lib/good_job/timer_spec.rb

diff --git a/lib/good_job.rb b/lib/good_job.rb
index 6a2ff15e3..d8c92aea3 100644
--- a/lib/good_job.rb
+++ b/lib/good_job.rb
@@ -82,6 +82,7 @@ def self.reperform_jobs_on_standard_error=(value)
   def self.shutdown(wait: true)
     Notifier.instances.each { |notifier| notifier.shutdown(wait: wait) }
     Scheduler.instances.each { |scheduler| scheduler.shutdown(wait: wait) }
+    Timer.instances.each { |timer| timer.shutdown(wait: wait) }
   end
 
   # Tests whether jobs have stopped executing.
diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb
index 1d7e96000..df315a372 100644
--- a/lib/good_job/adapter.rb
+++ b/lib/good_job/adapter.rb
@@ -78,8 +78,12 @@ def enqueue_at(active_job, timestamp)
         end
       end
 
-      executed_locally = execute_async? && @scheduler.create_thread(queue_name: good_job.queue_name)
-      Notifier.notify(queue_name: good_job.queue_name) unless executed_locally
+      job_state = {
+        queue_name: good_job.queue_name,
+        scheduled_at: good_job.scheduled_at&.to_i,
+      }
+      executed_locally = execute_async? && @scheduler.create_thread(job_state)
+      Notifier.notify(job_state) unless executed_locally
 
       good_job
     end
diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb
index 7a1264571..59148d54e 100644
--- a/lib/good_job/job.rb
+++ b/lib/good_job/job.rb
@@ -66,6 +66,12 @@ def self.queue_parser(string)
     # @return [ActiveRecord::Relation]
     scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(Time.current)).or(where(scheduled_at: nil)) }
 
+    # Order jobs by scheduled (unscheduled or soonest first).
+    # @!method schedule_ordered
+    # @!scope class
+    # @return [ActiveRecord::Relation]
+    scope :schedule_ordered, -> { order('scheduled_at ASC NULLS FIRST, created_at ASC') }
+
     # Order jobs by priority (highest priority first).
     # @!method priority_ordered
     # @!scope class
@@ -147,6 +153,12 @@ def self.perform_with_advisory_lock
       [good_job, result, error] if good_job
     end
 
+    # Fetches the scheduled execution time of the next eligible Job(s).
+    # @return [Array<(DateTime)>]
+    def self.next_at(count = 1)
+      advisory_unlocked.unfinished.schedule_ordered.limit(count).pluck(:created_at, :scheduled_at).map { |timestamps| timestamps.compact.max }
+    end
+
     # Places an ActiveJob job on a queue by creating a new {Job} record.
     # @param active_job [ActiveJob::Base]
     #   The job to enqueue.
diff --git a/lib/good_job/performer.rb b/lib/good_job/performer.rb
index a24d87026..e0df0a728 100644
--- a/lib/good_job/performer.rb
+++ b/lib/good_job/performer.rb
@@ -28,11 +28,15 @@ class Performer
     #   Used to determine whether the performer should be used in GoodJob's
     #   current state. GoodJob state is a +Hash+ that will be passed as the
     #   first argument to +filter+ and includes info like the current queue.
-    def initialize(target, method_name, name: nil, filter: nil)
+    # @param next_at_method [Symbol]
+    #   The name of a method on +target+ that returns timestamps of when next
+    #   tasks may be available.
+    def initialize(target, method_name, name: nil, filter: nil, next_at_method: nil)
       @target = target
       @method_name = method_name
       @name = name
       @filter = filter
+      @next_at_method_name = next_at_method
     end
 
     # Find and perform any eligible jobs.
@@ -56,5 +60,14 @@ def next?(state = {})
 
       @filter.call(state)
     end
+
+    # Returns timestamps of when next tasks may be available.
+    # @param count [Integer] number of timestamps to return.
+    # @return [Array<(Time, Timestamp)>, nil]
+    def next_at(count = 1)
+      return unless @next_at_method_name
+
+      @target.public_send(@next_at_method_name, count)
+    end
   end
 end
diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb
index 1e2907057..0e07e307b 100644
--- a/lib/good_job/scheduler.rb
+++ b/lib/good_job/scheduler.rb
@@ -51,7 +51,13 @@ def self.from_configuration(configuration)
             true
           end
         end
-        job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)
+        job_performer = GoodJob::Performer.new(
+          job_query,
+          :perform_with_advisory_lock,
+          name: queue_string,
+          filter: job_filter,
+          next_at_method: :next_at
+        )
 
         GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
       end
@@ -71,6 +77,7 @@ def initialize(performer, max_threads: nil)
       self.class.instances << self
 
       @performer = performer
+      @timer_wake = GoodJob::Timer.new([self, :create_thread])
 
       @pool_options = DEFAULT_POOL_OPTIONS.dup
       @pool_options[:max_threads] = max_threads if max_threads.present?
@@ -122,7 +129,19 @@ def restart(wait: true)
     #   Returns +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
     def create_thread(state = nil)
       return nil unless @pool.running? && @pool.ready_worker_count.positive?
-      return false if state && !@performer.next?(state)
+
+      if state
+        return false unless @performer.next?(state)
+
+        if state[:scheduled_at]
+          scheduled_at = Time.zone.at(state[:scheduled_at])
+
+          if scheduled_at > Time.current
+            @timer_wake.push(scheduled_at)
+            return true
+          end
+        end
+      end
 
       future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
         output = nil
@@ -141,7 +160,12 @@ def create_thread(state = nil)
     def task_observer(time, output, thread_error)
       GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
       instrument("finished_job_task", { result: output, error: thread_error, time: time })
-      create_thread if output
+      if output
+        create_thread
+      elsif @performer.respond_to?(:next_at)
+        next_at = @performer.next_at(1)&.first
+        @timer_wake.push(next_at) if next_at
+      end
     end
 
     private
diff --git a/lib/good_job/timer.rb b/lib/good_job/timer.rb
new file mode 100644
index 000000000..df2559bc0
--- /dev/null
+++ b/lib/good_job/timer.rb
@@ -0,0 +1,114 @@
+module GoodJob
+  #
+  # Timers will wake at the provided times to check for new work.
+  #
+  # Timers manage a discrete set of wake up times, sorted by soonest.
+  # New times can be pushed onto a Timer, and they will be added if they are
+  # sooner than existing tracked times, or discarded if they are later than
+  # existing tracked times and the Timer's queue of tracked times is full.
+  #
+  # @todo Allow Timer to track an unbounded number of wake times.
+  #
+  # Timers are intended to be used with a {GoodJob::Scheduler} to provide
+  # reduced execution scheduling latency compared to a {GoodJob::Poller}.
+  #
+  class Timer
+    # Default number of wake times to track
+    DEFAULT_MAX_QUEUE = 5
+
+    # Defaults for instance of +Concurrent::ThreadPoolExecutor+.
+    EXECUTOR_OPTIONS = {
+      name: 'timer',
+      min_threads: 0,
+      max_threads: 1,
+      auto_terminate: true,
+      idletime: 60,
+      max_queue: 0,
+      fallback_policy: :discard, # shouldn't matter -- 0 max queue
+    }.freeze
+
+    # @!attribute [r] instances
+    #   @!scope class
+    #   List of all instantiated Timers in the current process.
+    #   @return [array]
+    cattr_reader :instances, default: [], instance_reader: false
+
+    # @!attribute [r] queue
+    #   List of scheduled wakeups.
+    #   @return [Array<GoodJob::Timer::ScheduledTask>]
+    attr_reader :queue
+
+    # @!attribute [r] max_queue
+    #   Number of wake times to track.
+    #   @return [Integer]
+    attr_reader :max_queue
+
+    # List of recipients that will receive wakeups.
+    # @return [Array<#call, Array(Object, Symbol)>]
+    attr_reader :recipients
+
+    # @param recipients [Array<#call, Array(Object, Symbol)>]
+    # @param max_queue [nil, Integer] Maximum number of times to track
+    def initialize(*recipients, max_queue: nil)
+      @recipients = Concurrent::Array.new(recipients)
+      @max_queue = max_queue || DEFAULT_MAX_QUEUE
+      @queue = Concurrent::Array.new
+      @mutex = Mutex.new
+
+      self.class.instances << self
+
+      create_executor
+    end
+
+    # Add a wake time to be tracked.
+    # The timestamp is discarded if the queue is full and it is later than every tracked wake time.
+    # @param timestamp [Time, DateTime] the wake time
+    def push(timestamp)
+      @mutex.synchronize do
+        queue.select!(&:pending?)
+        return if queue.size == max_queue && timestamp > queue.last.scheduled_at
+
+        task = ScheduledTask.new(timestamp, args: [@recipients], executor: @executor) do |recipients|
+          recipients.each do |recipient|
+            target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
+            target.send(method_name)
+          end
+        end
+        task.execute
+
+        queue.unshift(task)
+        queue.sort_by!(&:scheduled_at)
+
+        removed_items = queue.slice!(max_queue..-1)
+        removed_items&.each(&:cancel)
+
+        task
+      end
+    end
+
+    # Shut down the timer.
+    def shutdown(wait: true)
+      return unless @executor&.running?
+
+      @executor.shutdown
+      @executor.wait_for_termination if wait
+    end
+
+    private
+
+    def create_executor
+      @executor = Concurrent::ThreadPoolExecutor.new(EXECUTOR_OPTIONS)
+    end
+
+    class ScheduledTask < Concurrent::ScheduledTask
+      attr_reader :scheduled_at
+
+      def initialize(timestamp, **args, &block)
+        @scheduled_at = timestamp
+
+        delay = [(timestamp - Time.current).to_f, 0].max
+        super(delay, **args, &block)
+      end
+    end
+  end
+end
diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb
index 0d281f5ff..3d6dfee27 100644
--- a/spec/lib/good_job/adapter_spec.rb
+++ b/spec/lib/good_job/adapter_spec.rb
@@ -4,7 +4,7 @@
 RSpec.describe GoodJob::Adapter do
   let(:adapter) { described_class.new }
   let(:active_job) { instance_double(ApplicationJob) }
-  let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default') }
+  let(:good_job) { instance_double(GoodJob::Job, queue_name: 'default', scheduled_at: nil) }
 
   describe '#initialize' do
     it 'guards against improper execution modes' do
diff --git a/spec/lib/good_job/job_spec.rb b/spec/lib/good_job/job_spec.rb
index 4b3cd1ddb..4a86b4201 100644
--- a/spec/lib/good_job/job_spec.rb
+++ b/spec/lib/good_job/job_spec.rb
@@ -84,6 +84,15 @@ def perform(result_value = nil, raise_error: false)
     end
   end
 
+  describe '.next_at' do
+    let(:active_job) { ExampleJob.new }
+    let!(:good_job) { described_class.enqueue(active_job) }
+
+    it 'returns an array of timestamps' do
+      expect(described_class.next_at).to eq [good_job.created_at]
+    end
+  end
+
   describe '.queue_parser' do
     it 'creates an intermediary hash' do
       result = described_class.queue_parser('first,second')
diff --git a/spec/lib/good_job/timer_spec.rb b/spec/lib/good_job/timer_spec.rb
new file mode 100644
index 000000000..9d1146df9
--- /dev/null
+++ b/spec/lib/good_job/timer_spec.rb
@@ -0,0 +1,65 @@
+require 'rails_helper'
+
+RSpec.describe GoodJob::Timer do
+  describe '#initialize' do
+    it 'succeeds' do
+      described_class.new
+    end
+  end
+
+  describe '#push' do
+    let(:timer) { described_class.new(max_queue: 2) }
+
+    it 'adds a future to the queue' do
+      run_at = 1.minute.from_now
+      timer.push(run_at)
+
+      task = timer.queue.first
+      expect(task).to be_a Concurrent::ScheduledTask
+      expect(task.scheduled_at).to eq(run_at)
+    end
+
+    it 'maintains the appropriate queue size' do
+      one_minute = 1.minute.from_now
+      two_minutes = 2.minutes.from_now
+
+      timer.push(one_minute)
+      timer.push(two_minutes)
+
+      (3..5).to_a.each { |i| timer.push(i.minutes.from_now) }
+
+      expect(timer.queue.map(&:scheduled_at)).to eq [one_minute, two_minutes]
+    end
+  end
+
+  describe '#recipients' do
+    let(:recipient) { -> { RUNS << Time.current } }
+    let(:timer) { described_class.new(recipient, max_queue: 2) }
+
+    before do
+      stub_const "RUNS", Concurrent::Array.new
+    end
+
+    it 'triggers the recipient at the appropriate time' do
+      scheduled_at = 0.1.seconds.from_now
+      timer.push(scheduled_at)
+      sleep_until(max: 5) { RUNS.any? }
+
+      expect(RUNS.size).to eq(1)
+      expect(RUNS.first).to be_within(0.01.seconds).of(scheduled_at)
+    end
+
+    it 'only triggers scheduled items' do
+      one_tenth = 0.1.seconds.from_now
+      two_tenths = 0.2.seconds.from_now
+
+      timer.push(one_tenth)
+      timer.push(two_tenths)
+      (3..5).to_a.each { |i| timer.push((i * 0.1).minutes.from_now) }
+
+      sleep(1)
+
+      expect(RUNS.size).to eq 2
+    end
+  end
+end
diff --git a/spec/support/reset_good_job.rb b/spec/support/reset_good_job.rb
index aa65ef446..b7c39f9aa 100644
--- a/spec/support/reset_good_job.rb
+++ b/spec/support/reset_good_job.rb
@@ -7,5 +7,6 @@
     GoodJob.shutdown
     GoodJob::Notifier.instances.clear
     GoodJob::Scheduler.instances.clear
+    GoodJob::Timer.instances.clear
   end
 end