I came across this gem as I had a need to control the concurrency of jobs based on their arguments. Incidentally, I had also been using https://github.com/veeqo/activejob-uniqueness to control situations where we might over-enqueue certain jobs. The behavior of that gem is that it simply will not re-enqueue a job at all while one is still waiting to execute: it maintains a lock until execution and drops new attempts. We use this mostly to avoid over-calling external services like Hubspot (ex: a lot of user actions could trigger updates), or in situations where we are updating counter cache columns that are expensive and can be triggered somewhat rapidly (ex: we give them a slight scheduling delay to start, and any additional attempts get dropped until that finishes).

Initially I thought this gem could solve both of my problems (and save me some further connections to Redis), but I think I might have had a misunderstanding of what it does. It seems like it will still enqueue as many jobs as it is ultimately told to.
I couldn't find anything that seemed to indicate otherwise, so I figured I would ask in case I was missing something or others had a similar question. Thanks!
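For readers unfamiliar with the drop-while-pending behaviour described above, here is a minimal in-memory sketch of it in plain Ruby. This is not activejob-uniqueness's actual Redis-backed implementation; `LOCKS`, `QUEUE`, `enqueue_unique` and `run_next` are all illustrative names:

```ruby
require "set"

LOCKS = Set.new # pending-job locks (stand-in for Redis lock keys)
QUEUE = []      # stand-in for the job queue

# Enqueue only if no identical job is already pending. Returns true
# when the job was enqueued, false when the duplicate was dropped.
def enqueue_unique(job, *args)
  key = "#{job}:#{args.inspect}"
  return false unless LOCKS.add?(key) # Set#add? is nil if key exists
  QUEUE << [job, args, key]
  true
end

# Running a job releases its lock, so the next enqueue goes through.
def run_next
  job, args, key = QUEUE.shift
  LOCKS.delete(key)
  [job, args]
end
```

So a burst of identical enqueues collapses into one pending job, and only after it runs can the next one be scheduled.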
---
sidekiq-throttled only controls the rate of execution, not uniqueness. To solve the problem you have, in one of my projects I used redlock together with a debounced workers implementation.
**Debounced Workers**

**lib/sidekiq/debounced_job.rb**

```ruby
# file: lib/sidekiq/debounced_job.rb
module Sidekiq
  # Implement cheap debounce logic using Sidekiq scheduled jobs and Redis.
  module DebouncedJob
    extend ActiveSupport::Concern

    # Extra buffer in seconds to avoid possible time drifts between
    # redis and elasticbeanstalk nodes. Even though chances of such
    # drifts are close to none - it's better to be safe than sorry.
    JITTER = 3

    DEFAULT_OPTIONS = {
      # Delay in seconds
      "wait" => 10
    }.freeze

    module PerformMethodOverride
      def perform(*args)
        super(*args) if self.class.debounce_ticket(*args).let_go?
      end
    end

    class Ticket
      RESERVE = RedisPrescription.new(File.read("#{__dir__}/debounced_job/reserve.lua")).freeze
      ATTEMPT = RedisPrescription.new(File.read("#{__dir__}/debounced_job/attempt.lua")).freeze

      def initialize(resource, wait:, max_wait:)
        @resource = resource
        @wait = wait
        @max_wait = max_wait
      end

      def reserve
        ::Sidekiq.redis do |redis|
          RESERVE.call(redis, keys: [@resource], argv: [@wait, @max_wait])
        end
      end

      def let_go?
        ::Sidekiq.redis do |redis|
          1 == ATTEMPT.call(redis, keys: [@resource], argv: [@wait, @max_wait]).to_i
        end
      end
    end

    class_methods do
      def perform_async(*args)
        debounce_ticket(*args).reserve
        perform_in(debounce_wait + JITTER, *args)
      end

      def debounce_ticket(*args)
        Ticket.new("debounce:#{name.underscore}:#{args.to_json}", wait: debounce_wait, max_wait: debounce_max_wait)
      end

      def debounce_options(opts = {})
        opts = opts.transform_keys(&:to_s)
        self.debounce_options_hash = (debounce_options_hash || DEFAULT_OPTIONS).merge(opts).freeze
      end

      def debounce_wait
        (debounce_options_hash || DEFAULT_OPTIONS).fetch("wait").to_i
      end

      def debounce_max_wait
        (debounce_wait * 1.1).ceil
      end
    end

    included do
      prepend PerformMethodOverride
      sidekiq_class_attribute :debounce_options_hash
    end
  end
end
```

**lib/sidekiq/debounced_job/attempt.lua**

```lua
-- file: lib/sidekiq/debounced_job/attempt.lua
local timestamp = tonumber(redis.call("TIME")[1])

local attempt_at_key       = KEYS[1] .. ":attempt_at"
local force_attempt_at_key = KEYS[1] .. ":force_attempt_at"
local last_attempt_at_key  = KEYS[1] .. ":last_attempt_at"

local let_go = function ()
  redis.call("DEL", attempt_at_key, force_attempt_at_key)
  redis.call("SET", last_attempt_at_key, timestamp, "EX", ARGV[2])
  return 1
end

local attempt_at = tonumber(redis.call("GET", attempt_at_key))
if attempt_at and attempt_at <= timestamp then
  return let_go()
end

local force_attempt_at = tonumber(redis.call("GET", force_attempt_at_key))
if force_attempt_at and force_attempt_at <= timestamp then
  return let_go()
end

-- Edge case, when worker died after it was admitted
if not attempt_at and not force_attempt_at then
  local last_attempt_at = tonumber(redis.call("GET", last_attempt_at_key))
  if not last_attempt_at or last_attempt_at + ARGV[1] <= timestamp then
    return let_go()
  end
end

return 0
```

**lib/sidekiq/debounced_job/reserve.lua**

```lua
-- file: lib/sidekiq/debounced_job/reserve.lua
local timestamp = tonumber(redis.call("TIME")[1])

redis.call("SET", KEYS[1] .. ":attempt_at", timestamp + tonumber(ARGV[1]))
redis.call("SET", KEYS[1] .. ":force_attempt_at", timestamp + tonumber(ARGV[2]), "NX")
```

**Example**

```ruby
module IncrediblyUnreliableService
  class SyncJob
    include Sidekiq::Job
    include Sidekiq::DebouncedJob

    sidekiq_options queue: "lowest"
    debounce_options wait: 5.minutes

    def perform(external_id)
      Redlock::Client
        .new([Sidekiq.redis_pool])
        .lock!("braze-sync:#{external_id}", 60_000) do
          # ...
        end
    end
  end
end
```

**Post-Scriptum**

We are not using ActiveJob, so I'm not sure if the solution can be used as-is in that case. I might refactor our code into a gem and make it ActiveJob-compatible at some point.
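To make the reserve/attempt logic above easier to follow, here is a hedged in-memory sketch of the same state machine: a plain Ruby hash stands in for Redis and an integer clock stands in for `TIME`. `DebounceSketch` is an illustrative name, not part of the code above, and the dead-worker `last_attempt_at` branch is omitted for brevity:

```ruby
# Every reserve() pushes attempt_at forward (restarting the debounce
# window), while force_attempt_at is set only once per burst
# ("SET ... NX"), guaranteeing an upper bound on total delay.
class DebounceSketch
  def initialize(wait:, max_wait:)
    @wait = wait
    @max_wait = max_wait
    @store = {}
  end

  def reserve(now)
    @store[:attempt_at] = now + @wait
    @store[:force_attempt_at] ||= now + @max_wait # NX-style: keep first
  end

  # Mirrors attempt.lua: run only once a deadline has passed,
  # then clear both deadlines so the next burst starts fresh.
  def let_go?(now)
    if (@store[:attempt_at] && @store[:attempt_at] <= now) ||
       (@store[:force_attempt_at] && @store[:force_attempt_at] <= now)
      @store.delete(:attempt_at)
      @store.delete(:force_attempt_at)
      true
    else
      false
    end
  end
end
```

With `wait: 10, max_wait: 11`: reserving at t=0 and again at t=5 pushes `attempt_at` to 15, but the untouched `force_attempt_at` of 11 still admits the job at t=11, so repeated enqueues cannot delay execution forever.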
---
Threshold rate-limiting implemented in sidekiq-throttled caps how many jobs execute within a given period; jobs over the limit are pushed back to the queue and retried later. That might not be what you want in all cases.

```ruby
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_throttle threshold: { limit: 10, period: 1.hour }
end
```

That should make `MyJob` execute at most 10 times per hour, regardless of arguments. If you want the limit applied per argument (per user, say), add a `key_suffix`:

```ruby
class MyJob
  include Sidekiq::Job
  include Sidekiq::Throttled::Job

  sidekiq_throttle threshold: {
    limit: 10,
    period: 1.hour,
    key_suffix: ->(user_id, *, **) { "my-job:#{user_id}" }
  }
end
```

This will ensure at most 10 `MyJob`s per user per hour. Notice that when a job is throttled it's pushed back to the queue, and if jobs drain much slower than they are enqueued you might get the queue backing up...
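The threshold semantics can be approximated with a simple fixed-window counter. This is an illustrative sketch, not sidekiq-throttled's actual Redis implementation (which also handles the push-back to the queue); `ThresholdSketch` and `acquire?` are hypothetical names:

```ruby
# Fixed-window counter: allows at most `limit` executions per
# `period` seconds for each key; anything beyond that is rejected
# (sidekiq-throttled would requeue the job instead of dropping it).
class ThresholdSketch
  def initialize(limit:, period:)
    @limit = limit
    @period = period
    @windows = Hash.new(0) # counts per [key, window-index]
  end

  def acquire?(key, now)
    window = now / @period # integer division -> window index
    k = [key, window]
    return false if @windows[k] >= @limit
    @windows[k] += 1
    true
  end
end
```

With `limit: 2, period: 3600`, a third `acquire?("u1", ...)` in the same hour is rejected while `"u2"` is unaffected, which is the per-user behaviour the `key_suffix` example gives you.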