Skip to content

Commit

Permalink
update plugin to use new shutdown stop semantics
Browse files Browse the repository at this point in the history
Fixes #27.
  • Loading branch information
talevy committed Sep 18, 2015
1 parent 2110642 commit cde88a4
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 24 deletions.
55 changes: 31 additions & 24 deletions lib/logstash/inputs/twitter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "logstash/timestamp"
require "logstash/util"
require "logstash/json"
require "stud/interval"

# Read events from the twitter streaming api.
class LogStash::Inputs::Twitter < LogStash::Inputs::Base
Expand Down Expand Up @@ -86,41 +87,47 @@ def run(queue)
@logger.info("Starting twitter tracking", :keywords => @keywords)
begin
@client.filter(:track => @keywords.join(",")) do |tweet|
return if stop?
if tweet.is_a?(Twitter::Tweet)
@logger.debug? && @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
if @full_tweet
event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
event.timestamp = LogStash::Timestamp.new(tweet.created_at)
else
event = LogStash::Event.new(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
"message" => tweet.full_text,
"user" => tweet.user.screen_name,
"client" => tweet.source,
"retweeted" => tweet.retweeted?,
"source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
)
event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
unless tweet.urls.empty?
event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
end
end

# Work around bugs in JrJackson. The standard serializer won't work till we upgrade
event["in-reply-to"] = nil if event["in-reply-to"].is_a?(Twitter::NullObject)
event = from_tweet(tweet)
decorate(event)
queue << event
end
end # client.filter
rescue LogStash::ShutdownSignal
return
rescue Twitter::Error::TooManyRequests => e
@logger.warn("Twitter too many requests error, sleeping for #{e.rate_limit.reset_in}s")
sleep(e.rate_limit.reset_in)
Stud.stoppable_sleep(e.rate_limit.reset_in) { stop? }
retry
rescue => e
@logger.warn("Twitter client error", :message => e.message, :exception => e, :backtrace => e.backtrace)
retry
end
end # def run

private
def from_tweet(tweet)
@logger.debug? && @logger.debug("Got tweet", :user => tweet.user.screen_name, :text => tweet.text)
if @full_tweet
event = LogStash::Event.new(LogStash::Util.stringify_symbols(tweet.to_hash))
event.timestamp = LogStash::Timestamp.new(tweet.created_at)
else
event = LogStash::Event.new(
LogStash::Event::TIMESTAMP => LogStash::Timestamp.new(tweet.created_at),
"message" => tweet.full_text,
"user" => tweet.user.screen_name,
"client" => tweet.source,
"retweeted" => tweet.retweeted?,
"source" => "http://twitter.com/#{tweet.user.screen_name}/status/#{tweet.id}"
)
event["in-reply-to"] = tweet.in_reply_to_status_id if tweet.reply?
unless tweet.urls.empty?
event["urls"] = tweet.urls.map(&:expanded_url).map(&:to_s)
end
end

# Work around bugs in JrJackson. The standard serializer won't work till we upgrade
event["in-reply-to"] = nil if event["in-reply-to"].is_a?(Twitter::NullObject)

event
end
end # class LogStash::Inputs::Twitter
2 changes: 2 additions & 0 deletions logstash-input-twitter.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
s.add_runtime_dependency 'twitter', ['5.12.0']
s.add_runtime_dependency 'stud', '>= 0.0.22', '< 0.1'

s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'logstash-codec-plain'
end
24 changes: 24 additions & 0 deletions spec/inputs/twitter_spec.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,29 @@
require "logstash/devutils/rspec/spec_helper"
require 'logstash/inputs/twitter'
require 'twitter'

class MockClient
def filter(options)
loop { yield }
end
end

describe LogStash::Inputs::Twitter do
context "when told to shutdown" do
before :each do
allow(Twitter::Streaming::Client).to receive(:new).and_return(MockClient.new)
end

it_behaves_like "an interruptible input plugin" do
let(:config) do
{
'consumer_key' => 'foo',
'consumer_secret' => 'foo',
'oauth_token' => 'foo',
'oauth_token_secret' => 'foo',
'keywords' => ['foo', 'bar']
}
end
end
end
end

0 comments on commit cde88a4

Please sign in to comment.