Skip to content

Commit

Permalink
Introduce HireFire::Macro::Resque::Cache
Browse files Browse the repository at this point in the history
This change adds a caching mechanism to the Resque macro.

Since we have to scan the entire delayed set in Redis to acquire the sizes of the requested queues
containing jobs scheduled to run now, we can instead process the size of every queue associated with
the scanned jobs, whether requested or not, and cache it for a certain amount of time.

This approach allows us to avoid scanning the entire delayed set on subsequent requests within the
cache duration. Instead, we can access and aggregate the cached data. This will significantly
improve throughput for applications that have a large number of scheduled jobs and/or a large number
of Resque process types that need to be monitored.
  • Loading branch information
mrrooijen committed Jun 9, 2024
1 parent d6b0d40 commit 8929cf1
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 14 deletions.
66 changes: 61 additions & 5 deletions lib/hirefire/macro/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ def working_size(queues)
end

def scheduled_size(queues)
cached_result = cache.fetch(queues)
return cached_result if cached_result

cursor = 0
batch = 1000
total_size = 0
sizes = Hash.new(0)
current_time = Time.now.to_i

loop do
Expand Down Expand Up @@ -101,8 +105,9 @@ def scheduled_size(queues)

break if encoded_jobs.empty?

total_size += encoded_jobs.count do |encoded_job|
queues.include?(::Resque.decode(encoded_job)["queue"])
encoded_jobs.each do |encoded_job|
queue = ::Resque.decode(encoded_job)["queue"]
sizes[queue] += 1
end

break if encoded_jobs.size < batch
Expand All @@ -117,14 +122,65 @@ def scheduled_size(queues)
cursor += batch
end

total_size
if queues.empty?
total_size
else
cache.store(sizes)
cache.fetch(queues)
end
end

private

def registered_queues
::Resque.redis.keys("queue:*").map { |key| key[6..] }.to_set
end

class Cache
EXPIRY_TIME = 5 # seconds

def initialize
@sizes = Hash.new(0)
@cached_at = expired_time
end

def fetch(queues)
return nil if expired?

if queues.empty?
sizes.values.sum
else
sizes.values_at(*queues).sum
end
end

def store(sizes)
@sizes = Hash.new(0).merge(sizes)
@cached_at = current_time
end

def expire!
@cached_at = expired_time
end

private

attr_reader :sizes, :cached_at

def current_time
Time.now.to_i
end

def expired_time
current_time - EXPIRY_TIME
end

def expired?
current_time - cached_at >= EXPIRY_TIME
end
end

def cache
@cache ||= Cache.new
end
end
end
end
23 changes: 14 additions & 9 deletions test/hirefire/macro/test_resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@

class HireFire::Macro::ResqueTest < Minitest::Test
def setup
expire_cache!
Resque.redis = Redis.new(db: 15).tap(&:flushdb)
end

def teardown
Resque.redis.close
end

def expire_cache!
HireFire::Macro::Resque.send(:cache).expire!
end

def test_job_queue_latency_unsupported
assert_raises(HireFire::Errors::JobQueueLatencyUnsupportedError) do
HireFire::Macro::Resque.job_queue_latency
Expand Down Expand Up @@ -41,20 +46,20 @@ def test_job_queue_size_with_scheduled_jobs
Resque.enqueue_in_with_queue(:default, 300, BasicJob)
Resque.enqueue_in_with_queue(:mailer, 300, BasicJob)

assert_equal 0, HireFire::Macro::Resque.job_queue_size
assert_equal 0, HireFire::Macro::Resque.job_queue_size # uncached

Timecop.freeze(Time.now + 200) do
assert_equal 1, HireFire::Macro::Resque.job_queue_size
assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default)
assert_equal 0, HireFire::Macro::Resque.job_queue_size(:mailer)
assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default, :mailer)
assert_equal 1, HireFire::Macro::Resque.job_queue_size # uncached
assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default) # uncached
assert_equal 0, HireFire::Macro::Resque.job_queue_size(:mailer) # cached
assert_equal 1, HireFire::Macro::Resque.job_queue_size(:default, :mailer) # cached
end

Timecop.freeze(Time.now + 400) do
assert_equal 3, HireFire::Macro::Resque.job_queue_size
assert_equal 2, HireFire::Macro::Resque.job_queue_size(:default)
assert_equal 1, HireFire::Macro::Resque.job_queue_size(:mailer)
assert_equal 3, HireFire::Macro::Resque.job_queue_size(:default, :mailer)
assert_equal 2, HireFire::Macro::Resque.job_queue_size(:default) # expired
assert_equal 1, HireFire::Macro::Resque.job_queue_size(:mailer) # cached
assert_equal 3, HireFire::Macro::Resque.job_queue_size(:default, :mailer) # cached
assert_equal 3, HireFire::Macro::Resque.job_queue_size # cached
end
end

Expand Down

0 comments on commit 8929cf1

Please sign in to comment.