From a90be5e00997b2486a8f369a871794ee92a2b838 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 29 Dec 2021 17:43:55 -0800 Subject: [PATCH] Track processes in the database and on the Dashboard --- .../good_job/processes_controller.rb | 8 ++ .../views/good_job/processes/index.html.erb | 40 ++++++++++ .../app/views/layouts/good_job/base.html.erb | 15 +--- engine/config/routes.rb | 15 ++-- .../migrations/create_good_jobs.rb.erb | 5 ++ .../04_create_good_job_processes.rb.erb | 19 +++++ lib/good_job/active_job_job.rb | 5 +- lib/good_job/assignable_connection.rb | 28 +++++++ lib/good_job/base_record.rb | 18 +++++ lib/good_job/current_thread.rb | 2 +- lib/good_job/daemon.rb | 6 +- lib/good_job/execution.rb | 17 +--- lib/good_job/notifier.rb | 78 +++++++++++++------ lib/good_job/notifier/process_registration.rb | 31 ++++++++ lib/good_job/process.rb | 65 ++++++++++++++++ lib/good_job/scheduler.rb | 5 +- spec/lib/good_job/notifier_spec.rb | 29 +++++++ spec/lib/good_job/process_spec.rb | 42 ++++++++++ ...0211230012344_create_good_job_processes.rb | 19 +++++ spec/test_app/db/schema.rb | 8 +- 20 files changed, 387 insertions(+), 68 deletions(-) create mode 100644 engine/app/controllers/good_job/processes_controller.rb create mode 100644 engine/app/views/good_job/processes/index.html.erb create mode 100644 lib/generators/good_job/templates/update/migrations/04_create_good_job_processes.rb.erb create mode 100644 lib/good_job/assignable_connection.rb create mode 100644 lib/good_job/base_record.rb create mode 100644 lib/good_job/notifier/process_registration.rb create mode 100644 lib/good_job/process.rb create mode 100644 spec/lib/good_job/process_spec.rb create mode 100644 spec/test_app/db/migrate/20211230012344_create_good_job_processes.rb diff --git a/engine/app/controllers/good_job/processes_controller.rb b/engine/app/controllers/good_job/processes_controller.rb new file mode 100644 index 000000000..48532849a --- /dev/null +++ b/engine/app/controllers/good_job/processes_controller.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true +module GoodJob + class ProcessesController < GoodJob::BaseController + def index + @processes = GoodJob::Process.active.order(created_at: :desc) if GoodJob::Process.migrated? + end + end +end diff --git a/engine/app/views/good_job/processes/index.html.erb b/engine/app/views/good_job/processes/index.html.erb new file mode 100644 index 000000000..87bc270f2 --- /dev/null +++ b/engine/app/views/good_job/processes/index.html.erb @@ -0,0 +1,40 @@ +<% if !GoodJob::Process.migrated? %> +
+
+

+ Feature unavailable because of pending database migration. +

+
+
+<% elsif @processes.present? %> +
+
+ + + + + + + + + + <% @processes.each do |process| %> + + + + + + <% end %> + +
Process UUIDCreated AtState
<%= process.id %><%= relative_time(process.created_at) %><%= tag.pre JSON.pretty_generate(process.state) %>
+
+
+<% else %> +
+
+

+ No GoodJob processes found. +

+
+
+<% end %> diff --git a/engine/app/views/layouts/good_job/base.html.erb b/engine/app/views/layouts/good_job/base.html.erb index 0188b0d1f..a3e4fddc5 100644 --- a/engine/app/views/layouts/good_job/base.html.erb +++ b/engine/app/views/layouts/good_job/base.html.erb @@ -33,23 +33,14 @@ + - -
Times are displayed in <%= Time.current.zone %> timezone
diff --git a/engine/config/routes.rb b/engine/config/routes.rb index 3aca16a04..3eac241bb 100644 --- a/engine/config/routes.rb +++ b/engine/config/routes.rb @@ -2,11 +2,7 @@ GoodJob::Engine.routes.draw do root to: 'executions#index' - resources :cron_entries, only: %i[index show] do - member do - post :enqueue - end - end + resources :executions, only: %i[destroy] resources :jobs, only: %i[index show] do member do @@ -15,7 +11,14 @@ put :retry end end - resources :executions, only: %i[destroy] + + resources :cron_entries, only: %i[index show] do + member do + post :enqueue + end + end + + resources :processes, only: %i[index] scope controller: :assets do constraints(format: :css) do diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index 2eb3c30c5..4b165c945 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -21,6 +21,11 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.timestamp :cron_at end + create_table :good_job_processes, id: :uuid do |t| + t.timestamps + t.jsonb :state + end + add_index :good_jobs, :scheduled_at, where: "(finished_at IS NULL)", name: "index_good_jobs_on_scheduled_at" add_index :good_jobs, [:queue_name, :scheduled_at], where: "(finished_at IS NULL)", name: :index_good_jobs_on_queue_name_and_scheduled_at add_index :good_jobs, [:active_job_id, :created_at], name: :index_good_jobs_on_active_job_id_and_created_at diff --git a/lib/generators/good_job/templates/update/migrations/04_create_good_job_processes.rb.erb b/lib/generators/good_job/templates/update/migrations/04_create_good_job_processes.rb.erb new file mode 100644 index 000000000..0e6fed9b1 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/04_create_good_job_processes.rb.erb @@ -0,0 +1,19 @@ +# frozen_string_literal: true +class CreateGoodJobProcesses < ActiveRecord::Migration<%= migration_version %> + def change + enable_extension 'pgcrypto' + + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.table_exists?(:good_job_processes) + end + end + + create_table :good_job_processes, id: :uuid do |t| + t.timestamps + t.jsonb :state + end + end +end diff --git a/lib/good_job/active_job_job.rb b/lib/good_job/active_job_job.rb index a365f5e25..7b12680e8 100644 --- a/lib/good_job/active_job_job.rb +++ b/lib/good_job/active_job_job.rb @@ -4,10 +4,7 @@ module GoodJob # There is not a table in the database whose discrete rows represents "Jobs". # The +good_jobs+ table is a table of individual {GoodJob::Execution}s that share the same +active_job_id+. # A single row from the +good_jobs+ table of executions is fetched to represent an ActiveJobJob - # Parent class can be configured with +GoodJob.active_record_parent_class+. - # @!parse - # class ActiveJob < ActiveRecord::Base; end - class ActiveJobJob < Object.const_get(GoodJob.active_record_parent_class) + class ActiveJobJob < BaseRecord include Filterable include Lockable diff --git a/lib/good_job/assignable_connection.rb b/lib/good_job/assignable_connection.rb new file mode 100644 index 000000000..03a70a7c5 --- /dev/null +++ b/lib/good_job/assignable_connection.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true +module GoodJob + module AssignableConnection + extend ActiveSupport::Concern + + included do + cattr_accessor :_connection + end + + class_methods do + def connection=(conn) + self._connection = conn + end + + def connection + _connection || super + end + + def assign_connection(conn) + original_conn = _connection + self.connection = conn + yield + ensure + self._connection = original_conn + end + end + end +end diff --git a/lib/good_job/base_record.rb b/lib/good_job/base_record.rb new file mode 100644 index 000000000..efa2e8b27 --- /dev/null +++ b/lib/good_job/base_record.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true +module GoodJob + # Base ActiveRecord class that all GoodJob models inherit from. + # Parent class can be configured with +GoodJob.active_record_parent_class+. + # @!parse + # class BaseRecord < ActiveRecord::Base; end + class BaseRecord < Object.const_get(GoodJob.active_record_parent_class) + def self.migration_pending_warning! + ActiveSupport::Deprecation.warn(<<~DEPRECATION) + GoodJob has pending database migrations. To create the migration files, run: + rails generate good_job:update + To apply the migration files, run: + rails db:migrate + DEPRECATION + nil + end + end +end diff --git a/lib/good_job/current_thread.rb b/lib/good_job/current_thread.rb index b12c141dc..226812db7 100644 --- a/lib/good_job/current_thread.rb +++ b/lib/good_job/current_thread.rb @@ -68,7 +68,7 @@ def self.active_job_id # @return [Integer] Current process ID def self.process_id - Process.pid + ::Process.pid end # @return [String] Current thread name diff --git a/lib/good_job/daemon.rb b/lib/good_job/daemon.rb index 89c4734a4..b99502206 100644 --- a/lib/good_job/daemon.rb +++ b/lib/good_job/daemon.rb @@ -17,7 +17,7 @@ def initialize(pidfile:) # @return [void] def daemonize check_pid - Process.daemon + ::Process.daemon write_pid end @@ -25,7 +25,7 @@ def daemonize # @return [void] def write_pid - File.open(pidfile, ::File::CREAT | ::File::EXCL | ::File::WRONLY) { |f| f.write(Process.pid.to_s) } + File.open(pidfile, ::File::CREAT | ::File::EXCL | ::File::WRONLY) { |f| f.write(::Process.pid.to_s) } at_exit { File.delete(pidfile) if File.exist?(pidfile) } rescue Errno::EEXIST check_pid @@ -55,7 +55,7 @@ def pid_status(pidfile) pid = ::File.read(pidfile).to_i return :dead if pid.zero? - Process.kill(0, pid) # check process status + ::Process.kill(0, pid) # check process status :running rescue Errno::ESRCH :dead diff --git a/lib/good_job/execution.rb b/lib/good_job/execution.rb index adfab5fd8..e5a437c3e 100644 --- a/lib/good_job/execution.rb +++ b/lib/good_job/execution.rb @@ -1,10 +1,7 @@ # frozen_string_literal: true module GoodJob # ActiveRecord model that represents an +ActiveJob+ job. - # Parent class can be configured with +GoodJob.active_record_parent_class+. - # @!parse - # class Execution < ActiveRecord::Base; end - class Execution < Object.const_get(GoodJob.active_record_parent_class) + class Execution < BaseRecord include Lockable include Filterable @@ -54,16 +51,6 @@ def self.queue_parser(string) end end - def self._migration_pending_warning - ActiveSupport::Deprecation.warn(<<~DEPRECATION) - GoodJob has pending database migrations. To create the migration files, run: - rails generate good_job:update - To apply the migration files, run: - rails db:migrate - DEPRECATION - nil - end - # Get Jobs with given ActiveJob ID # @!method active_job_id # @!scope class @@ -224,7 +211,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false if @cron_at_index execution_args[:cron_at] = CurrentThread.cron_at else - _migration_pending_warning + migration_pending_warning! end elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id execution_args[:cron_key] = CurrentThread.execution.cron_key diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index 9bac3dc08..54d4274ca 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent/atomic/atomic_boolean' module GoodJob # :nodoc: @@ -10,6 +11,11 @@ module GoodJob # :nodoc: # When a message is received, the notifier passes the message to each of its recipients. # class Notifier + include ActiveSupport::Callbacks + define_callbacks :listen, :unlisten + + include Notifier::ProcessRegistration + # Raised if the Database adapter does not implement LISTEN. AdapterCannotListenError = Class.new(StandardError) @@ -43,6 +49,12 @@ class Notifier # @return [Array, nil] cattr_reader :instances, default: [], instance_reader: false + # @!attribute [rw] connection + # @!scope class + # ActiveRecord Connection that has been established for the Notifier. + # @return [ActiveRecord::ConnectionAdapters::AbstractAdapter, nil] + thread_cattr_accessor :connection + # Send a message via Postgres NOTIFY # @param message [#to_json] def self.notify(message) @@ -146,30 +158,36 @@ def create_executor def listen(delay: 0) future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| - with_listen_connection do |conn| - ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - conn.async_exec("LISTEN #{CHANNEL}").clear - end + with_connection do + begin + run_callbacks :listen do + ActiveSupport::Notifications.instrument("notifier_listen.good_job") do + connection.execute("LISTEN #{CHANNEL}") + end + thr_listening.make_true + end - ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - thr_listening.make_true - while thr_executor.running? - conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| - next unless channel == CHANNEL - - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) + ActiveSupport::Dependencies.interlock.permit_concurrent_loads do + while thr_executor.running? + wait_for_notify do |channel, payload| + next unless channel == CHANNEL + + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) + end end end end end ensure - thr_listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - conn.async_exec("UNLISTEN *").clear + run_callbacks :unlisten do + thr_listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + connection.execute("UNLISTEN *") + end end end end @@ -178,17 +196,27 @@ def listen(delay: 0) future.execute end - def with_listen_connection - ar_conn = Execution.connection_pool.checkout.tap do |conn| + def with_connection + self.connection = Execution.connection_pool.checkout.tap do |conn| Execution.connection_pool.remove(conn) end - pg_conn = ar_conn.raw_connection - raise AdapterCannotListenError unless pg_conn.respond_to? :wait_for_notify + connection.execute("SET application_name = #{connection.quote(self.class.name)}") - pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear - yield pg_conn + yield ensure - ar_conn&.disconnect! + connection&.disconnect! + self.connection = nil + end + + def wait_for_notify + raw_connection = connection.raw_connection + if raw_connection.respond_to?(:wait_for_notify) + raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload| + yield(channel, payload) + end + else + sleep WAIT_INTERVAL + end end end end diff --git a/lib/good_job/notifier/process_registration.rb b/lib/good_job/notifier/process_registration.rb new file mode 100644 index 000000000..79618e468 --- /dev/null +++ b/lib/good_job/notifier/process_registration.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module GoodJob + class Notifier + module ProcessRegistration + extend ActiveSupport::Concern + + included do + set_callback :listen, :after, :register_process + set_callback :unlisten, :after, :deregister_process + end + + def register_process + GoodJob::Process.assign_connection(connection) do + next unless Process.migrated? + + GoodJob::Process.cleanup + @process = GoodJob::Process.register + end + end + + def deregister_process + GoodJob::Process.assign_connection(connection) do + next unless Process.migrated? + + @process&.deregister + end + end + end + end +end diff --git a/lib/good_job/process.rb b/lib/good_job/process.rb new file mode 100644 index 000000000..d206146be --- /dev/null +++ b/lib/good_job/process.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true +require 'socket' + +module GoodJob + # ActiveRecord model that represents an GoodJob process (either async or CLI). + class Process < BaseRecord + include AssignableConnection + include Lockable + + self.table_name = 'good_job_processes' + + cattr_reader :mutex, default: Mutex.new + cattr_accessor :_current_id, default: nil + cattr_accessor :_pid, default: nil + + scope :active, -> { advisory_locked } + scope :inactive, -> { advisory_unlocked } + + def self.migrated? + return true if connection.table_exists?(table_name) + + migration_pending_warning! + false + end + + def self.current_id + mutex.synchronize do + if _current_id.nil? || _pid != ::Process.pid + self._current_id = SecureRandom.uuid + self._pid = ::Process.pid + end + _current_id + end + end + + def self.current_state + { + id: current_id, + hostname: Socket.gethostname, + pid: ::Process.pid, + proctitle: $PROGRAM_NAME, + schedulers: GoodJob::Scheduler.instances.map(&:name), + } + end + + def self.cleanup + inactive.delete_all + end + + def self.register + instance = new(id: current_id, state: current_state) + return unless instance.advisory_lock + + instance.save! + instance + end + + def deregister + return unless owns_advisory_lock? + + destroy! + advisory_unlock + end + end +end diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 5d14b178e..f1a68d374 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -59,6 +59,8 @@ def self.from_configuration(configuration, warm_cache_on_initialize: false) end end + attr_reader :name + # @param performer [GoodJob::JobPerformer] # @param max_threads [Numeric, nil] number of seconds between polls for jobs # @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory @@ -76,7 +78,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia @executor_options[:max_threads] = max_threads @executor_options[:max_queue] = max_threads end - @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" + @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" + @executor_options[:name] = name create_executor warm_cache if warm_cache_on_initialize diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 397b3b68d..5ad5789ab 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -48,4 +48,33 @@ expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) end end + + describe 'Process tracking' do + it 'creates and destroys a new Process record' do + notifier = described_class.new + + wait_until { expect(GoodJob::Process.count).to eq 1 } + + process = GoodJob::Process.first + expect(process.id).to eq GoodJob::Process.current_id + expect(process).to be_advisory_locked + + notifier.shutdown + expect { process.reload }.to raise_error ActiveRecord::RecordNotFound + end + + context 'when, for some reason, the process already exists' do + it 'does not create a new process' do + process = GoodJob::Process.register + notifier = described_class.new + + wait_until { expect(notifier).to be_listening } + expect(GoodJob::Process.count).to eq 1 + + notifier.shutdown + expect(process.reload).to eq process + process.advisory_unlock + end + end + end end diff --git a/spec/lib/good_job/process_spec.rb b/spec/lib/good_job/process_spec.rb new file mode 100644 index 000000000..7519ef3ef --- /dev/null +++ b/spec/lib/good_job/process_spec.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true +require 'rails_helper' + +RSpec.describe GoodJob::Process do + describe '.id' do + it 'returns a uuid that does not change' do + value = described_class.current_id + expect(value).to be_present + + expect(described_class.current_id).to eq value + end + + it 'changes when the PID changes' do + allow(Process).to receive(:pid).and_return(1) + original_value = described_class.current_id + + allow(Process).to receive(:pid).and_return(2) + expect(described_class.current_id).not_to eq original_value + + # Unstub the pid or RSpec/DatabaseCleaner may fail + RSpec::Mocks.space.proxy_for(Process).reset + end + end + + describe '.register' do + it 'registers the process' do + process = nil + expect do + process = described_class.register + end.to change(described_class, :count).by(1) + + process.deregister + end + end + + describe '#deregister' do + it 'deregisters the record' do + process = described_class.register + expect { process.deregister }.to change(described_class, :count).by(-1) + end + end +end diff --git a/spec/test_app/db/migrate/20211230012344_create_good_job_processes.rb b/spec/test_app/db/migrate/20211230012344_create_good_job_processes.rb new file mode 100644 index 000000000..dea4f8674 --- /dev/null +++ b/spec/test_app/db/migrate/20211230012344_create_good_job_processes.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true +class CreateGoodJobProcesses < ActiveRecord::Migration[6.1] + def change + enable_extension 'pgcrypto' + + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.table_exists?(:good_job_processes) + end + end + + create_table :good_job_processes, id: :uuid do |t| + t.timestamps + t.jsonb :state + end + end +end diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index ec1340ac1..ed42a99f8 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -10,12 +10,18 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2021_10_11_221038) do +ActiveRecord::Schema.define(version: 2021_12_30_012344) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" + create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.datetime "created_at", precision: 6, null: false + t.datetime "updated_at", precision: 6, null: false + t.jsonb "state" + end + create_table "good_jobs", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.text "queue_name" t.integer "priority"