Skip to content

Commit

Permalink
fix(que): Fix bulk_enqueue when enqueuing more than 5 jobs
Browse files Browse the repository at this point in the history
Previously, with Que `bulk_enqueue` a new span was created for every job
in the batch.

For each such span, a tag was added to _all_ jobs in the batch. This
meant that if you enqueued 3 jobs, then 3 spans were created and each
job had 3 tags pointing to 3 different spans.

This behavior became problematic when you tried to insert more than 5
jobs, as Que supports up to 5 tags per job. More specifically, a runtime
error was raised by Que.

This commit fixes that bug by creating only a single span when using
bulk_enqueue.
  • Loading branch information
laurglia committed Sep 24, 2024
1 parent 2debf6f commit db79b63
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class << base
# Module to prepend to Que singleton class
module ClassMethods
def enqueue(*args, job_options: {}, **arg_opts)
# In Que version 2.1.0 `bulk_enqueue` was introduced.
# In that case, the span is created inside the `bulk_enqueue` method.
return super(*args, **arg_opts) if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]

tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

Expand All @@ -43,19 +47,8 @@ def enqueue(*args, job_options: {}, **arg_opts)
OpenTelemetry.propagation.inject(tags, setter: TagSetter)
end

# In Que version 2.1.0 `bulk_enqueue` was introduced and in order
# for it to work, we must pass `job_options` to `bulk_enqueue` instead of enqueue.
if gem_version >= Gem::Version.new('2.1.0') && Thread.current[:que_jobs_to_bulk_insert]
Thread.current[:que_jobs_to_bulk_insert][:job_options] = Thread.current[:que_jobs_to_bulk_insert][:job_options]&.merge(tags: tags) do |_, a, b|
a.is_a?(Array) && b.is_a?(Array) ? a.concat(b) : b
end

job = super(*args, **arg_opts)
job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs].last
else
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs
end
job = super(*args, job_options: job_options.merge(tags: tags), **arg_opts)
job_attrs = job.que_attrs

span.name = "#{job_attrs[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs))
Expand All @@ -67,6 +60,32 @@ def enqueue(*args, job_options: {}, **arg_opts)
def gem_version
@gem_version ||= Gem.loaded_specs['que'].version
end

if Gem.loaded_specs['que'].version >= Gem::Version.new('2.1.0')
def bulk_enqueue(**_kwargs, &block)
tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

tracer.in_span('publish', kind: :producer) do |span|
super do
yield

job_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]

unless job_attrs.empty?
span.name = "#{job_attrs.first[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs.first))
end

if otel_config[:propagation_style] != :none
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
job_options[:tags] ||= []
OpenTelemetry.propagation.inject(job_options[:tags], setter: TagSetter)
end
end
end
end
end
end

def self.job_attributes(job_attrs)
Expand Down
78 changes: 73 additions & 5 deletions instrumentation/que/test/opentelemetry/instrumentation/que_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,74 @@ def self.run(first, second); end
end
end

describe 'enqueueing multiple jobs' do
it 'creates a span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

_(finished_spans.size).must_equal(1)

span = finished_spans.last
_(span.kind).must_equal(:producer)
end

it 'names the created span' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

span = finished_spans.last
_(span.name).must_equal('TestJobAsync publish')
end

it 'links spans together' do
bulk_jobs = Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

bulk_jobs.each { |job| Que.run_job_middleware(job) { job.tap(&:_run) } }

_(finished_spans.size).must_equal(11)

publish_span = finished_spans.first

process_spans = finished_spans.drop(1)

process_spans.each do |process_span|
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

it 'records attributes' do
Que.bulk_enqueue do
10.times { TestJobAsync.enqueue }
end

attributes = finished_spans.last.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
_(attributes['messaging.operation']).must_equal('publish')
_(attributes['messaging.que.job_class']).must_equal('TestJobAsync')
_(attributes['messaging.que.priority']).must_equal(100)
_(attributes.key?('messaging.message_id')).must_equal(false)
end
end

describe 'enqueueing zero jobs' do
it 'creates a span' do
Que.bulk_enqueue do
end

_(finished_spans.size).must_equal(1)
end
end

describe 'processing a job' do
before do
bulk_job = Que.bulk_enqueue do
Expand Down Expand Up @@ -363,10 +431,10 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.kind).must_equal(:producer)

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.kind).must_equal(:consumer)
end

Expand All @@ -375,10 +443,10 @@ def self.run(first, second); end
TestJobSync.enqueue
end

span1 = finished_spans.first
span1 = finished_spans.last
_(span1.name).must_equal('TestJobSync publish')

span2 = finished_spans.last
span2 = finished_spans.first
_(span2.name).must_equal('TestJobSync process')
end

Expand All @@ -387,7 +455,7 @@ def self.run(first, second); end
TestJobSync.enqueue
end

attributes = finished_spans.last.attributes
attributes = finished_spans.first.attributes
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
Expand Down

0 comments on commit db79b63

Please sign in to comment.