Skip to content

Commit

Permalink
differentiate between a crash and a normal pipeline completion
Browse files Browse the repository at this point in the history
A handy `pipelines.yml` to hack one pipeline into crashing while leaving
another long-lived pipeline running:

> ~~~ yaml
> - pipeline.id: uppy-one
>   config.string: |
>     input { heartbeat {} }
>     output { sink {} }
> - pipeline.id: crashy
>   config.string: |
>     input { heartbeat { } }
>     filter {
>       ruby {
>         init => "@poison = ::Class.new(::Exception) { def backtrace; throw(:boom); end; }"
>         code => "fail @poison.new"
>       }
>     }
>     output { sink {} }
>     output { stdout { codec => rubydebug } }
> ~~~
  • Loading branch information
yaauie committed Sep 6, 2024
1 parent 25c7262 commit 25083d6
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 2 deletions.
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ def pipeline_details(pipeline_id)
run_state = pipeline_state.synchronize do |sync_state|
case
when sync_state.loading? then PipelineIndicator::RunState::LOADING
when sync_state.crashed? then PipelineIndicator::RunState::TERMINATED
when sync_state.running? then PipelineIndicator::RunState::RUNNING
when sync_state.finished? then PipelineIndicator::RunState::FINISHED # must check before terminated
when sync_state.terminated? then PipelineIndicator::RunState::TERMINATED
when sync_state.finished? then PipelineIndicator::RunState::FINISHED
end
end

Expand Down
7 changes: 7 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
@flushing = java.util.concurrent.atomic.AtomicBoolean.new(false)
@flushRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@shutdownRequested = java.util.concurrent.atomic.AtomicBoolean.new(false)
@crash_detected = Concurrent::AtomicBoolean.new(false)
@outputs_registered = Concurrent::AtomicBoolean.new(false)

# @finished_execution signals that the pipeline thread has finished its execution
Expand Down Expand Up @@ -233,6 +234,10 @@ def stopped?
@running.false?
end

# Reports whether a crash has been recorded for this pipeline.
# Reads the @crash_detected atomic flag, which starts out false and is set
# by the rescue handlers that deal with worker-loop and input-start failures.
# @return [Boolean] true once a crash has been flagged, false otherwise
def crashed?
@crash_detected.true?
end

# register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins
# @param plugins [Array[Plugin]] the list of plugins to register
def register_plugins(plugins)
Expand Down Expand Up @@ -309,6 +314,7 @@ def start_workers
rescue => e
# WorkerLoop.run() catches every Java Exception and re-throws it as an IllegalStateException
# with the original exception as the cause
@crash_detected.make_true
@logger.error(
"Pipeline worker error, the pipeline will be stopped",
default_logging_keys(:error => e.cause.message, :exception => e.cause.class, :backtrace => e.cause.backtrace)
Expand All @@ -323,6 +329,7 @@ def start_workers
begin
start_inputs
rescue => e
@crash_detected.make_true
# if there is any exception in starting inputs, make sure we shutdown workers.
# exception will already be logged in start_inputs
shutdown_workers
Expand Down
6 changes: 6 additions & 0 deletions logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ def finished?
end
end

# Reports whether the pipeline held by this state has crashed.
# The pipeline reference is read while holding @lock. When no pipeline is
# set, safe navigation yields nil (falsy) instead of raising.
# @return [Boolean, nil] the pipeline's own crashed? answer, or nil without a pipeline
def crashed?
  @lock.synchronize { @pipeline&.crashed? }
end

def running?
@lock.synchronize do
# not terminated and not loading
Expand Down
3 changes: 3 additions & 0 deletions logstash-core/spec/logstash/java_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ def flush(options)

# wait until there is no more worker thread since we have a single worker that should have died
wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey
expect(subject.crashed?).to be true

# at this point the input plugin should have been asked to stop
wait(5).for {dummyinput.stop?}.to be_truthy
Expand Down Expand Up @@ -437,6 +438,7 @@ def flush(options)

# wait until there is no more worker thread since we have a single worker that should have died
wait(5).for {subject.worker_threads.any?(&:alive?)}.to be_falsey
expect(subject.crashed?).to be true

# at this point the input plugin should have been asked to stop
wait(5).for {dummyinput.stop?}.to be_truthy
Expand Down Expand Up @@ -602,6 +604,7 @@ def flush(options)
expect(input).to receive(:do_close).once
pipeline.start
pipeline.shutdown
expect(pipeline.crashed?).to be false
end
end
end
Expand Down

0 comments on commit 25083d6

Please sign in to comment.