Skip to content

Commit

Permalink
Track processes in the database and on the Dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Dec 30, 2021
1 parent b3fe7f8 commit a90be5e
Show file tree
Hide file tree
Showing 20 changed files with 387 additions and 68 deletions.
8 changes: 8 additions & 0 deletions engine/app/controllers/good_job/processes_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# frozen_string_literal: true
module GoodJob
class ProcessesController < GoodJob::BaseController
def index
@processes = GoodJob::Process.active.order(created_at: :desc) if GoodJob::Process.migrated?
end
end
end
40 changes: 40 additions & 0 deletions engine/app/views/good_job/processes/index.html.erb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<% if !GoodJob::Process.migrated? %>
<div class="card my-3">
<div class="card-body">
<p class="card-text">
<em>Feature unavailable because of pending database migration.</em>
</p>
</div>
</div>
<% elsif @processes.present? %>
<div class="card my-3">
<div class="table-responsive">
<table class="table card-table table-bordered table-hover table-sm mb-0">
<thead>
<tr>
<th>Process UUID</th>
<th>Created At</th></th>
<th>State</th>
</tr>
</thead>
<tbody>
<% @processes.each do |process| %>
<tr class="<%= dom_class(process) %>" id="<%= dom_id(process) %>">
<td><%= process.id %></td>
<td><%= relative_time(process.created_at) %></td>
<td><%= tag.pre JSON.pretty_generate(process.state) %></td>
</tr>
<% end %>
</tbody>
</table>
</div>
</div>
<% else %>
<div class="card my-3">
<div class="card-body">
<p class="card-text">
<em>No GoodJob processes found.</em>
</p>
</div>
</div>
<% end %>
15 changes: 3 additions & 12 deletions engine/app/views/layouts/good_job/base.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,14 @@
<li class="nav-item">
<%= link_to "Cron Schedules", cron_entries_path, class: ["nav-link", ("active" if current_page?(cron_entries_path))] %>
</li>
<li class="nav-item">
<%= link_to "Processes", processes_path, class: ["nav-link", ("active" if current_page?(processes_path))] %>
</li>
<li class="nav-item">
<div class="nav-link">
<span class="badge bg-secondary">More views coming soon</span>
</div>
</li>

<!-- Coming Soon
<li class="nav-item">
<%= link_to "Upcoming Jobs", 'todo', class: ["nav-link", ("active" if current_page?('todo'))] %>
</li>
<li class="nav-item">
<%= link_to "Finished Jobs", 'todo', class: ["nav-link", ("active" if current_page?('todo'))] %>
</li>
<li class="nav-item">
<%= link_to "Errored Jobs", 'todo', class: ["nav-link", ("active" if current_page?('todo'))] %>
</li>
-->
</ul>
<div class="text-muted" title="Now is <%= Time.current %>">Times are displayed in <%= Time.current.zone %> timezone</div>
</div>
Expand Down
15 changes: 9 additions & 6 deletions engine/config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@
GoodJob::Engine.routes.draw do
root to: 'executions#index'

resources :cron_entries, only: %i[index show] do
member do
post :enqueue
end
end
resources :executions, only: %i[destroy]

resources :jobs, only: %i[index show] do
member do
Expand All @@ -15,7 +11,14 @@
put :retry
end
end
resources :executions, only: %i[destroy]

resources :cron_entries, only: %i[index show] do
member do
post :enqueue
end
end

resources :processes, only: %i[index]

scope controller: :assets do
constraints(format: :css) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.timestamp :cron_at
end

create_table :good_job_processes, id: :uuid do |t|
t.timestamps
t.jsonb :state
end

add_index :good_jobs, :scheduled_at, where: "(finished_at IS NULL)", name: "index_good_jobs_on_scheduled_at"
add_index :good_jobs, [:queue_name, :scheduled_at], where: "(finished_at IS NULL)", name: :index_good_jobs_on_queue_name_and_scheduled_at
add_index :good_jobs, [:active_job_id, :created_at], name: :index_good_jobs_on_active_job_id_and_created_at
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true
class CreateGoodJobProcesses < ActiveRecord::Migration<%= migration_version %>
def change
enable_extension 'pgcrypto'

reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.table_exists?(:good_job_processes)
end
end

create_table :good_job_processes, id: :uuid do |t|
t.timestamps
t.jsonb :state
end
end
end
5 changes: 1 addition & 4 deletions lib/good_job/active_job_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ module GoodJob
# There is not a table in the database whose discrete rows represents "Jobs".
# The +good_jobs+ table is a table of individual {GoodJob::Execution}s that share the same +active_job_id+.
# A single row from the +good_jobs+ table of executions is fetched to represent an ActiveJobJob
# Parent class can be configured with +GoodJob.active_record_parent_class+.
# @!parse
# class ActiveJob < ActiveRecord::Base; end
class ActiveJobJob < Object.const_get(GoodJob.active_record_parent_class)
class ActiveJobJob < BaseRecord
include Filterable
include Lockable

Expand Down
28 changes: 28 additions & 0 deletions lib/good_job/assignable_connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true
module GoodJob
module AssignableConnection
extend ActiveSupport::Concern

included do
cattr_accessor :_connection
end

class_methods do
def connection=(conn)
self._connection = conn
end

def connection
_connection || super
end

def assign_connection(conn)
original_conn = _connection
self.connection = conn
yield
ensure
self._connection = original_conn
end
end
end
end
18 changes: 18 additions & 0 deletions lib/good_job/base_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true
module GoodJob
# Base ActiveRecord class that all GoodJob models inherit from.
# Parent class can be configured with +GoodJob.active_record_parent_class+.
# @!parse
# class BaseRecord < ActiveRecord::Base; end
class BaseRecord < Object.const_get(GoodJob.active_record_parent_class)
def self.migration_pending_warning!
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:
rails generate good_job:update
To apply the migration files, run:
rails db:migrate
DEPRECATION
nil
end
end
end
2 changes: 1 addition & 1 deletion lib/good_job/current_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def self.active_job_id

# @return [Integer] Current process ID
def self.process_id
Process.pid
::Process.pid
end

# @return [String] Current thread name
Expand Down
6 changes: 3 additions & 3 deletions lib/good_job/daemon.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def initialize(pidfile:)
# @return [void]
def daemonize
check_pid
Process.daemon
::Process.daemon
write_pid
end

private

# @return [void]
def write_pid
File.open(pidfile, ::File::CREAT | ::File::EXCL | ::File::WRONLY) { |f| f.write(Process.pid.to_s) }
File.open(pidfile, ::File::CREAT | ::File::EXCL | ::File::WRONLY) { |f| f.write(::Process.pid.to_s) }
at_exit { File.delete(pidfile) if File.exist?(pidfile) }
rescue Errno::EEXIST
check_pid
Expand Down Expand Up @@ -55,7 +55,7 @@ def pid_status(pidfile)
pid = ::File.read(pidfile).to_i
return :dead if pid.zero?

Process.kill(0, pid) # check process status
::Process.kill(0, pid) # check process status
:running
rescue Errno::ESRCH
:dead
Expand Down
17 changes: 2 additions & 15 deletions lib/good_job/execution.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
# frozen_string_literal: true
module GoodJob
# ActiveRecord model that represents an +ActiveJob+ job.
# Parent class can be configured with +GoodJob.active_record_parent_class+.
# @!parse
# class Execution < ActiveRecord::Base; end
class Execution < Object.const_get(GoodJob.active_record_parent_class)
class Execution < BaseRecord
include Lockable
include Filterable

Expand Down Expand Up @@ -54,16 +51,6 @@ def self.queue_parser(string)
end
end

def self._migration_pending_warning
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:
rails generate good_job:update
To apply the migration files, run:
rails db:migrate
DEPRECATION
nil
end

# Get Jobs with given ActiveJob ID
# @!method active_job_id
# @!scope class
Expand Down Expand Up @@ -224,7 +211,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
if @cron_at_index
execution_args[:cron_at] = CurrentThread.cron_at
else
_migration_pending_warning
migration_pending_warning!
end
elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
execution_args[:cron_key] = CurrentThread.execution.cron_key
Expand Down
78 changes: 53 additions & 25 deletions lib/good_job/notifier.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# frozen_string_literal: true
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent/atomic/atomic_boolean'

module GoodJob # :nodoc:
Expand All @@ -10,6 +11,11 @@ module GoodJob # :nodoc:
# When a message is received, the notifier passes the message to each of its recipients.
#
class Notifier
include ActiveSupport::Callbacks
define_callbacks :listen, :unlisten

include Notifier::ProcessRegistration

# Raised if the Database adapter does not implement LISTEN.
AdapterCannotListenError = Class.new(StandardError)

Expand Down Expand Up @@ -43,6 +49,12 @@ class Notifier
# @return [Array<GoodJob::Notifier>, nil]
cattr_reader :instances, default: [], instance_reader: false

# @!attribute [rw] connection
# @!scope class
# ActiveRecord Connection that has been established for the Notifier.
# @return [ActiveRecord::ConnectionAdapters::AbstractAdapter, nil]
thread_cattr_accessor :connection

# Send a message via Postgres NOTIFY
# @param message [#to_json]
def self.notify(message)
Expand Down Expand Up @@ -146,30 +158,36 @@ def create_executor

def listen(delay: 0)
future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening|
with_listen_connection do |conn|
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
conn.async_exec("LISTEN #{CHANNEL}").clear
end
with_connection do
begin
run_callbacks :listen do
ActiveSupport::Notifications.instrument("notifier_listen.good_job") do
connection.execute("LISTEN #{CHANNEL}")
end
thr_listening.make_true
end

ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
thr_listening.make_true
while thr_executor.running?
conn.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
next unless channel == CHANNEL

ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
while thr_executor.running?
wait_for_notify do |channel, payload|
next unless channel == CHANNEL

ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload })
parsed_payload = JSON.parse(payload, symbolize_names: true)
thr_recipients.each do |recipient|
target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call]
target.send(method_name, parsed_payload)
end
end
end
end
end
ensure
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
conn.async_exec("UNLISTEN *").clear
run_callbacks :unlisten do
thr_listening.make_false
ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do
connection.execute("UNLISTEN *")
end
end
end
end
Expand All @@ -178,17 +196,27 @@ def listen(delay: 0)
future.execute
end

def with_listen_connection
ar_conn = Execution.connection_pool.checkout.tap do |conn|
def with_connection
self.connection = Execution.connection_pool.checkout.tap do |conn|
Execution.connection_pool.remove(conn)
end
pg_conn = ar_conn.raw_connection
raise AdapterCannotListenError unless pg_conn.respond_to? :wait_for_notify
connection.execute("SET application_name = #{connection.quote(self.class.name)}")

pg_conn.async_exec("SET application_name = #{pg_conn.escape_identifier(self.class.name)}").clear
yield pg_conn
yield
ensure
ar_conn&.disconnect!
connection&.disconnect!
self.connection = nil
end

def wait_for_notify
raw_connection = connection.raw_connection
if raw_connection.respond_to?(:wait_for_notify)
raw_connection.wait_for_notify(WAIT_INTERVAL) do |channel, _pid, payload|
yield(channel, payload)
end
else
sleep WAIT_INTERVAL
end
end
end
end
Loading

0 comments on commit a90be5e

Please sign in to comment.