Skip to content

Commit

Permalink
feat(logstash): clients logstash output structured metadata support (#…
Browse files Browse the repository at this point in the history
…10899)

**What this PR does / why we need it**:
This PR adds support for Loki Structured Metadata to the logstash output plugin.

**Special notes for your reviewer**:
Given that Structure Metadata is enabled as an experimental feature plugin changes were done in a way that is backward compatible for users not wanted structured metadata.

Co-authored-by: Jared King <kingjs@gmail.com>
Co-authored-by: J Stickler <julie.stickler@grafana.com>
  • Loading branch information
3 people authored Nov 10, 2023
1 parent 42e8148 commit 32f1ec2
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 27 deletions.
13 changes: 8 additions & 5 deletions clients/cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## 'An array of fields to map to labels, if defined only fields in this list will be mapped.'
config :include_fields, :validate => :array, :default => [], :required => false

## 'An array of fields to map to structure metadata, if defined only fields in this list will be mapped.'
config :metadata_fields, :validate => :array, :default => [], :required => false

## 'Backoff configuration. Maximum backoff time between retries. Default 300s'
config :max_delay, :validate => :number, :default => 300, :required => false

Expand All @@ -71,7 +74,7 @@ def register
@logger.info("Loki output plugin", :class => self.class.name)

# initialize Queue and Mutex
@entries = Queue.new
@entries = Queue.new
@mutex = Mutex.new
@stop = false

Expand All @@ -94,7 +97,7 @@ def max_batch_size
@mutex.synchronize do
return if @stop
end

e = @entries.deq
return if e.nil?

Expand Down Expand Up @@ -201,13 +204,13 @@ def is_batch_expired
## Receives logstash events
public
def receive(event)
@entries << Entry.new(event, @message_field, @include_fields)
@entries << Entry.new(event, @message_field, @include_fields, @metadata_fields)
end

def close
@entries.close
@mutex.synchronize do
@stop = true
@mutex.synchronize do
@stop = true
end
@batch_wait_thread.join
@batch_size_thread.join
Expand Down
14 changes: 13 additions & 1 deletion clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,19 @@ def to_json
def build_stream(stream)
values = []
stream['entries'].each { |entry|
values.append([entry['ts'].to_s, entry['line']])
if entry.key?('metadata')
sorted_metadata = entry['metadata'].sort.to_h
values.append([
entry['ts'].to_s,
entry['line'],
sorted_metadata
])
else
values.append([
entry['ts'].to_s,
entry['line']
])
end
}
return {
'stream'=>stream['labels'],
Expand Down
18 changes: 17 additions & 1 deletion clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def to_ns(s)
class Entry
include Loki
attr_reader :labels, :entry
def initialize(event,message_field,include_fields)
def initialize(event,message_field,include_fields,metadata_fields)
@entry = {
"ts" => to_ns(event.get("@timestamp")),
"line" => event.get(message_field).to_s
Expand All @@ -21,6 +21,22 @@ def initialize(event,message_field,include_fields)
next if include_fields.length() > 0 and not include_fields.include?(key)
@labels[key] = value.to_s
}

# Unlike include_fields we should skip if no metadata_fields provided
if metadata_fields.length() > 0
@metadata = {}
event.to_hash.each { |key,value|
next if key.start_with?('@')
next if value.is_a?(Hash)
next if metadata_fields.length() > 0 and not metadata_fields.include?(key)
@metadata[key] = value.to_s
}

# Add @metadata to @entry if there was a match
if @metadata.size > 0
@entry.merge!('metadata' => @metadata)
end
end
end
end
end
2 changes: 1 addition & 1 deletion clients/cmd/logstash/logstash-output-loki.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.1.0'
s.version = '1.2.0'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com']

Expand Down
15 changes: 15 additions & 0 deletions clients/cmd/logstash/loki-metadata.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
input {
generator {
message => "Hello world!"
count => 10
add_field => {cluster=> "foo" namespace=>"bar" trace_id=> "trace_001"}
}
}

output {
loki {
url => "http://localhost:3100"
include_fields => ["cluster"]
metadata_fields => ["trace_id"]
}
}
3 changes: 3 additions & 0 deletions clients/cmd/logstash/loki.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ output {
# If include_fields is set, only fields in this list will be sent to Loki as labels.
#include_fields => ["service","host","app","env"] #default empty array, all labels included.

# If metadata_fields is set, fields in this list will be sent to Loki as structured metadata for the associated log.
#metadata_fields => ["trace_id"] #default empty array, no structure metadata will be included

#batch_wait => 1 ## in seconds #default 1 second

#batch_size => 102400 #bytes #default 102400 bytes
Expand Down
21 changes: 15 additions & 6 deletions clients/cmd/logstash/spec/outputs/loki/entry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,40 @@
{'@path' => '/path/to/file.log'},
},
'host' => '172.0.0.1',
'trace_id' => 'trace_001',
'@timestamp' => Time.now
}
)
}

it 'labels extracted should not contains object and metadata or timestamp' do
entry = Entry.new(event,"message", [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'})
entry = Entry.new(event,"message", [], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5', 'trace_id'=>'trace_001'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end

it 'labels extracted should only contain allowlisted labels' do
entry = Entry.new(event, "message", %w[agent foo])
entry = Entry.new(event, "message", %w[agent foo], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end

it 'labels and structured metadata extracted should only contain allow listed labels and metadata' do
entry = Entry.new(event, "message", %w[agent foo], %w[trace_id])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
expect(entry.entry['metadata']).to eql({'trace_id' => 'trace_001'})
end
end

context 'test batch generation with label order' do
let (:entries) {[
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []),
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", [], []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", [], []),

]}
let (:expected) {
Expand Down
55 changes: 49 additions & 6 deletions clients/cmd/logstash/spec/outputs/loki_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

context 'when adding en entry to the batch' do
let (:simple_loki_config) {{'url' => 'http://localhost:3100'}}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:include_lbs) {{"cluster"=>"us-central1"}.sort.to_h}

it 'should not add empty line' do
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config)
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [])
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [], [])
expect(plugin.add_entry_to_batch(emptyEntry)).to eql true
expect(plugin.batch).to eql nil
end
Expand Down Expand Up @@ -83,8 +83,51 @@
end
end

context 'when building json from batch to send' do
let (:basic_loki_config) {{'url' => 'http://localhost:3100'}}
let (:basic_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:metadata_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id"] }}
let (:metadata_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id"])}
let (:metadata_multi_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id", "user_id"] }}
let (:metadata_multi_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","user_id"=>"user_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id", "user_id"])}

it 'should not include labels or metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(basic_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(basic_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"buzz":"bar","cluster":"us-central1","trace_id":"trace_001"},"values":[["1000000000","foobuzz"]]}]}'
end

it 'should include metadata with no labels' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001"}]]}]}'
end

it 'should include labels with no metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(include_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz"]]}]}'
end

it 'should include labels with multiple metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_multi_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_multi_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001","user_id":"user_001"}]]}]}'
end
end

context 'batch expiration' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}

it 'should not expire if empty' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5}))
Expand Down Expand Up @@ -147,13 +190,13 @@
loki.receive(event)
sent.deq
sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil
expect(loki.batch).to be_nil
expect(loki.batch).to be_nil
loki.close
end
end

context 'http requests' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}

it 'should send credentials' do
conf = {
Expand Down
4 changes: 3 additions & 1 deletion docs/sources/get-started/labels/structured-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ For more information on how to push logs to Loki via the HTTP endpoint, refer to
Alternatively, you can use the Grafana Agent or Promtail to extract and attach structured metadata to your log lines.
See the [Promtail: Structured metadata stage]({{< relref "../../send-data/promtail/stages/structured_metadata" >}}) for more information.

With Loki version 1.2.0, support for structured metadata has been added to the Logstash output plugin. For more information, see [logstash]({{< relref "../../send-data/logstash/_index.md" >}}).

## Querying structured metadata

Structured metadata is extracted automatically for each returned log line and added to the labels returned for the query.
Expand All @@ -49,7 +51,7 @@ Of course, you can filter by multiple labels of structured metadata at the same
{job="example"} | trace_id="0242ac120002" | user_id="superUser123"
```

Note that since structured metadata is extracted automatically to the results labels, some metric queries might return
Note that since structured metadata is extracted automatically to the results labels, some metric queries might return
an error like `maximum of series (50000) reached for a single query`. You can use the [Keep]({{< relref "../../query/log_queries#keep-labels-expression" >}}) and [Drop]({{< relref "../../query/log_queries#drop-labels-expression" >}}) stages to filter out labels that you don't need.
For example:

Expand Down
24 changes: 18 additions & 6 deletions docs/sources/send-data/logstash/_index.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
---
title: Logstash plugin
menuTitle:
menuTitle:
description: Instructions to install, configure, and use the Logstash plugin to send logs to Loki.
aliases:
aliases:
- ../send-data/a/logstash/
weight: 800
---
Expand Down Expand Up @@ -61,9 +61,11 @@ output {
[tenant_id => string | default = nil | required=false]
[message_field => string | default = "message" | required=false]
[include_fields => array | default = [] | required=false]
[metadata_fields => array | default = [] | required=false]
[batch_wait => number | default = 1(s) | required=false]
[batch_size => number | default = 102400(bytes) | required=false]
Expand Down Expand Up @@ -112,8 +114,6 @@ Contains a `message` and `@timestamp` fields, which are respectively used to for
All other fields (except nested fields) will form the label set (key value pairs) attached to the log line. [This means you're responsible for mutating and dropping high cardinality labels](/blog/2020/04/21/how-labels-in-loki-can-make-log-queries-faster-and-easier/) such as client IPs.
You can usually do so by using a [`mutate`](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) filter.

**Note:** In version 1.1.0 and greater of this plugin you can also specify a list of labels to allowlist via the `include_fields` configuration.

For example the configuration below :

```conf
Expand Down Expand Up @@ -194,6 +194,13 @@ filter {
}
```

### Version Notes

Important notes regarding versions:

- Version 1.1.0 and greater of this plugin you can also specify a list of labels to allow list via the `include_fields` configuration.
- Version 1.2.0 and greater of this plugin you can also specify structured metadata via the `metadata_fields` configuration.

### Configuration Properties

#### url
Expand All @@ -216,6 +223,10 @@ Message field to use for log lines. You can use logstash key accessor language t

An array of fields which will be mapped to labels and sent to Loki, when this list is configured **only** these fields will be sent, all other fields will be ignored.

#### metadata_fields

An array of fields which will be mapped to [structured metadata]({{< relref "../../get-started/labels/structured-metadata.md" >}}) and sent to Loki for each log line

#### batch_wait

Interval in seconds to wait before pushing a batch of records to Loki. This means even if the [batch size](#batch_size) is not reached after `batch_wait` a partial batch will be sent, this is to ensure freshness of the data.
Expand Down Expand Up @@ -246,7 +257,7 @@ Loki is a multi-tenant log storage platform and all requests sent must include a

Specify a pair of client certificate and private key with `cert` and `key` if a reverse proxy with client certificate verification is configured in front of Loki. `ca_cert` can also be specified if the server uses custom certificate authority.

### insecure_skip_verify
#### insecure_skip_verify

A flag to disable server certificate verification. By default it is set to `false`.

Expand Down Expand Up @@ -286,6 +297,7 @@ output {
max_delay => 500
message_field => "message"
include_fields => ["container_name","namespace","pod","host"]
metadata_fields => ["pod"]
}
# stdout { codec => rubydebug }
}
Expand Down

0 comments on commit 32f1ec2

Please sign in to comment.