Skip to content

Commit

Permalink
Bunny::Session#recovery_completed (with a block) => #after_recovery_c…
Browse files Browse the repository at this point in the history
…ompleted
  • Loading branch information
michaelklishin committed Feb 19, 2019
1 parent 4d7cb5b commit 7fb5116
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 23 deletions.
10 changes: 7 additions & 3 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Session
# @option connection_string_or_opts [Integer] :read_timeout (30) TCP socket read timeout in seconds. If heartbeats are disabled this will be ignored.
# @option connection_string_or_opts [Integer] :write_timeout (30) TCP socket write timeout in seconds.
# @option connection_string_or_opts [Proc] :hosts_shuffle_strategy a callable that reorders a list of host strings, defaults to Array#shuffle
# @option connection_string_or_opts [Proc] :recovery_completed a callable that will be called when a network recovery is performed
# @option connection_string_or_opts [Proc] : a callable that will be called when a network recovery is performed
# @option connection_string_or_opts [Logger] :logger The logger. If missing, one is created using :log_file and :log_level.
# @option connection_string_or_opts [IO, String] :log_file The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
Expand Down Expand Up @@ -751,7 +751,7 @@ def recover_from_network_failure
end

recover_channels
recovery_completed
notify_of_recovery_completion
end
rescue HostListDepleted
reset_address_index
Expand Down Expand Up @@ -806,8 +806,12 @@ def recover_channels
end
end

def after_recovery_completed(&block)
@recovery_completed = block
end

# @private
def recovery_completed
def notify_of_recovery_completion
@recovery_completed.call if @recovery_completed
end

Expand Down
34 changes: 14 additions & 20 deletions spec/higher_level_api/integration/connection_recovery_spec.rb
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
require "spec_helper"
require "rabbitmq/http/client"

require "bunny/concurrent/condition"

describe "Connection recovery" do
let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") }
let(:logger) { Logger.new($stderr).tap {|logger| logger.level = ENV["BUNNY_LOG_LEVEL"] || Logger::WARN } }
let(:recovery_interval) { 0.2 }

let(:recovery_completed) { double("recovery_callback", call: nil) }

let(:c_with_callback) do
Bunny.new(network_recovery_interval: recovery_interval,
recover_from_connection_close: true,
logger: logger,
recovery_completed: recovery_completed)
end

it "reconnects after grace period" do
with_open do |c|
close_all_connections!
Expand Down Expand Up @@ -49,17 +42,19 @@
end
end

it "calls the network recovery callback" do
with_open(c_with_callback) do |c|
_ = c.create_channel
_ = c.create_channel
sleep 1.5
it "provides a recovery completion callback" do
with_open do |c|
latch = Bunny::Concurrent::Condition.new
c.after_recovery_completed do
latch.notify
end

ch = c.create_channel
sleep 1.0
close_all_connections!
sleep 0.5
poll_until { channels.count == 2 }
poll_until { c.open? && ch.open? }
poll_until { latch.none_threads_waiting? }
end

expect(recovery_completed).to have_received(:call).exactly(1).times
end

it "recovers channels (with multiple hosts)" do
Expand Down Expand Up @@ -292,7 +287,6 @@
destination = ch.fanout("destination.exchange.recovery.example", auto_delete: true)

source2 = ch2.fanout("source.exchange.recovery.example", no_declare: true)
destination2 = ch2.fanout("destination.exchange.recovery.example", no_declare: true)

destination.bind(source)

Expand Down Expand Up @@ -448,7 +442,7 @@ def with_open(c = Bunny.new(network_recovery_interval: recovery_interval,
c.start
block.call(c)
ensure
c.close
c.close(false) rescue nil
end

def with_open_multi_host(&block)
Expand Down

0 comments on commit 7fb5116

Please sign in to comment.