diff --git a/src/job.rb b/src/job.rb index 568d0d9..9f0ffeb 100644 --- a/src/job.rb +++ b/src/job.rb @@ -9,7 +9,7 @@ class Job NAME_REGEX = /\A(?\d+(?:\.\d+)?(?:-arm64)?(?:-cross)?)-(?\d+)(?:-(?\d+))?(?:-(?[-a-z]+))?\z/ - attr_reader :runner_name, :repository, :github_id, :secret, :type + attr_reader :runner_name, :repository, :github_id, :secret, :group attr_writer :orka_setup_timeout attr_accessor :github_state, :orka_vm_id, :orka_setup_time, :orka_start_attempts, :runner_completion_time @@ -26,7 +26,7 @@ def initialize(runner_name, repository, github_id, secret: nil) @orka_start_attempts = 0 @secret = secret || SecureRandom.hex(32) @runner_completion_time = nil - @type = long_build? ? :long : :standard + @group = long_build? ? :long : :default end def os @@ -46,11 +46,11 @@ def run_attempt end def tags - @runner_name[NAME_REGEX, :tags]&.split("-") + @runner_name[NAME_REGEX, :tags]&.split("-").to_a end def long_build? - tags&.include?("long") + tags.include?("long") end def runner_labels diff --git a/src/job_queue.rb b/src/job_queue.rb index 33143a6..4ec0855 100644 --- a/src/job_queue.rb +++ b/src/job_queue.rb @@ -2,27 +2,29 @@ require_relative "shared_state" +# A variation of `Thread::Queue` that allows us to prioritise certain types of jobs. class JobQueue - def initialize + def initialize(queue_type) @mutex = Mutex.new @queue = Hash.new { |h, k| h[k] = Queue.new } @state = SharedState.instance + @queue_type = queue_type end def <<(job) @mutex.synchronize do - @queue[job.type] << job + @queue[job.group] << job end end def pop @mutex.synchronize do - running_long_build_count = @state.running_jobs - .count { |job| job.type == :long } - if running_long_build_count < 2 + running_long_build_count = @state.running_jobs(@queue_type).count(&:long_build?) + + if (running_long_build_count < 2 || @queue[:default].empty?) && !@queue[:long].empty? @queue[:long].pop else - @queue[:standard].pop + @queue[:default].pop end end end diff --git a/src/orka_start_processor.rb b/src/orka_start_processor.rb index ebd9397..1285a5b 100644 --- a/src/orka_start_processor.rb +++ b/src/orka_start_processor.rb @@ -22,9 +22,9 @@ class OrkaStartProcessor < ThreadRunner attr_reader :queue - def initialize(name) + def initialize(queue_type, name) super("#{self.class.name} (#{name})") - @queue = JobQueue.new + @queue = JobQueue.new(queue_type) end def pausable? diff --git a/src/shared_state.rb b/src/shared_state.rb index d0e0870..387a7fd 100644 --- a/src/shared_state.rb +++ b/src/shared_state.rb @@ -81,7 +81,7 @@ def initialize @file_mutex = Mutex.new @orka_start_processors = QueueTypes.to_h do |type| - [type, OrkaStartProcessor.new(QueueTypes.name(type))] + [type, OrkaStartProcessor.new(type, QueueTypes.name(type))] end @orka_stop_processor = OrkaStopProcessor.new @orka_timeout_processor = OrkaTimeoutProcessor.new @@ -215,8 +215,8 @@ def free_slot?(waiting_job) @jobs.count { |job| job.queue_type == waiting_job.queue_type && !job.orka_vm_id.nil? } < max_slots end - def running_jobs - jobs.reject { |j| j.github_state == :queued || j.github_state == :completed } + def running_jobs(queue_type) + jobs.count { |job| job.queue_type == queue_type && !job.orka_vm_id.nil? } end private