Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formalize Barrier behavior during waiting #3464

Closed
wants to merge 20 commits into from
96 changes: 71 additions & 25 deletions lib/datadog/core/remote/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,60 +87,106 @@ def shutdown!
@worker.stop
end

# Barrier provides a mechanism to fence execution until a condition happens
# Provides a mechanism to fence execution until a condition happens.
#
# The barrier is created when a lengthy process (e.g. remote
# configuration retrieval over network) starts.
# The barrier is initialized with an optional timeout, which is
# the upper bound on how long the clients want to wait for the work
# to complete.
#
# When work completes, the thread performing the work should call
# +lift+ to lift the barrier.
#
# Other threads can call +wait_once+ at any time to wait for the
# work to complete, up to the smaller of the barrier timeout since
# the work started or the per-wait timeout since waiting began.
# Once the barrier timeout elapsed since creation of the barrier,
# all waits return immediately.
#
# @note This is an internal class.
class Barrier
def initialize(timeout = nil)
@once = false
@timeout = timeout
@lifted = false
@deadline = timeout && Core::Utils::Time.get_time + timeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I'm not sure this is the correct intended semantics.

From reading the code, my understanding is that the intention is that the timeout gets configured at component initialization time, but the actual timeout would only start counting later, when the worker gets lazily initialized.

I guess cc @lloeki can help clarify :)


@mutex = Mutex.new
@condition = ConditionVariable.new
end

# Wait for first lift to happen, otherwise don't wait
# Wait for first lift to happen, up to the barrier timeout since
# the barrier was created.
#
# If timeout is provided in this call, waits up to the smaller of
# the provided wait timeout and the barrier timeout since the
# barrier was created.
Comment on lines +120 to +122
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a private class, and we never actually need to provide this second timeout in production, I'm wondering if we should remove this feature until we need it.

#
# If neither wait timeout is provided in this call nor the
# barrier timeout in the constructor, waits indefinitely until
# the barrier is lifted.
Comment on lines +124 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should remove this feature too, since again, the one user of this class doesn't actually use this functionality ;)

#
# Returns:
# - :lift if the barrier was lifted while this method was waiting
# on it
# - :pass if the barrier had been lifted prior to this method
# being called
# - :timeout if this method waited for the maximum permitted time
# and the barrier has not been lifted
# - :expired if the barrier timeout had elapsed but barrier had
# not yet been lifted
def wait_once(timeout = nil)
# TTAS (Test and Test-And-Set) optimisation
# Since @once only ever goes from false to true, this is semantically valid
return :pass if @once

begin
@mutex.lock

@mutex.synchronize do
return :pass if @once

timeout ||= @timeout
now = Core::Utils::Time.get_time
deadline = [
timeout ? now + timeout : nil,
@deadline,
].compact.min

timeout = deadline ? deadline - now : nil
# workaround for rubocop & steep trying to mangle the code
if timeout && timeout.public_send(:<=, 0)
Comment on lines +152 to +153
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Rubocop is being annoying, I suggest using an inline # rubocop:disable instead of making the code "worse" just to make it happy.

I'm curious about the issue with steep -- maybe it's something we can fix in the type signatures? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

steep refuses to permit timeout <= 0 on account of timeout allegedly being nil (the preceding check for it being truthy doesn't count, apparently).

ret = :expired
return ret
end

# - starting with Ruby 3.2, ConditionVariable#wait returns nil on
# timeout and an integer otherwise
# - before Ruby 3.2, ConditionVariable returns itself
# so we have to rely on @once having been set
if RUBY_VERSION >= '3.2'
lifted = @condition.wait(@mutex, timeout)
else
@condition.wait(@mutex, timeout)
lifted = @once
end
# so we have to rely on @lifted having been set
lifted = if RUBY_VERSION >= '3.2'
!!@condition.wait(@mutex, timeout)
else
@condition.wait(@mutex, timeout)
@lifted
end
Comment on lines 158 to +167
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: To be honest, I'm not sure it's worth keeping a multiple line comment + 2 implementations, rather than just using the one implementation that works on all Rubies ;)


if lifted
:lift
else
@once = true
:timeout
end
ensure
@mutex.unlock
end
end

# Release all current waiters
# Lift the barrier, releasing all current waiters.
#
# Internally we only use Barrier to wait for one event, thus
# in practice there should only ever be one call to +lift+
# per instance of Barrier. But, multiple calls to +lift+ are
# technically permitted; second and subsequent calls have no
# effect.
def lift
@mutex.lock

@once ||= true
@mutex.synchronize do
@once ||= true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still correct? Should this be @lifted?


@condition.broadcast
ensure
@mutex.unlock
@condition.broadcast
end
end
end

Expand Down
212 changes: 212 additions & 0 deletions spec/datadog/core/remote/component/barrier_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# frozen_string_literal: true

require 'spec_helper'
require 'datadog/core/remote/component'

RSpec.describe Datadog::Core::Remote::Component::Barrier do
let(:delay) { 1.0 }
let(:record) { [] }
let(:timeout) { nil }
let(:instance_timeout) { nil }

subject(:barrier) { described_class.new(instance_timeout) }

shared_context('lifter thread') do
let(:thr) do
Thread.new do
loop do
sleep delay
record << :lift
barrier.lift
end
end
end

before do
record
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I believe this can be simplified by making record a let!

thr.run
end

after do
thr.kill
thr.join
end
end

describe '#initialize' do
it 'accepts one argument' do
expect { described_class.new(instance_timeout) }.to_not raise_error
end

it 'accepts zero argument' do
expect { described_class.new }.to_not raise_error
end
end

describe '#lift' do
context 'without waiters' do
it 'does not block' do
record << :one
barrier.lift
record << :two

expect(record).to eq [:one, :two]
end
end

context 'with waiters' do
it 'unblocks waiters' do
waiter_thread = Thread.new(record) do |record|
record << :one
expect(barrier.wait_once).to eq :lift
record << :two
end.run

sleep delay

record << :lift
barrier.lift
waiter_thread.join

expect(record).to eq [:one, :lift, :two]
end
end
end

describe '#wait_once' do
include_context 'lifter thread'

it 'blocks once' do
record << :one
expect(barrier.wait_once).to eq :lift
record << :two

expect(record).to eq [:one, :lift, :two]
end

it 'blocks only once' do
record << :one
expect(barrier.wait_once).to eq :lift
record << :two
expect(barrier.wait_once).to eq :pass
record << :three

expect(record).to eq [:one, :lift, :two, :three]
end

context('with a local timeout') do
let(:timeout) { delay / 4 }

context('shorter than lift') do
it 'unblocks on timeout' do
elapsed = Datadog::Core::Utils::Time.measure do
record << :one
expect(barrier.wait_once(timeout)).to eq :timeout
record << :two
end

expect(record).to eq [:one, :two]

# Should have waited just over the timeout.
expect(elapsed).to be < delay
expect(elapsed).to be < timeout * 1.1
expect(elapsed).to be > timeout
end

context 'when waiting repeatedly' do
context 'and barrier is lifted' do
it 'waits up to barrier timeout' do
record << :one
expect(barrier.wait_once(timeout)).to eq :timeout
record << :two
expect(barrier.wait_once(timeout)).to eq :timeout
record << :three
# Small sleep to make the tests not flaky.
sleep(timeout / 2)
expect(barrier.wait_once(timeout)).to eq :timeout
record << :four
# Due to the added sleep, the fourth wait should always exceed
# the delay, thus the fourth wait should happen while the
# barrier is being lifted.
expect(barrier.wait_once(timeout)).to eq :lift
record << :five

expect(record).to eq [:one, :two, :three, :four, :lift, :five]
end
end

context 'and barrier is not lifted' do
let(:instance_timeout) { delay / 2 }

it 'waits up to barrier timeout' do
record << :one
expect(barrier.wait_once(timeout)).to eq :timeout
record << :two
# This call should time out, but the barrier timeout is
# passed here and subsequent waits will be expired.
expect(barrier.wait_once(timeout)).to eq :timeout
record << :three
expect(barrier.wait_once(timeout)).to eq :expired
record << :four

expect(record).to eq [:one, :two, :three, :four]
end
end
end
end

context('longer than lift') do
let(:timeout) { delay * 2 }

it 'unblocks before timeout' do
elapsed = Datadog::Core::Utils::Time.measure do
record << :one
expect(barrier.wait_once(timeout)).to eq :lift
record << :two
expect(barrier.wait_once(timeout)).to eq :pass
record << :three
end

expect(record).to eq [:one, :lift, :two, :three]

# We should have waited strictly more than the delay time.
expect(elapsed).to be > delay
# But, the only wait should have been for the delay to pass,
# i.e. the elapsed time should be only slightly greater than the
# delay time
expect(elapsed).to be < delay * 1.1
# And, just to verify, this is below the timeout.
expect(elapsed).to be < timeout
end
end

context('and an instance timeout') do
let(:instance_timeout) { delay * 2 }

it 'prefers the local timeout' do
record << :one
expect(barrier.wait_once(timeout)).to eq :timeout
record << :two
expect(barrier.wait_once(timeout)).to eq :timeout
record << :three

expect(record).to eq [:one, :two, :three]
end
end
end

context('with an instance timeout') do
let(:instance_timeout) { delay / 4 }

it 'unblocks on timeout' do
record << :one
expect(barrier.wait_once).to eq :timeout
record << :two
expect(barrier.wait_once).to eq :expired
record << :three

expect(record).to eq [:one, :two, :three]
end
end
end
end
Loading
Loading