Skip to content

Commit

Permalink
Logstash cpu usage fix (#2607)
Browse files Browse the repository at this point in the history
* Replace concurrent-ruby-edge with agent lib

* Fix logstash cpu consumption

* increment gemspec version

* increment gemspec version

* fix tests
  • Loading branch information
adityacs authored Oct 12, 2020
1 parent 69f2eb2 commit 772b7ff
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 51 deletions.
96 changes: 55 additions & 41 deletions cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
require "logstash/outputs/loki/batch"
require "logstash/namespace"
require 'net/http'
require 'concurrent-edge'
require 'time'
require 'uri'
require 'json'
Expand Down Expand Up @@ -68,10 +67,10 @@ def register

@logger.info("Loki output plugin", :class => self.class.name)

# initialize channels
@Channel = Concurrent::Channel
@entries = @Channel.new
@stop = @Channel.new
# initialize Queue and Mutex
@entries = Queue.new
@mutex = Mutex.new
@stop = false

# create nil batch object.
@batch = nil
Expand All @@ -82,7 +81,52 @@ def register
validate_ssl_key
end

@Channel.go{run()}
# start batch_max_wait and batch_max_size threads
@batch_wait_thread = Thread.new{max_batch_wait()}
@batch_size_thread = Thread.new{max_batch_size()}
end

def max_batch_size
loop do
@mutex.synchronize do
return if @stop
end

e = @entries.deq
return if e.nil?

@mutex.synchronize do
if !add_entry_to_batch(e)
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@batch)
@batch = Batch.new(e)
end
end
end
end

def max_batch_wait
# minimum wait frequency is 10 milliseconds
min_wait_checkfrequency = 1/100
max_wait_checkfrequency = @batch_wait
if max_wait_checkfrequency < min_wait_checkfrequency
max_wait_checkfrequency = min_wait_checkfrequency
end

loop do
@mutex.synchronize do
return if @stop
end

sleep(max_wait_checkfrequency)
if is_batch_expired
@mutex.synchronize do
@logger.debug("Max batch_wait time is reached. Sending batch to loki")
send(@batch)
@batch = nil
end
end
end
end

def ssl_cert?
Expand Down Expand Up @@ -128,39 +172,6 @@ def ssl_opts(uri)
opts
end

def run()
# minimum wait frequency is 1 millisecond
min_wait_checkfrequency = 1/100
max_wait_checkfrequency = @batch_wait / 10
if max_wait_checkfrequency < min_wait_checkfrequency
max_wait_checkfrequency = min_wait_checkfrequency
end

@max_wait_check = Concurrent::Channel.tick(max_wait_checkfrequency)
loop do
Concurrent::Channel.select do |s|
s.take(@stop) {
return
}
s.take(@entries) { |e|
if !add_entry_to_batch(e)
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@batch)
@batch = Batch.new(e)
end
}
s.take(@max_wait_check) {
# send batch if max wait time has been reached
if is_batch_expired
@logger.debug("Max batch_wait time is reached. Sending batch to loki")
send(@batch)
@batch = nil
end
}
end
end
end

# Add an entry to the current batch returns false if the batch is full
# and the entry can't be added.
def add_entry_to_batch(e)
Expand Down Expand Up @@ -192,8 +203,11 @@ def receive(event)

def close
@entries.close
@max_wait_check.close if !@max_wait_check.nil?
@stop << true # stop will block until it's accepted by the worker.
@mutex.synchronize do
@stop = true
end
@batch_wait_thread.join
@batch_size_thread.join

# if by any chance we still have a forming batch, we need to send it.
send(@batch) if !@batch.nil?
Expand Down
3 changes: 1 addition & 2 deletions cmd/logstash/logstash-output-loki.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.0.2'
s.version = '1.0.3'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com']

Expand All @@ -22,6 +22,5 @@ Gem::Specification.new do |s|
#
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency "logstash-codec-plain", "3.0.6"
s.add_runtime_dependency "concurrent-ruby-edge", "0.6.0"
s.add_development_dependency 'logstash-devutils', "2.0.2"
end
17 changes: 9 additions & 8 deletions cmd/logstash/spec/outputs/loki_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,43 +95,44 @@
it 'should send entry if batch size reached with no tenant' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5,'batch_size'=>10}))
loki.register
sent = Concurrent::Channel.new(capacity: 3)
sent = Queue.new
allow(loki).to receive(:send) do |batch|
Thread.new do
sent << batch
end
end
loki.receive(event)
loki.receive(event)
sent.deq
sent.deq
loki.close
~sent
~sent
end
it 'should send entry while closing' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>10,'batch_size'=>10}))
loki.register
sent = Concurrent::Channel.new(capacity: 3)
sent = Queue.new
allow(loki).to receive(:send) do | batch|
Thread.new do
sent << batch
end
end
loki.receive(event)
loki.close
~sent
sent.deq
end
it 'should send entry when batch is expiring' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5,'batch_size'=>10}))
loki.register
sent = Concurrent::Channel.new(capacity: 3)
sent = Queue.new
allow(loki).to receive(:send) do | batch|
Thread.new do
sent << batch
end
end
loki.receive(event)
~sent
expect(loki.batch).to be_nil
sent.deq
sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil
expect(loki.batch).to be_nil
loki.close
end
end
Expand Down

0 comments on commit 772b7ff

Please sign in to comment.