Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
Added step back when Amazon complains about throttling, added paramet…
Browse files Browse the repository at this point in the history
…er log_group_prefix
  • Loading branch information
davidwestlund authored Sep 5, 2016
1 parent 50d8a58 commit e3e9828
Showing 1 changed file with 51 additions and 13 deletions.
64 changes: 51 additions & 13 deletions lib/logstash/inputs/cloudwatch_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ class LogStash::Inputs::CloudWatch_Logs < LogStash::Inputs::Base
# Value is in seconds.
config :interval, :validate => :number, :default => 60

# Decide of log_group is a prefix or an absolute name
config :log_group_prefix, :validate => :boolean, :default => false


# def register
public
def register
Expand All @@ -59,29 +63,54 @@ def run(queue)

# def list_new_streams
public
def list_new_streams(token = nil, objects = [])
def list_new_streams()
if @log_group_prefix
log_groups = @cloudwatch.describe_log_groups(log_group_name_prefix: @log_group)
groups = log_groups.log_groups.map {|n| n.log_group_name}
else
groups = [@log_group]
end
objects = []
for log_group in groups
objects.concat(list_new_streams_for_log_group(log_group))
end
objects
end

# def list_new_streams_for_log_group
public
def list_new_streams_for_log_group(log_group, token = nil, objects = [], stepback=0)
params = {
:log_group_name => @log_group,
:order_by => "LastEventTime",
:descending => false
:log_group_name => log_group,
:order_by => "LastEventTime",
:descending => false
}

@logger.debug("CloudWatch Logs for log_group #{log_group}")

if token != nil
params[:next_token] = token
end

streams = @cloudwatch.describe_log_streams(params)
begin
streams = @cloudwatch.describe_log_streams(params)
rescue Aws::CloudWatchLogs::Errors::ThrottlingException
@logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
sleep(2 ** stepback * 60)
stepback += 1
@logger.debug("CloudWatch Logs repeating list_new_streams again with token", :token => token)
return list_new_streams_for_log_group(log_group, token=token, objects=objects, stepback=stepback)
end

objects.push(*streams.log_streams)
if streams.next_token == nil
@logger.debug("CloudWatch Logs hit end of tokens for streams")
objects
else
@logger.debug("CloudWatch Logs calling list_new_streams again on token", :token => streams.next_token)
list_new_streams(streams.next_token, objects)
list_new_streams_for_log_group(log_group, streams.next_token, objects)
end

end # def list_new_streams
end # def list_new_streams_for_log_group

# def process_log
private
Expand All @@ -90,7 +119,7 @@ def process_log(queue, log, stream)
@codec.decode(log.message.to_str) do |event|
event[LogStash::Event::TIMESTAMP] = parse_time(log.timestamp)
event["[cloudwatch][ingestion_time]"] = parse_time(log.ingestion_time)
event["[cloudwatch][log_group]"] = @log_group
event["[cloudwatch][log_group]"] = stream.arn.split(/:/)[6]
event["[cloudwatch][log_stream]"] = stream.log_stream_name
decorate(event)

Expand Down Expand Up @@ -128,17 +157,17 @@ def process_group(queue)

# def process_log_stream
private
def process_log_stream(queue, stream, last_read, current_window, token = nil)
def process_log_stream(queue, stream, last_read, current_window, token = nil, stepback=0)
@logger.debug("CloudWatch Logs processing stream",
:log_stream => stream.log_stream_name,
:log_group => @log_group,
:log_group => stream.arn.split(":")[6],
:lastRead => last_read,
:currentWindow => current_window,
:token => token
)

params = {
:log_group_name => @log_group,
:log_group_name => stream.arn.split(":")[6],
:log_stream_name => stream.log_stream_name,
:start_from_head => true
}
Expand All @@ -147,7 +176,16 @@ def process_log_stream(queue, stream, last_read, current_window, token = nil)
params[:next_token] = token
end

logs = @cloudwatch.get_log_events(params)

begin
logs = @cloudwatch.get_log_events(params)
rescue Aws::CloudWatchLogs::Errors::ThrottlingException
@logger.debug("CloudWatch Logs stepping back ", :stepback => 2 ** stepback * 60)
sleep(2 ** stepback * 60)
stepback += 1
@logger.debug("CloudWatch Logs repeating process_log_stream again with token", :token => token)
return process_log_stream(queue, stream, last_read, current_window, token, stepback)
end

logs.events.each do |log|
if log.ingestion_time > last_read
Expand Down

0 comments on commit e3e9828

Please sign in to comment.