diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index c6e2267f4..c00f8ac79 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -90,6 +90,18 @@ def shutdown? !@pool.running? end + # Invoked on completion of ThreadPoolExecutor task + # @!visibility private + # @return [void] + def listen_observer(_time, _result, thread_error) + if thread_error + GoodJob.on_thread_error.call(thread_error) if GoodJob.on_thread_error.respond_to?(:call) + ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: thread_error }) + end + + listen unless shutdown? + end + private def create_pool @@ -120,14 +132,11 @@ def listen listening.make_false end end - end - rescue StandardError => e - ActiveSupport::Notifications.instrument("notifier_notify_error.good_job", { error: e }) - raise - ensure - @listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - conn.async_exec "UNLISTEN *" + ensure + listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + conn.async_exec "UNLISTEN *" + end end end @@ -135,10 +144,6 @@ def listen future.execute end - def listen_observer(_time, _result, _thread_error) - listen unless shutdown? - end - def with_listen_connection ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn| ActiveRecord::Base.connection_pool.remove(conn) diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 464cc569b..086640923 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -32,5 +32,19 @@ expect(RECEIVED_MESSAGE.true?).to eq true end + + it 'raises exception to GoodJob.on_thread_error' do + stub_const('ExpectedError', Class.new(StandardError)) + on_thread_error = instance_double(Proc, call: nil) + allow(GoodJob).to receive(:on_thread_error).and_return(on_thread_error) + allow(JSON).to receive(:parse).and_raise ExpectedError + + notifier = described_class.new + sleep_until(max: 5, increments_of: 0.5) { notifier.listening? } + described_class.notify(true) + notifier.shutdown + + expect(on_thread_error).to have_received(:call).at_least(:once).with instance_of(ExpectedError) + end end end