Skip to content

Commit

Permalink
Add support for multiple DSL invocations (#3)
Browse files Browse the repository at this point in the history
* add support for multiple dsl invocations

* update README

* makes retry counter independent for the groups

* fix error handling
  • Loading branch information
moofkit authored May 30, 2024
1 parent 575efad commit 375861e
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 46 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ gem "rake", "~> 13.0"
gem "rspec", "~> 3.0"

gem "rubocop", "~> 1.21"
gem "rubocop-performance"
gem "rubocop-rake"
gem "rubocop-rspec"
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,31 @@
[![Build Status](https://github.com/moofkit/sidekiq-rescue/actions/workflows/main.yml/badge.svg?branch=main)](https://github.com/moofkit/sidekiq-rescue/actions/workflows/main.yml)

[Sidekiq](https://github.com/sidekiq/sidekiq) plugin to rescue jobs from expected errors and retry them later.
Catch expected errors and retry the job with a delay and a limit. It's useful when you want to retry jobs that failed due to expected errors and not spam your exception tracker with these errors. If the exception will getting raised beyond the limit, it will be re-raised and will be handled by Sidekiq standard retry mechanism.

Handlers are searched from bottom to top, and up the inheritance chain. The first handler that `exception.is_a?(klass)` holds true will be used.

## Example

```ruby
class MyJob
include Sidekiq::Job
include Sidekiq::Rescue::Dsl

sidekiq_rescue CustomAppException # defaults to 60 seconds delay and 10 retries
sidekiq_rescue AnotherCustomAppException, delay: ->(counter) { counter * 2 }
sidekiq_rescue CustomInfrastructureException, delay: 5.minutes
sidekiq_rescue ActiveRecord::Deadlocked, delay: 5.seconds, limit: 3
sidekiq_rescue Net::OpenTimeout, Timeout::Error, limit: 10 # retries at most 10 times for Net::OpenTimeout and Timeout::Error combined

def perform(*args)
# Might raise CustomAppException, AnotherCustomAppException, or YetAnotherCustomAppException for something domain specific
# Might raise ActiveRecord::Deadlocked when a local db deadlock is detected
# Might raise Net::OpenTimeout or Timeout::Error when the remote service is down
end
end
```


## Installation

Expand Down Expand Up @@ -224,7 +249,7 @@ end
## Motivation

Sidekiq provides a retry mechanism for jobs that failed due to unexpected errors. However, it does not provide a way to retry jobs that failed due to expected errors. This gem aims to fill this gap.
In addition, it provides a way to configure the number of retries and the delay between retries independently from the Sidekiq standard retry mechanism.
In addition, it provides a way to configure the number of retries and the delay between retries independently from the Sidekiq standard retry mechanism. Mostly inspired by [ActiveJob](https://edgeapi.rubyonrails.org/classes/ActiveJob/Exceptions/ClassMethods.html#method-i-retry_on)

## Supported Ruby versions

Expand Down
18 changes: 11 additions & 7 deletions lib/sidekiq/rescue/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ module ClassMethods
# @raise [ArgumentError] if limit is not an Integer
# @example
# sidekiq_rescue NetworkError, delay: 60, limit: 10
def sidekiq_rescue(*error, delay: nil, limit: nil)
error = validate_and_unpack_error_argument(error)
def sidekiq_rescue(*errors, delay: Sidekiq::Rescue.config.delay, limit: Sidekiq::Rescue.config.limit)
unpacked_errors = validate_and_unpack_error_argument(errors)
validate_delay_argument(delay)
validate_limit_argument(limit)
assign_sidekiq_rescue_options(unpacked_errors, delay, limit)
end

self.sidekiq_rescue_options = {
error: error,
delay: delay || Sidekiq::Rescue.config.delay,
limit: limit || Sidekiq::Rescue.config.limit
}
def sidekiq_rescue_options_for(error)
sidekiq_rescue_options&.find { |k, _v| k.include?(error) }&.last
end

private
Expand Down Expand Up @@ -63,6 +62,11 @@ def validate_delay_argument(delay)
def validate_limit_argument(limit)
raise ArgumentError, "limit must be integer" if limit && !limit.is_a?(Integer)
end

def assign_sidekiq_rescue_options(errors, delay, limit)
self.sidekiq_rescue_options ||= {}
self.sidekiq_rescue_options.merge!(errors => { delay: delay, limit: limit })
end
end
end
# Alias for Dsl; TODO: remove in 1.0.0
Expand Down
17 changes: 11 additions & 6 deletions lib/sidekiq/rescue/rspec/matchers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ module Matchers
end

match do |actual|
actual.is_a?(Class) &&
actual.include?(Sidekiq::Rescue::Dsl) &&
actual.respond_to?(:sidekiq_rescue_options) &&
Array(actual&.sidekiq_rescue_options&.[](:error)).include?(expected) &&
(@delay.nil? || actual.sidekiq_rescue_options[:delay] == @delay) &&
(@limit.nil? || actual.sidekiq_rescue_options[:limit] == @limit)
matched = actual.is_a?(Class) &&
actual.include?(Sidekiq::Rescue::Dsl) &&
actual.respond_to?(:sidekiq_rescue_options) &&
actual&.sidekiq_rescue_options&.keys&.flatten&.include?(expected)

return false unless matched

options = actual.sidekiq_rescue_options_for(expected)

(@delay.nil? || options.fetch(:delay) == @delay) &&
(@limit.nil? || options.fetch(:limit) == @limit)
end

match_when_negated do |actual|
Expand Down
38 changes: 25 additions & 13 deletions lib/sidekiq/rescue/server_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,39 @@ class ServerMiddleware

def call(job_instance, job_payload, _queue, &block)
job_class = job_instance.class
options = job_class.sidekiq_rescue_options if job_class.respond_to?(:sidekiq_rescue_options)
if options
sidekiq_rescue(job_payload, **options, &block)
if job_class.respond_to?(:sidekiq_rescue_options) && !job_class.sidekiq_rescue_options.nil?
sidekiq_rescue(job_payload, job_class, &block)
else
yield
end
end

private

def sidekiq_rescue(job_payload, delay:, limit:, error:, **)
def sidekiq_rescue(job_payload, job_class)
yield
rescue *error => e
rescue_counter = increment_rescue_counter(job_payload)
raise e if rescue_counter > limit
rescue StandardError => e
error_group, options = job_class.sidekiq_rescue_options.reverse_each.find do |error_group, _options|
Array(error_group).any? { |error| e.is_a?(error) }
end
raise e unless error_group

rescue_error(e, error_group, options, job_payload)
end

def rescue_error(error, error_group, options, job_payload)
delay, limit = options.fetch_values(:delay, :limit)
rescue_counter = increment_rescue_counter_for(error_group, job_payload)
raise error if rescue_counter > limit

reschedule_at = calculate_reschedule_time(delay, rescue_counter)
log_reschedule_info(rescue_counter, e, reschedule_at)
reschedule_job(job_payload, reschedule_at, rescue_counter)
log_reschedule_info(rescue_counter, error, reschedule_at)
reschedule_job(job_payload: job_payload, reschedule_at: reschedule_at, rescue_counter: rescue_counter,
error_group: error_group)
end

def increment_rescue_counter(job_payload)
rescue_counter = job_payload["sidekiq_rescue_counter"].to_i
def increment_rescue_counter_for(error_group, job_payload)
rescue_counter = job_payload.dig("sidekiq_rescue_exceptions_counter", error_group.to_s) || 0
rescue_counter += 1
rescue_counter
end
Expand All @@ -52,8 +62,10 @@ def log_reschedule_info(rescue_counter, error, reschedule_at)
"#{error.message}; rescheduling at #{reschedule_at}")
end

def reschedule_job(job_payload, reschedule_at, rescue_counter)
Sidekiq::Client.push(job_payload.merge("at" => reschedule_at, "sidekiq_rescue_counter" => rescue_counter))
def reschedule_job(job_payload:, reschedule_at:, rescue_counter:, error_group:)
payload = job_payload.merge("at" => reschedule_at,
"sidekiq_rescue_exceptions_counter" => { error_group.to_s => rescue_counter })
Sidekiq::Client.push(payload)
end
end
end
Expand Down
5 changes: 4 additions & 1 deletion spec/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ require:
- rubocop-rspec

RSpec/MultipleExpectations:
Max: 2
Max: 3

RSpec/ExampleLength:
Max: 15
20 changes: 15 additions & 5 deletions spec/sidekiq/rescue/dsl_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,35 @@ def define_dsl(...)
it "sets error and default options" do
define_dsl { sidekiq_rescue TestError }

expect(job_class.sidekiq_rescue_options).to eq(error: [TestError], delay: 60, limit: 10)
expect(job_class.sidekiq_rescue_options).to eq({ [TestError] => { delay: 60, limit: 10 } })
end

it "sets the error classes" do
define_dsl { sidekiq_rescue TestError, ParentError, ChildError }

expect(job_class.sidekiq_rescue_options[:error]).to eq([TestError, ParentError, ChildError])
expect(job_class.sidekiq_rescue_options.keys).to eq([[TestError, ParentError, ChildError]])
expect(job_class.sidekiq_rescue_options.values).to all(include(delay: 60, limit: 10))
end

it "supports multiple calls" do
define_dsl do
sidekiq_rescue TestError
sidekiq_rescue ParentError
end

expect(job_class.sidekiq_rescue_options.keys).to eq([[TestError], [ParentError]])
end

it "sets the delay" do
define_dsl { sidekiq_rescue TestError, delay: 10 }

expect(job_class.sidekiq_rescue_options[:delay]).to eq(10)
expect(job_class.sidekiq_rescue_options.dig([TestError], :delay)).to eq(10)
end

it "sets proc as the delay" do
define_dsl { sidekiq_rescue TestError, delay: ->(counter) { counter * 10 } }

expect(job_class.sidekiq_rescue_options[:delay]).to be_a(Proc)
expect(job_class.sidekiq_rescue_options.dig([TestError], :delay)).to be_a(Proc)
end

it "raises an ArgumentError if delay proc has no arguments" do
Expand All @@ -47,7 +57,7 @@ def define_dsl(...)
it "sets the limit" do
define_dsl { sidekiq_rescue TestError, limit: 5 }

expect(job_class.sidekiq_rescue_options[:limit]).to eq(5)
expect(job_class.sidekiq_rescue_options.dig([TestError], :limit)).to eq(5)
end

it "raises ArgumentError if there are no arguments" do
Expand Down
47 changes: 39 additions & 8 deletions spec/sidekiq/rescue/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,23 @@

it "reschedules the job with correct arguments" do
perform_async
scheduled_job = last_job
expect(scheduled_job["args"]).to eq(args)
expect(scheduled_job["sidekiq_rescue_counter"]).to eq(1)
expect(last_job["args"]).to eq(args)
end

it "reschedules the job with correct arguments and delay" do
perform_async

delay = job_class.sidekiq_rescue_options[:delay]
expect(last_job["at"]).to be_within(10).of(Time.now.to_f + delay)
expect(last_job["at"]).to be_within(10).of(Time.now.to_f + 60)
end

it "increments the counter" do
perform_async

expect(last_job["sidekiq_rescue_counter"]).to eq(1)
expect(last_job["sidekiq_rescue_exceptions_counter"]).to eq("[TestError]" => 1)
end

it "raises an error if the counter is greater than the limit" do
limit = job_class.sidekiq_rescue_options[:limit]
limit = 10

job_class.perform_async(*args)
limit.times { job_class.perform_one }
Expand All @@ -57,7 +54,7 @@
end

context "with multiple errors" do
let(:job_class) { WithMultipleErrorsJob }
let(:job_class) { WithGroupErrorsJob }

it "rescues the expected error" do
expect { perform_async }.not_to raise_error
Expand Down Expand Up @@ -93,4 +90,38 @@
expect(last_job["at"]).to be_within(10).of(Time.now.to_f + 10)
end
end

context "with multiple errors and delay" do
let(:job_class) { WithMultipleErrorsAndDelayJob }

context "with TestError" do
let(:args) { "TestError" }

it "reschedules the job with correct delay" do
expect { perform_async }.not_to raise_error
expect(last_job["at"]).to be_within(10).of(Time.now.to_f + 10)
expect(last_job["sidekiq_rescue_exceptions_counter"]).to eq("[TestError]" => 1)
end
end

context "with ParentError" do
let(:args) { "ParentError" }

it "reschedules the job with correct delay" do
expect { perform_async }.not_to raise_error
expect(last_job["at"]).to be_within(10).of(Time.now.to_f + 20)
expect(last_job["sidekiq_rescue_exceptions_counter"]).to eq("[ParentError]" => 1)
end
end

context "with ChildError" do
let(:args) { "ChildError" }

it "reschedules the job with correct delay" do
expect { perform_async }.not_to raise_error
expect(last_job["at"]).to be_within(10).of(Time.now.to_f + 30)
expect(last_job["sidekiq_rescue_exceptions_counter"]).to eq("[ChildError]" => 1)
end
end
end
end
2 changes: 1 addition & 1 deletion spec/sidekiq/rescue/rspec/matchers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
end

context "with multiple errors" do
let(:job_class) { WithMultipleErrorsJob }
let(:job_class) { WithGroupErrorsJob }

it "matches TestError" do
expect(job_class).to have_sidekiq_rescue(TestError)
Expand Down
26 changes: 23 additions & 3 deletions spec/sidekiq/rescue/server_middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

call_with_expected_error
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(10).of(Time.now.to_f + 60), "sidekiq_rescue_counter" => 1)
job_payload.merge("at" => be_within(10).of(Time.now.to_f + 60),
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 1 })
)
end

Expand All @@ -45,7 +46,7 @@
"queue" => "default",
"jid" => "123",
"created_at" => Time.now.to_f,
"sidekiq_rescue_counter" => 1
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 1 }
}
end

Expand All @@ -54,7 +55,8 @@

call_with_expected_error_and_delay_proc
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(20).of(Time.now.to_f + (10 * 2)), "sidekiq_rescue_counter" => 2)
job_payload.merge("at" => be_within(20).of(Time.now.to_f + (10 * 2)),
"sidekiq_rescue_exceptions_counter" => { "[TestError]" => 2 })
)
end
end
Expand All @@ -77,4 +79,22 @@
expect { call_with_unexpected_error }.to raise_error(UnexpectedError)
end
end

context "with grouped errors" do
subject(:call_with_multiple_errors) do
middleware.call(job_instance, job_payload, "default") { raise TestError }
end

let(:job_instance) { WithGroupErrorsJob.new }

it "reschedules the job on expected error and increments counter" do
allow(Sidekiq::Client).to receive(:push)

call_with_multiple_errors
expect(Sidekiq::Client).to have_received(:push).with(
job_payload.merge("at" => be_within(10).of(Time.now.to_f + 60),
"sidekiq_rescue_exceptions_counter" => { "[TestError, ParentError, ChildError]" => 1 })
)
end
end
end
12 changes: 11 additions & 1 deletion spec/support/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def perform(*)
end
end

class WithMultipleErrorsJob < BaseJob
class WithGroupErrorsJob < BaseJob
sidekiq_rescue TestError, ParentError, ChildError

def perform(*)
Expand All @@ -43,4 +43,14 @@ def perform(*)
end
end

class WithMultipleErrorsAndDelayJob < BaseJob
sidekiq_rescue TestError, delay: 10, limit: 5
sidekiq_rescue ParentError, delay: 20, limit: 10
sidekiq_rescue ChildError, delay: 30, limit: 15

def perform(error_class)
raise Object.const_get(error_class)
end
end

ChildJobWithExpectedError = Class.new(WithTestErrorJob)

0 comments on commit 375861e

Please sign in to comment.