Skip to content

Commit

Permalink
Correct running job count
Browse files Browse the repository at this point in the history
The previous iteration didn't actually count running jobs correctly.
  • Loading branch information
carlocab committed May 2, 2024
1 parent 8de88c3 commit bda2ca8
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 15 deletions.
8 changes: 4 additions & 4 deletions src/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Job
NAME_REGEX =
/\A(?<runner>\d+(?:\.\d+)?(?:-arm64)?(?:-cross)?)-(?<run_id>\d+)(?:-(?<run_attempt>\d+))?(?:-(?<tags>[-a-z]+))?\z/

attr_reader :runner_name, :repository, :github_id, :secret, :type
attr_reader :runner_name, :repository, :github_id, :secret, :group
attr_writer :orka_setup_timeout
attr_accessor :github_state, :orka_vm_id, :orka_setup_time, :orka_start_attempts, :runner_completion_time

Expand All @@ -26,7 +26,7 @@ def initialize(runner_name, repository, github_id, secret: nil)
@orka_start_attempts = 0
@secret = secret || SecureRandom.hex(32)
@runner_completion_time = nil
@type = long_build? ? :long : :standard
@group = long_build? ? :long : :default
end

def os
Expand All @@ -46,11 +46,11 @@ def run_attempt
end

def tags
@runner_name[NAME_REGEX, :tags]&.split("-")
@runner_name[NAME_REGEX, :tags]&.split("-").to_a
end

def long_build?
tags&.include?("long")
tags.include?("long")
end

def runner_labels
Expand Down
14 changes: 8 additions & 6 deletions src/job_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@

require_relative "shared_state"

# A variation of `Thread::Queue` that allows us to prioritise certain types of jobs.
class JobQueue
def initialize
def initialize(queue_type)
@mutex = Mutex.new
@queue = Hash.new { |h, k| h[k] = Queue.new }
@state = SharedState.instance
@queue_type = queue_type
end

def <<(job)
@mutex.synchronize do
@queue[job.type] << job
@queue[job.group] << job
end
end

def pop
@mutex.synchronize do
running_long_build_count = @state.running_jobs
.count { |job| job.type == :long }
if running_long_build_count < 2
running_long_build_count = @state.running_jobs(@queue_type).count(&:long_build?)

if (running_long_build_count < 2 || @queue[:default].empty?) && !@queue[:long].empty?
@queue[:long].pop
else
@queue[:standard].pop
@queue[:default].pop
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/orka_start_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ class OrkaStartProcessor < ThreadRunner

attr_reader :queue

def initialize(name)
def initialize(queue_type, name)
super("#{self.class.name} (#{name})")
@queue = JobQueue.new
@queue = JobQueue.new(queue_type)
end

def pausable?
Expand Down
6 changes: 3 additions & 3 deletions src/shared_state.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def initialize
@file_mutex = Mutex.new

@orka_start_processors = QueueTypes.to_h do |type|
[type, OrkaStartProcessor.new(QueueTypes.name(type))]
[type, OrkaStartProcessor.new(type, QueueTypes.name(type))]
end
@orka_stop_processor = OrkaStopProcessor.new
@orka_timeout_processor = OrkaTimeoutProcessor.new
Expand Down Expand Up @@ -215,8 +215,8 @@ def free_slot?(waiting_job)
@jobs.count { |job| job.queue_type == waiting_job.queue_type && !job.orka_vm_id.nil? } < max_slots
end

def running_jobs
jobs.reject { |j| j.github_state == :queued || j.github_state == :completed }
def running_jobs(queue_type)
jobs.count { |job| job.queue_type == queue_type && !job.orka_vm_id.nil? }
end

private
Expand Down

0 comments on commit bda2ca8

Please sign in to comment.