Skip to content

Commit

Permalink
Add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Sep 17, 2015
1 parent d8feb79 commit aef94c5
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 3 deletions.
11 changes: 9 additions & 2 deletions lib/logstash/outputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ def register
@codec.on_event(&method(:publish))
end

def receive(event)
return unless output?(event)

@codec.encode(event)
rescue StandardError => e
@logger.warn("Error encoding event", :exception => e, :event => event)
end

def publish(event, message)
raise ArgumentError, "No exchange set in HareInfo!!!" unless @hare_info.exchange
@hare_info.exchange.publish(message, :routing_key => event.sprintf(@key), :properties => { :persistent => @persistent })
Expand All @@ -43,8 +51,7 @@ def publish(event, message)
retry
end

def stop
super
def close
close_connection
end
end
Expand Down
84 changes: 83 additions & 1 deletion spec/outputs/rabbitmq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,86 @@
end
end
end
end
end


describe "with a live server", :integration => true do
let(:klass) { LogStash::Outputs::RabbitMQ }
let(:exchange) { "myexchange" }
let(:exchange_type) { "topic" }
let(:default_plugin_config) {
{
"host" => "127.0.0.1",
"exchange" => exchange,
"exchange_type" => exchange_type,
"key" => "foo"
}
}
let(:config) { default_plugin_config }
let(:instance) { klass.new(config) }
let(:hare_info) { instance.instance_variable_get(:@hare_info) }

# Spawn a connection in the bg and wait up (n) seconds
def spawn_and_wait(instance)
instance.register

20.times do
instance.connected? ? break : sleep(0.1)
end

# Extra time to make sure the output can attach
sleep 1
end

let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) }
let(:test_channel) { test_connection.create_channel }
let(:test_queue) {
test_channel.queue("testq", :auto_delete => true).bind(exchange, :key => config["key"])
}

before do
# Materialize the instance in the current thread to prevent dupes
# If you use multiple threads with lazy evaluation weird stuff happens
instance
spawn_and_wait(instance)

test_channel # Start up the test client as well
test_queue
end

after do
instance.close()
test_channel.close
test_connection.close
end

context "using defaults" do
it "should start, connect, and stop cleanly" do
expect(instance.connected?).to be_truthy
end

it "should close cleanly" do
instance.close
expect(instance.connected?).to be_falsey
end
end

describe "sending a message with an exchange specified" do
let(:message) { LogStash::Event.new(:message => "Foo Message") }

it "should process the message" do
@received = nil
test_queue.subscribe do |metadata,payload|
@received = payload
end

instance.receive(message)

until @received
sleep 1
end

expect(@received).to eql(message.to_s)
end
end
end

0 comments on commit aef94c5

Please sign in to comment.