Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update aws sdk v3 & support fluentd 1.x #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: 2.1

orbs:
ruby-orbs: sue445/ruby-orbs@volatile

jobs:
test:
docker:
- image: cimg/ruby:3.0
environment:
AWS_REGION: ap-northeast-1
AWS_ACCESS_KEY_ID: dummy
AWS_SECRET_ACCESS_KEY: dummy
- image: amazon/dynamodb-local
environment:
AWS_REGION: ap-northeast-1
steps:
- checkout
- ruby-orbs/bundle-install:
gemspec_name: fluent-plugin-dynamodb-add
with_gemfile_lock: false

- run: bundle exec rake test

workflows:
test:
jobs:
- test
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
source 'https://rubygems.org'

gemspec

gem 'rexml'
4 changes: 3 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
require "bundler/gem_tasks"
require 'rake/testtask'

Rake::Task[:release].clear

Rake::TestTask.new(:test) do |test|
test.libs << 'lib' << 'test'
test.test_files = FileList['test/test_*.rb']
test.pattern = 'test/**/test_*.rb'
test.verbose = true
end

Expand Down
10 changes: 6 additions & 4 deletions fluent-plugin-dynamodb-add.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler", "~> 1.7"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_dependency "fluentd", "~> 0.10.0"
spec.add_dependency "aws-sdk", ">= 1.56.0", "< 2.0.0"
spec.add_development_dependency "bundler"
spec.add_development_dependency "rake"
spec.add_development_dependency "test-unit"

spec.add_dependency "fluentd", ">= 1", "< 2"
spec.add_dependency "aws-sdk-dynamodb"
end
151 changes: 82 additions & 69 deletions lib/fluent/plugin/out_dynamodb_add.rb
Original file line number Diff line number Diff line change
@@ -1,87 +1,100 @@
module Fluent
class DynamodbAdd < Fluent::Output
Fluent::Plugin.register_output('dynamodb_add', self)
require 'aws-sdk-dynamodb'
require 'fluent/plugin/output'

unless method_defined?(:log)
define_method(:log) { $log }
end
class Fluent::Plugin::DynamodbAdd < Fluent::Plugin::Output
Fluent::Plugin.register_output('dynamodb_add', self)

config_param :count_key, :string
config_param :dynamo_count_key, :string
config_param :table_name, :string
config_param :use_iam_role, :bool, :default => false
config_param :aws_key_id, :string, :default => nil
config_param :aws_sec_key, :string, :default => nil
config_param :endpoint, :string, :default => nil
config_param :hash_key, :string, :default => nil
config_param :hash_key_delimiter, :string, :default => ":"
config_param :add_hash_key_prefix, :string, :default => nil
config_param :range_key, :string, :default => nil
config_param :set_timestamp, :string, :default => nil

def initialize
super
require 'aws-sdk-v1'
end
helpers :event_emitter
helpers :compat_parameters

config_param :count_key, :string
config_param :dynamo_count_key, :string
config_param :table_name, :string
config_param :use_iam_role, :bool, :default => false
config_param :aws_key_id, :string, :default => nil
config_param :aws_sec_key, :string, :default => nil
config_param :region, :string, :default => nil
config_param :endpoint, :string, :default => nil
config_param :hash_key, :string, :default => nil
config_param :hash_key_delimiter, :string, :default => ":"
config_param :add_hash_key_prefix, :string, :default => nil
config_param :range_key, :string, :default => nil
config_param :set_timestamp, :string, :default => nil

def initialize
super
end

def configure(conf)
super
def configure(conf)
compat_parameters_convert(conf)

unless use_iam_role
[:aws_key_id, :aws_sec_key].each do |name|
unless self.instance_variable_get("@#{name}")
raise ConfigError, "'#{name}' is required"
end
super

unless use_iam_role
[:aws_key_id, :aws_sec_key].each do |name|
unless self.instance_variable_get("@#{name}")
raise ConfigError, "'#{name}' is required"
end
end
@hash_key = hash_key.split(/\s*,\s*/)
end
@hash_key = hash_key.split(/\s*,\s*/)
end

def start
super
if use_iam_role
AWS.config(:credential_provider => AWS::Core::CredentialProviders::EC2Provider.new)
else
AWS.config(:access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key)
end
def start
super

AWS.config(:dynamo_db_endpoint => @endpoint) if @endpoint
options = {}

@dynamo_db = AWS::DynamoDB.new
@table = @dynamo_db.tables[table_name]
@table.load_schema
unless use_iam_role
options[:access_key_id] = @aws_key_id
options[:secret_access_key] = @aws_sec_key
end

def emit(tag, es, chain)
chain.next
es.each do |time, record|
hash_key = create_key(record)
next unless hash_key || record[@count_key]

if @range_key
next unless record[@range_key]
item = @table.items[hash_key, record[@range_key]]
else
item = @table.items[hash_key]
end
item.attributes.update {|u|
u.add @dynamo_count_key => record[@count_key]
if @set_timestamp
u.set @set_timestamp => Time.now.to_i
end
}
options[:region] = region
options[:endpoint] = endpoint

client = Aws::DynamoDB::Client.new(options)

resource = Aws::DynamoDB::Resource.new(client: client)
@table = resource.table(table_name)

@dynamo_hash_key = @table.key_schema.find{|e| e.key_type == "HASH" }.attribute_name
@dynamo_range_key = @table.key_schema.find{|e| e.key_type == "RANGE" }&.attribute_name
end

def process(tag, es)
es.each do |time, record|
hash_key = create_key(record)
next unless hash_key || record[@count_key]

key = { @dynamo_hash_key => hash_key }

if @range_key
next unless record[@range_key]
key[@dynamo_range_key] = record[@range_key]
end

@table.update_item({
key: key,
attribute_updates: {
@dynamo_count_key => {
value: record[@count_key],
action: "ADD"
},
},
})
end
end

private
def create_key(record)
key_array = []
key_array << @add_hash_key_prefix if @add_hash_key_prefix
@hash_key.each do |h|
return nil unless record[h]
key_array << record[h]
end
key_array.join(@hash_key_delimiter)
private

def create_key(record)
key_array = []
key_array << @add_hash_key_prefix if @add_hash_key_prefix
@hash_key.each do |h|
return nil unless record[h]
key_array << record[h]
end
key_array.join(@hash_key_delimiter)
end
end
8 changes: 8 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/test/helpers"

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)
Loading