Skip to content

Commit

Permalink
FluentD loki plugin: add support for bearer_token_file parameter (#2739)
Browse files Browse the repository at this point in the history
* fluentd plugin: improve param descriptions

* fluentd plugin: make code comments more correct

* fluentd plugin: rename things for clarity

* fluentd plugin: add bearer_token_file parameter

* fluentd plugin: read and set bearer token

* fluentd plugin: improve error message

* fluentd-plugin: bump version

* fluentd plugin: fix header, log success

* fluentd plugin: improve log messages
  • Loading branch information
jgehrcke authored Oct 16, 2020
1 parent 9d2f827 commit 4e661cd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 26 deletions.
2 changes: 1 addition & 1 deletion cmd/fluentd/fluent-plugin-grafana-loki.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.2.14'
spec.version = '1.2.15'
spec.authors = %w[woodsaj briangann cyriltovena]
spec.email = ['awoods@grafana.com', 'brian@grafana.com', 'cyril.tovena@grafana.com']

Expand Down
69 changes: 44 additions & 25 deletions cmd/fluentd/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,24 @@ class LogPostError < StandardError; end

DEFAULT_BUFFER_TYPE = 'memory'

desc 'url of loki server'
desc 'Loki API base URL'
config_param :url, :string, default: 'https://logs-prod-us-central1.grafana.net'

desc 'BasicAuth credentials'
desc 'Authentication: basic auth credentials'
config_param :username, :string, default: nil
config_param :password, :string, default: nil, secret: true

desc 'Client certificate'
desc 'Authentication: Authorization header with Bearer token scheme'
config_param :bearer_token_file, :string, default: nil

desc 'TLS: parameters for presenting a client certificate'
config_param :cert, :string, default: nil
config_param :key, :string, default: nil

desc 'TLS'
desc 'TLS: CA certificate file for server certificate verification'
config_param :ca_cert, :string, default: nil

desc 'Disable server certificate verification'
desc 'TLS: disable server certificate verification'
config_param :insecure_tls, :bool, default: false

desc 'Loki tenant id'
Expand Down Expand Up @@ -80,7 +83,7 @@ def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
super
@uri = URI.parse(@url + '/loki/api/v1/push')
unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS)
raise Fluent::ConfigError, 'url parameter must be valid HTTP'
raise Fluent::ConfigError, 'URL parameter must have HTTP/HTTPS scheme'
end

@record_accessors = {}
Expand All @@ -96,24 +99,42 @@ def configure(conf) # rubocop:disable Metrics/CyclomaticComplexity
@remove_keys_accessors.push(record_accessor_create(key))
end

if ssl_cert?
load_ssl
validate_ssl_key
# If configured, load and validate client certificate (and corresponding key)
if client_cert_configured?
load_client_cert
validate_client_cert_key
end

raise "bearer_token_file #{@bearer_token_file} not found" if !@bearer_token_file.nil? && !File.exist?(@bearer_token_file)

@auth_token_bearer = nil
if !@bearer_token_file.nil?
if !File.exist?(@bearer_token_file)
raise "bearer_token_file #{@bearer_token_file} not found"
end

# Read the file once, assume long-lived authentication token.
@auth_token_bearer = File.read(@bearer_token_file)
if @auth_token_bearer.empty?
raise "bearer_token_file #{@bearer_token_file} is empty"
end
log.info "will use Bearer token from bearer_token_file #{@bearer_token_file} in Authorization header"
end


raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end

def ssl_cert?
def client_cert_configured?
!@key.nil? && !@cert.nil?
end

def load_ssl
def load_client_cert
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
@key = OpenSSL::PKey.read(File.read(@key)) if @key
end

def validate_ssl_key
def validate_client_cert_key
if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
raise "Unsupported private key type #{key.class}"
end
Expand All @@ -123,13 +144,6 @@ def multi_workers_ready?
true
end

def http_opts(uri)
opts = {
use_ssl: uri.scheme == 'https'
}
opts
end

# flush a chunk to loki
def write(chunk)
# streams by label
Expand All @@ -141,7 +155,10 @@ def write(chunk)
# add ingest path to loki url
res = loki_http_request(body, tenant)

return if res.is_a?(Net::HTTPSuccess)
if res.is_a?(Net::HTTPSuccess)
log.debug "POST request was responded to with status code #{res.code}"
return
end

res_summary = "#{res.code} #{res.message} #{res.body}"
log.warn "failed to write post to #{@uri} (#{res_summary})"
Expand All @@ -151,27 +168,28 @@ def write(chunk)
raise(LogPostError, res_summary) if res.is_a?(Net::HTTPTooManyRequests) || res.is_a?(Net::HTTPServerError)
end

def ssl_opts(uri)
def http_request_opts(uri)
opts = {
use_ssl: uri.scheme == 'https'
}

# Disable server TLS certificate verification
# Optionally disable server server certificate verification.
if @insecure_tls
opts = opts.merge(
verify_mode: OpenSSL::SSL::VERIFY_NONE
)
end

# Verify client TLS certificate
# Optionally present client certificate
if !@cert.nil? && !@key.nil?
opts = opts.merge(
cert: @cert,
key: @key
)
end

# Specify custom certificate authority
# For server certificate verification: set custom CA bundle.
# Only takes effect when `insecure_tls` is not set.
unless @ca_cert.nil?
opts = opts.merge(
ca_file: @ca_cert
Expand All @@ -194,11 +212,12 @@ def loki_http_request(body, tenant)
@uri.request_uri
)
req.add_field('Content-Type', 'application/json')
req.add_field('Authorization', "Bearer #{@auth_token_bearer}") if !@auth_token_bearer.nil?
req.add_field('X-Scope-OrgID', tenant) if tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username

opts = ssl_opts(@uri)
opts = http_request_opts(@uri)

msg = "sending #{req.body.length} bytes to loki"
msg += " (tenant: \"#{tenant}\")" if tenant
Expand Down

0 comments on commit 4e661cd

Please sign in to comment.