Skip to content

Commit

Permalink
Merge pull request #417 from ikyn-inc/interruptible
Browse files Browse the repository at this point in the history
Reimplement Interruptible
  • Loading branch information
rosa authored Dec 4, 2024
2 parents 76b7eaf + 17c4cd5 commit 4f29fc8
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 44 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@
# Folder for Visual Studio Code
/.vscode/

# Files for RVM holdouts
.ruby-gemset

# misc
.DS_Store
2 changes: 1 addition & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
inherit_gem: { rubocop-rails-omakase: rubocop.yml }

AllCops:
TargetRubyVersion: 3.0
TargetRubyVersion: 3.3
Exclude:
- "test/dummy/db/schema.rb"
- "test/dummy/db/queue_schema.rb"
6 changes: 5 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ GEM
reline (>= 0.4.2)
json (2.8.2)
language_server-protocol (3.17.0.3)
logger (1.6.2)
loofah (2.23.1)
crass (~> 1.0.2)
nokogiri (>= 1.12.0)
Expand Down Expand Up @@ -177,16 +178,19 @@ GEM
PLATFORMS
arm64-darwin-22
arm64-darwin-23
arm64-darwin-24
x86_64-darwin-21
x86_64-darwin-23
x86_64-linux

DEPENDENCIES
debug
debug (~> 1.9)
logger
mocha
mysql2
pg
puma
rdoc
rubocop-rails-omakase
solid_queue!
sqlite3
Expand Down
28 changes: 10 additions & 18 deletions lib/solid_queue/processes/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,23 @@ def wake_up
end

private
SELF_PIPE_BLOCK_SIZE = 11

def interrupt
self_pipe[:writer].write_nonblock(".")
rescue Errno::EAGAIN, Errno::EINTR
# Ignore writes that would block and retry
# if another signal arrived while writing
retry
queue << true
end

def interruptible_sleep(time)
if time > 0 && self_pipe[:reader].wait_readable(time)
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
end
rescue Errno::EAGAIN, Errno::EINTR
# Invoking from the main thread can result in a 35% slowdown (at least when running the test suite).
# Using some form of Async (Futures) addresses this performance issue.
Concurrent::Promises.future(time) do |timeout|
if timeout > 0 && queue.pop(timeout:)
queue.clear
end
end.value
end

# Self-pipe for signal-handling (http://cr.yp.to/docs/selfpipe.html)
def self_pipe
@self_pipe ||= create_self_pipe
end

def create_self_pipe
reader, writer = IO.pipe
{ reader: reader, writer: writer }
def queue
@queue ||= Queue.new
end
end
end
4 changes: 3 additions & 1 deletion solid_queue.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ Gem::Specification.new do |spec|
spec.add_dependency "fugit", "~> 1.11.0"
spec.add_dependency "thor", "~> 1.3.1"

spec.add_development_dependency "debug"
spec.add_development_dependency "debug", "~> 1.9"
spec.add_development_dependency "mocha"
spec.add_development_dependency "puma"
spec.add_development_dependency "mysql2"
spec.add_development_dependency "pg"
spec.add_development_dependency "sqlite3"
spec.add_development_dependency "rubocop-rails-omakase"
spec.add_development_dependency "rdoc"
spec.add_development_dependency "logger"
end
12 changes: 12 additions & 0 deletions test/dummy/config/initializers/enable_yjit.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

# Ideally, tests should be configured as close to production settings as
# possible and YJIT is likely to be enabled. While it's highly unlikely
# YJIT would cause issues, enabling it confirms this assertion.
#
# Configured via initializer to align with Rails 7.1 default in gemspec
if defined?(RubyVM::YJIT.enable)
Rails.application.config.after_initialize do
RubyVM::YJIT.enable
end
end
2 changes: 1 addition & 1 deletion test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
test "run several jobs over the same record sequentially, with some of them failing" do
("A".."F").each_with_index do |name, i|
# A, C, E will fail, for i= 0, 2, 4
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?))
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
end

("G".."K").each do |name|
Expand Down
8 changes: 4 additions & 4 deletions test/integration/instrumentation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class InstrumentationTest < ActiveSupport::TestCase

test "errors when deregistering processes are included in deregister_process events" do
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
error = RuntimeError.new("everything is broken")
error = ExpectedTestError.new("everything is broken")
SolidQueue::Process.any_instance.expects(:destroy!).raises(error).at_least_once

events = subscribed("deregister_process.solid_queue") do
Expand All @@ -182,7 +182,7 @@ class InstrumentationTest < ActiveSupport::TestCase
end

test "retrying failed job emits retry event" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(ExpectedTestError, "A")
job = SolidQueue::Job.last

worker = SolidQueue::Worker.new.tap(&:start)
Expand All @@ -198,7 +198,7 @@ class InstrumentationTest < ActiveSupport::TestCase
end

test "retrying failed jobs in bulk emits retry_all" do
3.times { RaisingJob.perform_later(RuntimeError, "A") }
3.times { RaisingJob.perform_later(ExpectedTestError, "A") }
AddToBufferJob.perform_later("A")

jobs = SolidQueue::Job.last(4)
Expand Down Expand Up @@ -392,7 +392,7 @@ class InstrumentationTest < ActiveSupport::TestCase
test "thread errors emit thread_error events" do
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false

error = RuntimeError.new("everything is broken")
error = ExpectedTestError.new("everything is broken")
SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once

AddToBufferJob.perform_later "hey!"
Expand Down
8 changes: 5 additions & 3 deletions test/integration/jobs_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

class JobsLifecycleTest < ActiveSupport::TestCase
setup do
SolidQueue.on_thread_error = silent_on_thread_error_for([ ExpectedTestError, RaisingJob::DefaultError ])
@worker = SolidQueue::Worker.new(queues: "background", threads: 3)
@dispatcher = SolidQueue::Dispatcher.new(batch_size: 10, polling_interval: 0.2)
end

teardown do
SolidQueue.on_thread_error = @on_thread_error
@worker.stop
@dispatcher.stop

Expand All @@ -29,16 +31,16 @@ class JobsLifecycleTest < ActiveSupport::TestCase
end

test "enqueue and run jobs that fail without retries" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(RuntimeError, "B")
RaisingJob.perform_later(ExpectedTestError, "A")
RaisingJob.perform_later(ExpectedTestError, "B")
jobs = SolidQueue::Job.last(2)

@dispatcher.start
@worker.start

wait_for_jobs_to_finish_for(3.seconds)

message = "raised RuntimeError for the 1st time"
message = "raised ExpectedTestError for the 1st time"
assert_equal [ "A: #{message}", "B: #{message}" ], JobBuffer.values.sort

assert_empty SolidQueue::Job.finished
Expand Down
6 changes: 3 additions & 3 deletions test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase
test "process some jobs that raise errors" do
2.times { enqueue_store_result_job("no error", :background) }
2.times { enqueue_store_result_job("no error", :default) }
error1 = enqueue_store_result_job("error", :background, exception: RuntimeError)
error1 = enqueue_store_result_job("error", :background, exception: ExpectedTestError)
enqueue_store_result_job("no error", :background, pause: 0.03)
error2 = enqueue_store_result_job("error", :background, exception: RuntimeError, pause: 0.05)
error2 = enqueue_store_result_job("error", :background, exception: ExpectedTestError, pause: 0.05)
2.times { enqueue_store_result_job("no error", :default, pause: 0.01) }
error3 = enqueue_store_result_job("error", :default, exception: RuntimeError)
error3 = enqueue_store_result_job("error", :default, exception: ExpectedTestError)

wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ])

Expand Down
16 changes: 9 additions & 7 deletions test/models/solid_queue/failed_execution_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
end

test "run job that fails" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(ExpectedTestError, "A")
@worker.start

assert_equal 1, SolidQueue::FailedExecution.count
assert SolidQueue::Job.last.failed?
end

test "run job that fails with a SystemStackError (stack level too deep)" do
InfiniteRecursionJob.perform_later
@worker.start
silence_on_thread_error_for(SystemStackError) do
InfiniteRecursionJob.perform_later
@worker.start

assert_equal 1, SolidQueue::FailedExecution.count
assert SolidQueue::Job.last.failed?
assert_equal 1, SolidQueue::FailedExecution.count
assert SolidQueue::Job.last.failed?
end
end

test "retry failed job" do
RaisingJob.perform_later(RuntimeError, "A")
RaisingJob.perform_later(ExpectedTestError, "A")
@worker.start

assert_difference -> { SolidQueue::FailedExecution.count }, -1 do
Expand All @@ -34,7 +36,7 @@ class SolidQueue::FailedExecutionTest < ActiveSupport::TestCase
end

test "retry failed jobs in bulk" do
1.upto(5) { |i| RaisingJob.perform_later(RuntimeError, i) }
1.upto(5) { |i| RaisingJob.perform_later(ExpectedTestError, i) }
1.upto(3) { |i| AddToBufferJob.perform_later(i) }

@worker.start
Expand Down
33 changes: 33 additions & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,20 @@ def write(...)
end

Logger::LogDevice.prepend(BlockLogDeviceTimeoutExceptions)
class ExpectedTestError < RuntimeError; end


class ActiveSupport::TestCase
include ProcessesTestHelper, JobsTestHelper

setup do
# Could be cleaner with one several minitest gems, but didn't want to add new dependency
@_on_thread_error = SolidQueue.on_thread_error
SolidQueue.on_thread_error = silent_on_thread_error_for(ExpectedTestError)
end

teardown do
SolidQueue.on_thread_error = @_on_thread_error
JobBuffer.clear

if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile)
Expand Down Expand Up @@ -69,4 +78,28 @@ def wait_while_with_timeout!(timeout, &block)
def skip_active_record_query_cache(&block)
SolidQueue::Record.uncached(&block)
end

# Silences specified exceptions during the execution of a block
#
# @param [Exception, Array<Exception>] expected an Exception or an array of Exceptions to ignore
# @yield Executes the provided block with specified exception(s) silenced
def silence_on_thread_error_for(expected, &block)
SolidQueue.with(on_thread_error: silent_on_thread_error_for(expected)) do
block.call
end
end

# Does not call on_thread_error for expected exceptions
# @param [Exception, Array<Exception>] expected an Exception or an array of Exceptions to ignore
def silent_on_thread_error_for(expected)
current_proc = SolidQueue.on_thread_error

->(exception) do
expected_exceptions = Array(expected)

unless expected_exceptions.any? { exception.instance_of?(_1) }
current_proc.call(exception)
end
end
end
end
10 changes: 5 additions & 5 deletions test/unit/worker_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ class WorkerTest < ActiveSupport::TestCase
original_on_thread_error, SolidQueue.on_thread_error = SolidQueue.on_thread_error, ->(error) { errors << error.message }
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false

SolidQueue::ReadyExecution.expects(:claim).raises(RuntimeError.new("everything is broken")).at_least_once
SolidQueue::ReadyExecution.expects(:claim).raises(ExpectedTestError.new("everything is broken")).at_least_once

AddToBufferJob.perform_later "hey!"

worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2).tap(&:start)
sleep(1)

assert_raises RuntimeError do
assert_raises ExpectedTestError do
worker.stop
end

Expand All @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

SolidQueue::ClaimedExecution::Result.expects(:new).raises(RuntimeError.new("everything is broken")).at_least_once
SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once

AddToBufferJob.perform_later "hey!"

Expand All @@ -71,15 +71,15 @@ class WorkerTest < ActiveSupport::TestCase
subscriber = ErrorBuffer.new
Rails.error.subscribe(subscriber)

RaisingJob.perform_later(RuntimeError, "B")
RaisingJob.perform_later(ExpectedTestError, "B")

@worker.start

wait_for_jobs_to_finish_for(1.second)
@worker.wake_up

assert_equal 1, subscriber.errors.count
assert_equal "This is a RuntimeError exception", subscriber.messages.first
assert_equal "This is a ExpectedTestError exception", subscriber.messages.first
ensure
Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe)
end
Expand Down

0 comments on commit 4f29fc8

Please sign in to comment.