Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash cpu usage fix #2607

Merged
merged 5 commits into from
Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 55 additions & 41 deletions cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require "logstash/outputs/loki/batch"
require "logstash/namespace"
require 'net/http'
require 'concurrent-edge'
require 'time'
require 'uri'
require 'json'
Expand Down Expand Up @@ -68,10 +67,10 @@ def register

@logger.info("Loki output plugin", :class => self.class.name)

# initialize channels
@Channel = Concurrent::Channel
@entries = @Channel.new
@stop = @Channel.new
# initialize Queue and Mutex
@entries = Queue.new
@mutex = Mutex.new
@stop = false

# create nil batch object.
@batch = nil
Expand All @@ -82,7 +81,52 @@ def register
validate_ssl_key
end

@Channel.go{run()}
# start batch_max_wait and batch_max_size threads
@batch_wait_thread = Thread.new{max_batch_wait()}
@batch_size_thread = Thread.new{max_batch_size()}
end

def max_batch_size
loop do
@mutex.synchronize do
return if @stop
end

e = @entries.deq
return if e.nil?

@mutex.synchronize do
if !add_entry_to_batch(e)
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@batch)
@batch = Batch.new(e)
end
end
end
end

def max_batch_wait
# minimum wait frequency is 10 milliseconds
min_wait_checkfrequency = 1/100
max_wait_checkfrequency = @batch_wait
if max_wait_checkfrequency < min_wait_checkfrequency
max_wait_checkfrequency = min_wait_checkfrequency
end

loop do
@mutex.synchronize do
return if @stop
end

sleep(max_wait_checkfrequency)
if is_batch_expired
@mutex.synchronize do
@logger.debug("Max batch_wait time is reached. Sending batch to loki")
send(@batch)
@batch = nil
end
end
end
end

def ssl_cert?
Expand Down Expand Up @@ -128,39 +172,6 @@ def ssl_opts(uri)
opts
end

def run()
# minimum wait frequency is 1 millisecond
min_wait_checkfrequency = 1/100
max_wait_checkfrequency = @batch_wait / 10
if max_wait_checkfrequency < min_wait_checkfrequency
max_wait_checkfrequency = min_wait_checkfrequency
end

@max_wait_check = Concurrent::Channel.tick(max_wait_checkfrequency)
loop do
Concurrent::Channel.select do |s|
s.take(@stop) {
return
}
s.take(@entries) { |e|
if !add_entry_to_batch(e)
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@batch)
@batch = Batch.new(e)
end
}
s.take(@max_wait_check) {
# send batch if max wait time has been reached
if is_batch_expired
@logger.debug("Max batch_wait time is reached. Sending batch to loki")
send(@batch)
@batch = nil
end
}
end
end
end

# Add an entry to the current batch returns false if the batch is full
# and the entry can't be added.
def add_entry_to_batch(e)
Expand Down Expand Up @@ -192,8 +203,11 @@ def receive(event)

def close
@entries.close
@max_wait_check.close if !@max_wait_check.nil?
@stop << true # stop will block until it's accepted by the worker.
@mutex.synchronize do
@stop = true
end
@batch_wait_thread.join
@batch_size_thread.join

# if by any chance we still have a forming batch, we need to send it.
send(@batch) if !@batch.nil?
Expand Down
3 changes: 1 addition & 2 deletions cmd/logstash/logstash-output-loki.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.0.2'
s.version = '1.0.3'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com']

Expand All @@ -22,6 +22,5 @@ Gem::Specification.new do |s|
#
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-codec-plain", "3.0.6"
s.add_runtime_dependency "concurrent-ruby-edge", "0.6.0"
s.add_development_dependency 'logstash-devutils', "2.0.2"
end
17 changes: 9 additions & 8 deletions cmd/logstash/spec/outputs/loki_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,43 +95,44 @@
it 'should send entry if batch size reached with no tenant' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5,'batch_size'=>10}))
loki.register
sent = Concurrent::Channel.new(capacity: 3)
sent = Queue.new
allow(loki).to receive(:send) do |batch|
Thread.new do
sent << batch
end
end
loki.receive(event)
loki.receive(event)
sent.deq
sent.deq
loki.close
~sent
~sent
end
it 'should send entry while closing' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>10,'batch_size'=>10}))
loki.register
sent = Concurrent::Channel.new(capacity: 3)
sent = Queue.new
allow(loki).to receive(:send) do | batch|
Thread.new do
sent << batch
end
end
loki.receive(event)
loki.close
~sent
sent.deq
end
it 'should send entry when batch is expiring' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5,'batch_size'=>10}))
loki.register
sent = Concurrent::Channel.new(capacity: 3)
sent = Queue.new
allow(loki).to receive(:send) do | batch|
Thread.new do
sent << batch
end
end
loki.receive(event)
~sent
expect(loki.batch).to be_nil
sent.deq
sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil
expect(loki.batch).to be_nil
loki.close
end
end
Expand Down