make batch progress calculation counter-based
alachaum committed Jun 24, 2024
1 parent 97f7c30 commit 0bc8792
Showing 5 changed files with 228 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .rubocop.yml
@@ -9,7 +9,7 @@ AllCops:
- 'vendor/**/*'

Metrics/ClassLength:
Max: 200
Max: 300

Metrics/ModuleLength:
Max: 150
32 changes: 18 additions & 14 deletions lib/cloudtasker/batch/batch_progress.rb
@@ -6,15 +6,28 @@ module Cloudtasker
module Batch
# Capture the progress of a batch
class BatchProgress
attr_reader :batch_state
attr_reader :batches

#
# Build a new instance of the class.
#
# @param [Hash] batch_state The batch state
# @param [Array<Cloudtasker::Batch::Job>] batches The batches to consider
#
def initialize(batch_state = {})
@batch_state = batch_state
def initialize(batches = [])
@batches = batches
end

# Count the number of items in a given status

#
# Count the number of items in a given status
#
# @param [String] status The status to count
#
# @return [Integer] The number of jobs in the status
#
def count(status = 'all')
batches.sum { |e| e.batch_state_count(status) }
end

#
@@ -122,16 +135,7 @@ def percent(min_total: 0, smoothing: 0)
# @return [Cloudtasker::Batch::BatchProgress] The sum of the two batch progresses.
#
def +(other)
self.class.new(batch_state.to_h.merge(other.batch_state.to_h))
end

private

# Count the number of items in a given status
def count(status = nil)
return batch_state.to_h.keys.size unless status

batch_state.to_h.values.count { |e| e == status }
self.class.new(batches + other.batches)
end
end
end
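
The reworked class now wraps a list of batch jobs and derives every figure from their Redis counters rather than from a merged state hash. The sketch below is illustrative only and not part of the commit; parent_batch, child_batch and other_batch stand in for real Cloudtasker::Batch::Job instances.

# Illustrative sketch only, not part of the diff above.
# parent_batch, child_batch and other_batch are hypothetical batch jobs.
progress = Cloudtasker::Batch::BatchProgress.new([parent_batch, child_batch])

progress.count              # sum of each batch's 'all' counter
progress.count('completed') # sum of each batch's 'completed' counter
progress.done               # completed + dead
progress.pending            # total - done
progress.percent            # done / total * 100 (accepts min_total: and smoothing:)

# '+' now concatenates the underlying batch lists instead of merging state hashes
combined = progress + Cloudtasker::Batch::BatchProgress.new([other_batch])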
78 changes: 68 additions & 10 deletions lib/cloudtasker/batch/job.rb
@@ -24,6 +24,7 @@ class Job
# means that the job will never succeed. There is no point in blocking
# the batch forever so we proceed forward eventually.
#
BATCH_STATUSES = %w[scheduled processing completed errored dead all].freeze
COMPLETION_STATUSES = %w[completed dead].freeze

# These callbacks do not need to raise errors on their own
@@ -182,6 +183,25 @@ def batch_state_gid
key("#{STATES_NAMESPACE}/#{batch_id}")
end

#
# Return the key under which the batch progress counter for a
# specific state is stored.
#
# @param [String] state The batch state
#
# @return [String] The batch progress state namespaced id.
#
def batch_state_count_gid(state)
"#{batch_state_gid}/state_count/#{state}"
end

#
# Return the number of jobs in a given state
#
# @param [String] state The batch state
#
# @return [Integer] The number of jobs in the given state.
#
def batch_state_count(state)
redis.get(batch_state_count_gid(state)).to_i
end
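
batch_state_count reads a plain Redis string counter stored under a per-state key derived from batch_state_gid. The standalone snippet below illustrates the counter semantics it relies on; the key shape is an assumption, since the real value comes from batch_state_count_gid as defined above.

# Standalone illustration, not part of the commit. The key below is a guess;
# the real key is built by batch_state_count_gid(state).
require 'redis'

redis = Redis.new
key = 'cloudtasker/batches-state/abc/state_count/completed' # hypothetical

redis.get(key).to_i # => 0 when the counter was never set (GET returns nil)
redis.incr(key)     # => 1
redis.incr(key)     # => 2
redis.decr(key)     # => 1
redis.get(key).to_i # => 1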

#
# The list of jobs to be enqueued in the batch
#
@@ -258,6 +278,28 @@ def migrate_batch_state_to_redis_hash
end
end

#
# This method initializes the batch job counters if not set already
#
def migrate_progress_stats_to_redis_counters
# Abort if counters have already been set. The 'all' counter acts as a feature flag.
return if redis.exists?(batch_state_count_gid('all'))

# Get all job states
values = batch_state.values

# Count by value
redis.multi do |m|
# Per status
values.tally.each do |k, v|
m.set(batch_state_count_gid(k), v)
end

# All counter
m.set(batch_state_count_gid('all'), values.size)
end
end
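
The migration derives the initial counters from the legacy state hash with Enumerable#tally, then writes them in a single MULTI. A standalone example of the values it would produce for a hypothetical legacy batch state:

# Illustrative only: input and output of the tally step for a made-up state hash.
batch_state = {
  'job-1' => 'completed',
  'job-2' => 'completed',
  'job-3' => 'processing',
  'job-4' => 'scheduled'
}

batch_state.values.tally
# => {"completed"=>2, "processing"=>1, "scheduled"=>1}
# Each pair becomes SET batch_state_count_gid(status) = count, plus
# SET batch_state_count_gid('all') = 4, all inside one redis.multi block.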

#
# Save serialized version of the worker.
#
@@ -278,8 +320,17 @@ def save
def update_state(batch_id, status)
migrate_batch_state_to_redis_hash

# Get current status
current_status = redis.hget(batch_state_gid, batch_id)
return if current_status == status.to_s

# Update the batch state batch_id entry with the new status
redis.hset(batch_state_gid, batch_id, status)
# and update counters
redis.multi do |m|
m.hset(batch_state_gid, batch_id, status)
m.decr(batch_state_count_gid(current_status))
m.incr(batch_state_count_gid(status))
end
end
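
With the early return in place, each real status change moves exactly one unit between counters inside a transaction. Below is a sketch of the equivalent operations when a child batch moves from 'processing' to 'completed'; the key strings are hypothetical placeholders for batch_state_gid and batch_state_count_gid.

# Illustrative only: effective Redis operations for one status transition.
redis.multi do |m|
  m.hset('cloudtasker/batches-state/abc', 'child-job-id', 'completed') # new status
  m.decr('cloudtasker/batches-state/abc/state_count/processing')       # old status counter
  m.incr('cloudtasker/batches-state/abc/state_count/completed')        # new status counter
end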

#
@@ -385,8 +436,11 @@ def cleanup
redis.hkeys(batch_state_gid).each { |id| self.class.find(id)&.cleanup }

# Delete batch redis entries
redis.del(batch_gid)
redis.del(batch_state_gid)
redis.multi do |m|
m.del(batch_gid)
m.del(batch_state_gid)
BATCH_STATUSES.each { |e| m.del(batch_state_count_gid(e)) }
end
end
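
Cleanup now clears the per-state counters alongside the batch payload and state hash. A sketch of the keys one batch leaves behind, using the same hypothetical key prefixes as above:

# Illustrative only. Real keys come from batch_gid, batch_state_gid and
# batch_state_count_gid; the prefixes below are assumptions.
statuses = %w[scheduled processing completed errored dead all] # mirrors BATCH_STATUSES
keys = [
  'cloudtasker/batches/abc',       # serialized worker (batch_gid)
  'cloudtasker/batches-state/abc'  # children state hash (batch_state_gid)
] + statuses.map { |s| "cloudtasker/batches-state/abc/state_count/#{s}" }

redis.multi { |m| keys.each { |k| m.del(k) } }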

#
@@ -400,17 +454,17 @@ def cleanup
def progress(depth: 0)
depth = depth.to_i

# Capture batch state
state = batch_state
# Initialize counters from batch state. This is only applicable to running batches
# that started before the counter-based progress was implemented/released.
migrate_progress_stats_to_redis_counters

# Return immediately if we do not need to go down the tree
return BatchProgress.new(state) if depth <= 0
return BatchProgress.new([self]) if depth <= 0

# Sum batch progress of current batch and sub-batches up to the specified
# depth
state.to_h.reduce(BatchProgress.new(state)) do |memo, (child_id, child_status)|
memo + (self.class.find(child_id)&.progress(depth: depth - 1) ||
BatchProgress.new(child_id => child_status))
batch_state.to_h.reduce(BatchProgress.new([self])) do |memo, (child_id, _)|
memo + (self.class.find(child_id)&.progress(depth: depth - 1) || BatchProgress.new)
end
end
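
progress now seeds the aggregate with [self] and recursively appends child batches up to the requested depth, so each level contributes its own counters instead of re-reading state hashes. A usage sketch with hypothetical batch ids:

# Illustrative only; batch ids are made up.
parent = Cloudtasker::Batch::Job.find('parent-batch-id')

parent.progress             # this batch only (depth: 0)
parent.progress(depth: 1)   # this batch plus its direct children
parent.progress(depth: 2)   # grandchildren included as well

# Children that can no longer be found contribute an empty BatchProgress,
# which adds nothing to the totals.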

@@ -432,7 +486,11 @@ def schedule_pending_jobs
# having never-ending batches - which could occur if a batch was crashing
# while enqueuing children due to an OOM error and since 'scheduled' is a
# blocking status.
redis.hsetnx(batch_state_gid, j.job_id, 'scheduled')
redis.multi do |m|
m.hsetnx(batch_state_gid, j.job_id, 'scheduled')
m.incr(batch_state_count_gid('scheduled'))
m.incr(batch_state_count_gid('all'))
end

# Flag job as enqueued
ret_list << j
115 changes: 58 additions & 57 deletions spec/cloudtasker/batch/batch_progress_spec.rb
@@ -3,118 +3,119 @@
require 'cloudtasker/batch/middleware'

RSpec.describe Cloudtasker::Batch::BatchProgress do
let(:batch_state) do
{
'1' => 'completed',
'2' => 'scheduled',
'3' => 'processing',
'4' => 'errored',
'5' => 'dead'
}
end
let(:batch_progress) { described_class.new(batch_state) }
let(:batch) { instance_double(Cloudtasker::Batch::Job) }
let(:batch_progress) { described_class.new([batch]) }

describe '.new' do
subject { batch_progress }

it { is_expected.to have_attributes(batch_state: batch_state) }
it { is_expected.to have_attributes(batches: [batch]) }
end

describe '#total' do
subject { batch_progress.total }
describe '#count' do
subject { batch_progress.count(*args) }

it { is_expected.to eq(batch_state.keys.count) }
end
let(:args) { [] }
let(:count) { 18 }

describe '#completed' do
subject { batch_progress.completed }

it { is_expected.to eq(batch_state.values.count { |e| e == 'completed' }) }
end
context 'with no args' do
before { allow(batch).to receive(:batch_state_count).with('all').and_return(count) }
it { is_expected.to eq(count) }
end

describe '#scheduled' do
subject { batch_progress.scheduled }
context 'with status provided' do
let(:args) { ['processing'] }

it { is_expected.to eq(batch_state.values.count { |e| e == 'scheduled' }) }
before { allow(batch).to receive(:batch_state_count).with(args[0]).and_return(count) }
it { is_expected.to eq(count) }
end
end

describe '#errored' do
subject { batch_progress.errored }

it { is_expected.to eq(batch_state.values.count { |e| e == 'errored' }) }
end
describe '#total' do
subject { batch_progress.total }

describe '#dead' do
subject { batch_progress.dead }
let(:count) { 18 }

it { is_expected.to eq(batch_state.values.count { |e| e == 'dead' }) }
before { allow(batch_progress).to receive(:count).and_return(count) }
it { is_expected.to eq(count) }
end

describe '#processing' do
subject { batch_progress.processing }
%w[scheduled processing completed errored dead].each do |tested_status|
describe "##{tested_status}" do
subject { batch_progress.send(tested_status) }

let(:count) { 18 }

it { is_expected.to eq(batch_state.values.count { |e| e == 'processing' }) }
before { allow(batch_progress).to receive(:count).with(tested_status).and_return(count) }
it { is_expected.to eq(count) }
end
end

describe '#pending' do
subject { batch_progress.pending }

it { is_expected.to eq(batch_state.values.count { |e| !%w[dead completed].include?(e) }) }
let(:total) { 25 }
let(:done) { 18 }

before do
allow(batch_progress).to receive_messages(total: total, done: done)
end

it { is_expected.to eq(total - done) }
end

describe '#done' do
subject { batch_progress.done }

it { is_expected.to eq(batch_state.values.count { |e| %w[dead completed].include?(e) }) }
let(:completed) { 25 }
let(:dead) { 18 }

before do
allow(batch_progress).to receive_messages(completed: completed, dead: dead)
end

it { is_expected.to eq(completed + dead) }
end

describe '#percent' do
subject { batch_progress.percent(**opts) }

let(:opts) { {} }
let(:total) { 25 }
let(:done) { 18 }

before do
allow(batch_progress).to receive_messages(total: total, done: done)
end

context 'with batch' do
it { is_expected.to eq((batch_progress.done.to_f / batch_progress.total) * 100) }
it { is_expected.to eq((done.to_f / total) * 100) }
end

context 'with min_total > total' do
let(:opts) { { min_total: 1000 } }

it { is_expected.to eq((batch_progress.done.to_f / opts[:min_total]) * 100) }
it { is_expected.to eq((done.to_f / opts[:min_total]) * 100) }
end

context 'with min_total < total' do
let(:opts) { { min_total: batch_progress.total / 2 } }
let(:opts) { { min_total: total / 2 } }

it { is_expected.to eq((batch_progress.done.to_f / batch_progress.total) * 100) }
it { is_expected.to eq((done.to_f / total) * 100) }
end

context 'with additive smoothing' do
let(:opts) { { smoothing: 10 } }

it { is_expected.to eq((batch_progress.done.to_f / (batch_progress.total + opts[:smoothing])) * 100) }
end

context 'with empty elements' do
let(:batch_state) { {} }

it { is_expected.to be_zero }
it { is_expected.to eq((done.to_f / (total + opts[:smoothing])) * 100) }
end
end

describe '#+' do
subject { batch_progress + other }

let(:other_state) do
{
'4' => 'completed',
'5' => 'scheduled',
'6' => 'processing'
}
end
let(:other) { described_class.new(other_state) }
let(:other) { described_class.new([instance_double(Cloudtasker::Batch::Job)]) }

it { is_expected.to be_a(described_class) }
it { is_expected.to have_attributes(batch_state: batch_state.merge(other_state)) }
it { is_expected.to have_attributes(class: described_class, batches: batch_progress.batches + other.batches) }
end
end