From fd8cf109a7a70cf143a799bad453a016200a2f5e Mon Sep 17 00:00:00 2001 From: Stefan Hoffmann Date: Fri, 20 Oct 2023 15:41:37 +0200 Subject: [PATCH 1/4] Stop the dispatching of new messages when a SIGTERM signal has been received --- lib/shoryuken/launcher.rb | 3 +++ spec/shoryuken/launcher_spec.rb | 26 ++++++++++++++++++++------ 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/lib/shoryuken/launcher.rb b/lib/shoryuken/launcher.rb index 5a9d94a8..30269fe5 100644 --- a/lib/shoryuken/launcher.rb +++ b/lib/shoryuken/launcher.rb @@ -16,6 +16,9 @@ def start def stop! initiate_stop + # Don't await here so the timeout below is not delayed + stop_new_dispatching + executor.shutdown executor.kill unless executor.wait_for_termination(Shoryuken.options[:timeout]) diff --git a/spec/shoryuken/launcher_spec.rb b/spec/shoryuken/launcher_spec.rb index 0c9a5318..bc65b88c 100644 --- a/spec/shoryuken/launcher_spec.rb +++ b/spec/shoryuken/launcher_spec.rb @@ -67,10 +67,17 @@ end it 'fires quiet, shutdown and stopped event' do - expect(subject).to receive(:fire_event).with(:quiet, true) - expect(subject).to receive(:fire_event).with(:shutdown, true) - expect(subject).to receive(:fire_event).with(:stopped) + allow(subject).to receive(:fire_event) subject.stop + expect(subject).to have_received(:fire_event).with(:quiet, true) + expect(subject).to have_received(:fire_event).with(:shutdown, true) + expect(subject).to have_received(:fire_event).with(:stopped) + end + + it 'stops the managers' do + subject.stop + expect(first_group_manager).to have_received(:stop_new_dispatching) + expect(second_group_manager).to have_received(:stop_new_dispatching) end end @@ -83,9 +90,16 @@ end it 'fires shutdown and stopped event' do - expect(subject).to receive(:fire_event).with(:shutdown, true) - expect(subject).to receive(:fire_event).with(:stopped) + allow(subject).to receive(:fire_event) + subject.stop! + expect(subject).to have_received(:fire_event).with(:shutdown, true) + expect(subject).to have_received(:fire_event).with(:stopped) + end + + it 'stops the managers' do subject.stop! + expect(first_group_manager).to have_received(:stop_new_dispatching) + expect(second_group_manager).to have_received(:stop_new_dispatching) end end -end \ No newline at end of file +end From b1f81fafdfaaafa81977de52e325b396e62971a8 Mon Sep 17 00:00:00 2001 From: Stefan Hoffmann Date: Wed, 25 Oct 2023 19:42:44 +0200 Subject: [PATCH 2/4] Use ActiveSupport::Testing::TimeHelpers to fix a flaky test that checks the message deduplication id --- Gemfile | 1 + spec/shared_examples_for_active_job.rb | 17 +++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/Gemfile b/Gemfile index 6de67cc8..ea278e45 100644 --- a/Gemfile +++ b/Gemfile @@ -5,6 +5,7 @@ gemspec group :test do gem 'activejob' + gem 'activesupport' gem 'aws-sdk-core', '~> 3' gem 'aws-sdk-sqs' gem 'codeclimate-test-reporter', require: nil diff --git a/spec/shared_examples_for_active_job.rb b/spec/shared_examples_for_active_job.rb index 49c13741..5ad7cb7b 100644 --- a/spec/shared_examples_for_active_job.rb +++ b/spec/shared_examples_for_active_job.rb @@ -1,11 +1,14 @@ require 'active_job' require 'shoryuken/extensions/active_job_extensions' +require 'active_support/testing/time_helpers' # Stand-in for a job class specified by the user class TestJob < ActiveJob::Base; end # rubocop:disable Metrics/BlockLength RSpec.shared_examples 'active_job_adapters' do + include ActiveSupport::Testing::TimeHelpers + let(:job_sqs_send_message_parameters) { {} } let(:job) do job = TestJob.new @@ -43,14 +46,16 @@ class TestJob < ActiveJob::Base; end let(:fifo) { true } it 'does not include job_id in the deduplication_id' do - expect(queue).to receive(:send_message) do |hash| - message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) + freeze_time do + expect(queue).to receive(:send_message) do |hash| + message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) - expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) - end - expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) + expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) + end + expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) - subject.enqueue(job) + subject.enqueue(job) + end end context 'with message_deduplication_id' do From 8d9d129dfdeb4c652e63242258ec5a20d1656315 Mon Sep 17 00:00:00 2001 From: Stefan Hoffmann Date: Thu, 26 Oct 2023 18:00:21 +0200 Subject: [PATCH 3/4] Removed ActiveSupport::Testing::TimeHelpers as it is not available in rails 4 --- Gemfile | 1 - spec/shared_examples_for_active_job.rb | 17 ++++++----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/Gemfile b/Gemfile index ea278e45..6de67cc8 100644 --- a/Gemfile +++ b/Gemfile @@ -5,7 +5,6 @@ gemspec group :test do gem 'activejob' - gem 'activesupport' gem 'aws-sdk-core', '~> 3' gem 'aws-sdk-sqs' gem 'codeclimate-test-reporter', require: nil diff --git a/spec/shared_examples_for_active_job.rb b/spec/shared_examples_for_active_job.rb index 5ad7cb7b..49c13741 100644 --- a/spec/shared_examples_for_active_job.rb +++ b/spec/shared_examples_for_active_job.rb @@ -1,14 +1,11 @@ require 'active_job' require 'shoryuken/extensions/active_job_extensions' -require 'active_support/testing/time_helpers' # Stand-in for a job class specified by the user class TestJob < ActiveJob::Base; end # rubocop:disable Metrics/BlockLength RSpec.shared_examples 'active_job_adapters' do - include ActiveSupport::Testing::TimeHelpers - let(:job_sqs_send_message_parameters) { {} } let(:job) do job = TestJob.new @@ -46,16 +43,14 @@ class TestJob < ActiveJob::Base; end let(:fifo) { true } it 'does not include job_id in the deduplication_id' do - freeze_time do - expect(queue).to receive(:send_message) do |hash| - message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) - - expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) - end - expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) + expect(queue).to receive(:send_message) do |hash| + message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) - subject.enqueue(job) + expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) end + expect(Shoryuken).to receive(:register_worker).with(job.queue_name, described_class::JobWrapper) + + subject.enqueue(job) end context 'with message_deduplication_id' do From 7705262e0e6dcab80a74d97f902e9301c8f0edc9 Mon Sep 17 00:00:00 2001 From: Stefan Hoffmann Date: Thu, 26 Oct 2023 18:01:06 +0200 Subject: [PATCH 4/4] Exclude `enqueued_at` in the body for the deduplication id --- lib/shoryuken/extensions/active_job_adapter.rb | 7 +++++-- spec/shared_examples_for_active_job.rb | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/shoryuken/extensions/active_job_adapter.rb b/lib/shoryuken/extensions/active_job_adapter.rb index 21c81286..3a7482bc 100644 --- a/lib/shoryuken/extensions/active_job_adapter.rb +++ b/lib/shoryuken/extensions/active_job_adapter.rb @@ -66,8 +66,11 @@ def message(queue, job) } if queue.fifo? - # See https://github.com/phstc/shoryuken/issues/457 - msg[:message_deduplication_id] = Digest::SHA256.hexdigest(JSON.dump(body.except('job_id'))) + # See https://github.com/ruby-shoryuken/shoryuken/issues/457 and + # https://github.com/ruby-shoryuken/shoryuken/pull/750#issuecomment-1781317929 + msg[:message_deduplication_id] = Digest::SHA256.hexdigest( + JSON.dump(body.except('job_id', 'enqueued_at')) + ) end msg.merge(job_params.except(:message_attributes)) diff --git a/spec/shared_examples_for_active_job.rb b/spec/shared_examples_for_active_job.rb index 49c13741..c1f2be06 100644 --- a/spec/shared_examples_for_active_job.rb +++ b/spec/shared_examples_for_active_job.rb @@ -42,9 +42,11 @@ class TestJob < ActiveJob::Base; end context 'when fifo' do let(:fifo) { true } - it 'does not include job_id in the deduplication_id' do + it 'does not include job_id and enqueued_at in the deduplication_id' do expect(queue).to receive(:send_message) do |hash| - message_deduplication_id = Digest::SHA256.hexdigest(JSON.dump(job.serialize.except('job_id'))) + message_deduplication_id = Digest::SHA256.hexdigest( + JSON.dump(job.serialize.except('job_id', 'enqueued_at')) + ) expect(hash[:message_deduplication_id]).to eq(message_deduplication_id) end