Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor available #2532

Merged
merged 4 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def initialize(sender, server, failure:, connection_manager:)

attr_reader :name, :host, :port, :weight, :standby, :state
attr_reader :sockaddr # used by on_udp_heatbeat_response_recv
attr_reader :failure, :available # for test
attr_reader :failure # for test

def validate_host_resolution!
resolved_host
Expand Down Expand Up @@ -590,7 +590,7 @@ def verify_connection
end

def establish_connection(sock, ri)
while available? && ri.state != :established
while ri.state != :established
begin
# TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
# We need rewrite around here using new socket/server plugin helper.
Expand Down Expand Up @@ -630,10 +630,6 @@ def establish_connection(sock, ri)
end

def send_data_actual(sock, tag, chunk)
unless available?
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end

option = { 'size' => chunk.size, 'compressed' => @compress }
option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response

Expand All @@ -659,6 +655,10 @@ def send_data(tag, chunk)
connect(nil, require_ack: @sender.require_ack_response) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)

if ri.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end

send_data_actual(sock, tag, chunk)
Expand Down Expand Up @@ -743,7 +743,7 @@ def resolve_dns!

def tick
now = Time.now.to_f
if !@available
unless available?
if @failure.hard_timeout?(now)
@failure.clear
end
Expand All @@ -752,7 +752,7 @@ def tick

if @failure.hard_timeout?(now)
@log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, hard_timeout: true
@available = false
disable!
@resolved_host = nil # expire cached host
@failure.clear
return true
Expand All @@ -762,7 +762,7 @@ def tick
phi = @failure.phi(now)
if phi > @sender.phi_threshold
@log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, phi: phi, phi_threshold: @sender.phi_threshold
@available = false
disable!
@resolved_host = nil # expire cached host
@failure.clear
return true
Expand All @@ -774,7 +774,7 @@ def tick
def heartbeat(detect=true)
now = Time.now.to_f
@failure.add(now)
if detect && !@available && @failure.sample_size > @sender.recover_sample_size
if detect && !available? && @failure.sample_size > @sender.recover_sample_size
@available = true
@log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
true
Expand Down
10 changes: 5 additions & 5 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,14 @@ def read_ack_from_sock(sock, unpacker)
node = d.instance.nodes.first
stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
assert_true node.available?
end

test 'phi_failure_detector enabled' do
@d = d = create_driver(CONFIG + %[phi_failure_detector true \n phi_threshold 0])
node = d.instance.nodes.first
node.tick
assert_equal node.available, false
assert_false node.available?
end

test 'require_ack_response is disabled in default' do
Expand Down Expand Up @@ -555,7 +555,7 @@ def read_ack_from_sock(sock, unpacker)
{"a" => 2}
]
target_input_driver.end_if{ d.instance.rollback_count > 0 }
target_input_driver.end_if{ !node.available }
target_input_driver.end_if{ !node.available? }
target_input_driver.run(expect_records: 2, timeout: 25) do
d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do
delayed_commit_timeout_value = d.instance.delayed_commit_timeout
Expand Down Expand Up @@ -600,7 +600,7 @@ def read_ack_from_sock(sock, unpacker)
{"a" => 2}
]
target_input_driver.end_if{ d.instance.rollback_count > 0 }
target_input_driver.end_if{ !node.available }
target_input_driver.end_if{ !node.available? }
target_input_driver.run(expect_records: 2, timeout: 25) do
d.run(default_tag: 'test', timeout: 20, wait_flush_completion: false, shutdown: false, flush: false) do
delayed_commit_timeout_value = d.instance.delayed_commit_timeout
Expand Down Expand Up @@ -840,7 +840,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG

stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
assert_true node.available?
end

test 'heartbeat_type_udp' do
Expand Down