Skip to content

Commit

Permalink
Add concurrency extension for ActiveJob
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jul 2, 2021
1 parent 2b16bda commit 69b42bb
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 8 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Configuration options](#configuration-options)
- [Global options](#global-options)
- [Dashboard](#dashboard)
- [ActiveJob Concurrency](#activejob-concurrency)
- [Updating](#updating)
- [Go deeper](#go-deeper)
- [Exceptions, retries, and reliability](#exceptions-retries-and-reliability)
Expand Down Expand Up @@ -319,6 +320,35 @@ GoodJob includes a Dashboard as a mountable `Rails::Engine`.
end
```
### ActiveJob Concurrency
GoodJob can extend ActiveJob to provide limits on concurrently running jobs, either at time of _enqueue_ or at _perform_.
**Note:** Limiting concurrency at _enqueue_ requires Rails 6.0+ because Rails 5.2 does not support `throw :abort` in ActiveJob callbacks.
```ruby
class MyJob < ApplicationJob
include GoodJob::ActiveJobExtensions::Concurrency
good_job_control_concurrency_with(
# Maximum number of jobs with the concurrency key to be concurrently enqueued
enqueue_limit: 2,
# Maximum number of jobs with the concurrency key to be concurrently performed
perform_limit: 1,
# A unique key to be globally locked against.
# Can be String or Lambda/Proc that is invoked in the context of the job.
# Note: Arguments passed to #perform_later must be accessed through `arguments` method.
key: -> { "Unique-#{arguments.first}" } # MyJob.perform_later("Alice") => "Unique-Alice"
)
def perform(first_name)
# do work
end
end
```
### Updating
GoodJob follows semantic versioning, though updates may be encouraged through deprecation warnings in minor versions.
Expand Down
4 changes: 4 additions & 0 deletions lib/good_job/active_job_extensions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module GoodJob
module ActiveJobExtensions
end
end
68 changes: 68 additions & 0 deletions lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
module GoodJob
module ActiveJobExtensions
module Concurrency
extend ActiveSupport::Concern

ConcurrencyExceededError = Class.new(StandardError)

included do
class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}

before_enqueue do |job|
# Always allow jobs to be retried because the current job's execution will complete momentarily
next if CurrentExecution.active_job_id == job.job_id

limit = job.class.good_job_concurrency_config.fetch(:enqueue_limit, Float::INFINITY)
next if limit.blank? || (0...Float::INFINITY).exclude?(limit)

key = job.good_job_concurrency_key
next if key.blank?

GoodJob::Job.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
# TODO: Why is `unscoped` necessary? Nested scope is bleeding into subsequent query?
enqueue_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).unfinished.count
# The job has not yet been enqueued, so check if adding it will go over the limit
throw :abort if enqueue_concurrency + 1 > limit
end
end

retry_on(
GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError,
attempts: Float::INFINITY,
wait: :exponentially_longer
)

before_perform do |job|
limit = job.class.good_job_concurrency_config.fetch(:perform_limit, Float::INFINITY)
next if limit.blank? || (0...Float::INFINITY).exclude?(limit)

key = job.good_job_concurrency_key
next if key.blank?

GoodJob::Job.new.with_advisory_lock(key: key, function: "pg_advisory_lock") do
perform_concurrency = GoodJob::Job.unscoped.where(concurrency_key: key).advisory_locked.count
# The current job has already been locked and will appear in the previous query
raise GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError if perform_concurrency > limit
end
end
end

class_methods do
def good_job_control_concurrency_with(config)
self.good_job_concurrency_config = config
end
end

def good_job_concurrency_key
key = self.class.good_job_concurrency_config[:key]
return if key.blank?

if key.respond_to? :call
instance_exec(&key)
else
key
end
end
end
end
end
17 changes: 12 additions & 5 deletions lib/good_job/current_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,30 @@ module GoodJob
# Thread-local attributes for passing values from Instrumentation.
# (Cannot use ActiveSupport::CurrentAttributes because ActiveJob resets it)
module CurrentExecution
# @!attribute [rw] error_on_retry
# @!attribute [rw] active_job_id
# @!scope class
# Error captured by retry_on
# @return [Exception, nil]
thread_mattr_accessor :error_on_retry
# ActiveJob ID
# @return [String, nil]
thread_mattr_accessor :active_job_id

# @!attribute [rw] error_on_discard
# @!scope class
# Error captured by discard_on
# @return [Exception, nil]
thread_mattr_accessor :error_on_discard

# @!attribute [rw] error_on_retry
# @!scope class
# Error captured by retry_on
# @return [Exception, nil]
thread_mattr_accessor :error_on_retry

# Resets attributes
# @return [void]
def self.reset
self.error_on_retry = nil
self.active_job_id = nil
self.error_on_discard = nil
self.error_on_retry = nil
end

# @return [Integer] Current process ID
Expand Down
20 changes: 20 additions & 0 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,21 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
DEPRECATION
end

if column_names.include?('concurrency_key')
good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
else
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
GoodJob has pending database migrations. To create the migration files, run:
rails generate good_job:update
To apply the migration files, run:
rails db:migrate
DEPRECATION
end

good_job = GoodJob::Job.new(**good_job_args)

instrument_payload[:good_job] = good_job
Expand Down Expand Up @@ -264,6 +279,10 @@ def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end

def active_job_id
super || serialized_params['job_id']
end

private

# @return [ExecutionResult]
Expand All @@ -273,6 +292,7 @@ def execute
)

GoodJob::CurrentExecution.reset
GoodJob::CurrentExecution.active_job_id = active_job_id
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do
value = ActiveJob::Base.execute(params)

Expand Down
13 changes: 10 additions & 3 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,16 @@ def pg_or_jdbc_query(query)
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @return [Boolean] whether the lock was acquired.
def advisory_lock(key: lockable_key, function: advisory_lockable_function)
query = <<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked
SQL
query = if function.include? "_try_"
<<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS locked
SQL
else
<<~SQL.squish
SELECT #{function}(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked
SQL
end

binds = [[nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).first['locked']
end
Expand Down
64 changes: 64 additions & 0 deletions spec/lib/good_job/active_job_extensions/concurrency_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
require 'rails_helper'

RSpec.describe GoodJob::ActiveJobExtensions::Concurrency do
before do
ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)

stub_const 'TestJob', (Class.new(ActiveJob::Base) do
include GoodJob::ActiveJobExtensions::Concurrency

good_job_control_concurrency_with(
enqueue_limit: 2,
perform_limit: 1,
key: -> { arguments.first[:name] }
)

def perform(name:)
name && sleep(1)
end
end)
end

describe '.good_job_control_concurrency_with' do
describe 'enqueue_limit' do
it "does not enqueue if enqueue concurrency limit is exceeded for a particular key" do
expect(TestJob.perform_later(name: "Alice")).to be_present
expect(TestJob.perform_later(name: "Alice")).to be_present

# Third usage of key does not enqueue
expect(TestJob.perform_later(name: "Alice")).to eq false

# Usage of different key does enqueue
expect(TestJob.perform_later(name: "Bob")).to be_present

expect(GoodJob::Job.where(concurrency_key: "Alice").count).to eq 2
expect(GoodJob::Job.where(concurrency_key: "Bob").count).to eq 1
end
end

describe 'perform_limit' do
before do
allow(GoodJob).to receive(:preserve_job_records).and_return(true)
end

it "will error and retry jobs if concurrency is exceeded" do
TestJob.perform_later(name: "Alice")
TestJob.perform_later(name: "Alice")
TestJob.perform_later(name: "Bob")

performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: 5)
5.times { scheduler.create_thread }

sleep_until(max: 10, increments_of: 0.5) do
GoodJob::Job.where(concurrency_key: "Alice").finished.count >= 1 &&
GoodJob::Job.where(concurrency_key: "Bob").finished.count == 1
end
scheduler.shutdown

expect(GoodJob::Job.count).to be > 3
expect(GoodJob::Job.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present
end
end
end
end
15 changes: 15 additions & 0 deletions spec/lib/good_job/current_execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,19 @@
expect(described_class.error_on_retry).to eq nil
end
end

describe '.active_job_id' do
it 'is assignable, thread-safe, and resettable' do
described_class.active_job_id = 'duck'

Thread.new do
described_class.active_job_id = 'bear'
end.join

expect(described_class.active_job_id).to eq 'duck'

described_class.reset
expect(described_class.active_job_id).to eq nil
end
end
end
6 changes: 6 additions & 0 deletions spec/lib/good_job/lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@

job.advisory_unlock(key: "alternative")
end

it 'can lock alternative postgres functions' do
job.advisory_lock!(function: "pg_advisory_lock")
expect(job.advisory_locked?).to be true
job.advisory_unlock
end
end

describe '#advisory_unlock' do
Expand Down
4 changes: 4 additions & 0 deletions spec/support/rails_versions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
RSpec.configure do |c|
less_than_rails_6 = Gem::Version.new(Rails.version) < Gem::Version.new('6')
c.filter_run_excluding(:skip_rails_5) if less_than_rails_6
end

0 comments on commit 69b42bb

Please sign in to comment.