Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to allow ActiveJob to manage retries on app-level first #103

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,22 @@ Cloudtasker.configure do |config|
#
# config.max_retries = 10

#
# Specify whether ActiveJob's or CloudTask's retry mechanism should be used.
# - :provider => Use CloudTask's retry management.
# - :active_job => Rely on ActiveJob to manage retries, e.g. using `retry_on`.
#
# ActiveJob has its own mechanism for retries and will reschedule the job and prevent it
# from failing hard until a configured threshold is met. Then, the error is allowed to bubble up
# to the underlying queuing system (in this case CloudTask). To prevent CloudTask from retrying
# on top of ActiveJob, pass a (potentially empty) block to `retry_on`.
#
# When `:active_job` is chosen, `max_retries` will have no effect!
#
# Default: `:provider` (CloudTask)
#
# config.retry_mechanism = :active_job

#
# Specify the redis connection hash.
#
Expand Down
22 changes: 18 additions & 4 deletions lib/active_job/queue_adapters/cloudtasker_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class CloudtaskerAdapter
'priority' # Not used
].freeze

ACTIVE_JOB_RETRIAL_SERIALIZATION_FILTERED_KEYS =
SERIALIZATION_FILTERED_KEYS.without('executions').freeze

# Enqueues the given ActiveJob instance for execution
#
# @param job [ActiveJob::Base] The ActiveJob instance
Expand All @@ -41,7 +44,7 @@ def enqueue_at(job, precise_timestamp)
private

def build_worker(job)
job_serialization = job.serialize.except(*SERIALIZATION_FILTERED_KEYS)
job_serialization = job.serialize.except(*serialization_filtered_keys)

JobWrapper.new(
job_id: job_serialization.delete('job_id'),
Expand All @@ -50,6 +53,14 @@ def build_worker(job)
)
end

def serialization_filtered_keys
if Cloudtasker.config.retry_mechanism == :active_job
ACTIVE_JOB_RETRIAL_SERIALIZATION_FILTERED_KEYS
else
SERIALIZATION_FILTERED_KEYS
end
end

# == Job Wrapper for the Cloudtasker adapter
#
# Executes jobs scheduled by the Cloudtasker ActiveJob adapter
Expand All @@ -64,16 +75,19 @@ class JobWrapper # :nodoc:
# @return [any] The execution of the ActiveJob call
#
def perform(job_serialization, *_extra_options)
job_executions = job_retries < 1 ? 0 : (job_retries + 1)

job_serialization.merge!(
'job_id' => job_id,
'queue_name' => job_queue,
'provider_job_id' => task_id,
'executions' => job_executions,
'priority' => nil
)

# Overrides ActiveJob default retry counter with one that tracks Cloudtasker-managed retries
if Cloudtasker.config.retry_mechanism == :provider
job_executions = job_retries < 1 ? 0 : (job_retries + 1)
job_serialization.merge!('executions' => job_executions)
end

Base.execute job_serialization
end
end
Expand Down
15 changes: 14 additions & 1 deletion lib/cloudtasker/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Cloudtasker
class Config
attr_accessor :redis, :store_payloads_in_redis, :gcp_queue_prefix
attr_writer :secret, :gcp_location_id, :gcp_project_id,
:processor_path, :logger, :mode, :max_retries,
:processor_path, :logger, :mode, :max_retries, :retry_mechanism,
:dispatch_deadline, :on_error, :on_dead, :oidc

# Max Cloud Task size in bytes
Expand Down Expand Up @@ -113,6 +113,19 @@ def max_retries
@max_retries ||= DEFAULT_MAX_RETRY_ATTEMPTS
end

#
# Configures who is responsible for tracking and managing retries.
# Defaults to `:provider`.
# - :provider => Use CloudTask's retry management.
# - :active_job => Rely on ActiveJob first to manage retries, e.g. using `retry_on`.
# Ignores the CloudTask `RetryCount` header.
#
# @return [<Type>] <description>
#
def retry_mechanism
@retry_mechanism ||= :provider
end

#
# The operating mode.
# - :production => process tasks via GCP Cloud Task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,63 @@

subject(:worker) { described_class.new(**example_job_wrapper_args.merge(task_id: '00000001')) }

let(:example_unreconstructed_job_serialization) do
example_job_serialization.except('job_id', 'queue_name', 'provider_job_id', 'executions', 'priority')
let(:example_job_serialization) do
example_job.serialize.except('job_id', 'priority', 'queue_name', 'provider_job_id')
end

let(:example_reconstructed_job_serialization) do
example_job_serialization.merge(
'job_id' => worker.job_id,
'queue_name' => worker.job_queue,
'provider_job_id' => worker.task_id,
'executions' => 0,
'priority' => nil
)
context 'when the CloudTask retry mechanism is used' do
let(:example_unreconstructed_job_serialization) do
example_job_serialization.except('job_id', 'queue_name', 'provider_job_id', 'executions', 'priority')
end

let(:example_reconstructed_job_serialization) do
example_job_serialization.merge(
'job_id' => worker.job_id,
'queue_name' => worker.job_queue,
'provider_job_id' => worker.task_id,
'executions' => 0,
'priority' => nil
)
end

describe '#perform' do
it "calls 'ActiveJob::Base.execute' with the job serialization" do
expect(ActiveJob::Base).to receive(:execute).with(example_reconstructed_job_serialization)
worker.perform(example_unreconstructed_job_serialization)
end
end
end

describe '#perform' do
it "calls 'ActiveJob::Base.execute' with the job serialization" do
expect(ActiveJob::Base).to receive(:execute).with(example_reconstructed_job_serialization)
worker.perform(example_unreconstructed_job_serialization)
context 'when the ActiveJob retry mechanism is used' do
around do |example|
Cloudtasker.config.retry_mechanism = :active_job
example.call
Cloudtasker.config.retry_mechanism = :provider
end

before do
example_job.executions = 1
end

let(:example_unreconstructed_job_serialization) do
example_job_serialization.except('job_id', 'queue_name', 'provider_job_id', 'priority')
end

let(:example_reconstructed_job_serialization) do
example_job_serialization.merge(
'job_id' => worker.job_id,
'executions' => 1,
'queue_name' => worker.job_queue,
'provider_job_id' => worker.task_id,
'priority' => nil
)
end

describe '#perform' do
it "calls 'ActiveJob::Base.execute' with the job serialization" do
expect(ActiveJob::Base).to receive(:execute).with(example_reconstructed_job_serialization)
worker.perform(example_unreconstructed_job_serialization)
end
end
end
end
Expand Down
16 changes: 16 additions & 0 deletions spec/cloudtasker/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
let(:logger) { Logger.new(nil) }
let(:mode) { :production }
let(:max_retries) { 10 }
let(:retry_mechanism) { :provider }
let(:store_payloads_in_redis) { 10 }
let(:dispatch_deadline) { 15 * 60 }
let(:on_error) { ->(e, w) {} }
Expand Down Expand Up @@ -41,6 +42,7 @@
c.processor_host = processor_host
c.processor_path = processor_path
c.max_retries = max_retries
c.retry_mechanism = retry_mechanism
c.store_payloads_in_redis = store_payloads_in_redis
c.dispatch_deadline = dispatch_deadline
c.on_error = on_error
Expand Down Expand Up @@ -97,6 +99,20 @@
end
end

describe '#retry_mechanism' do
subject { config.retry_mechanism }

context 'with value specified via config' do
it { is_expected.to eq(retry_mechanism) }
end

context 'with no value' do
let(:retry_mechanism) { nil }

it { is_expected.to eq(:provider) }
end
end

describe '#mode' do
subject { config.mode }

Expand Down
Loading