diff --git a/lib/autoload/kuroko2/workflow/engine.rb b/lib/autoload/kuroko2/workflow/engine.rb index 9a9cb0d0..367780cd 100644 --- a/lib/autoload/kuroko2/workflow/engine.rb +++ b/lib/autoload/kuroko2/workflow/engine.rb @@ -1,6 +1,9 @@ module Kuroko2 module Workflow class Engine + DEFAULT_EXPECTED_TIME = 60 * 24 # 24 hours + EXPECTED_TIME_NOTIFY_REMIND_TERM = 1.hours + def process_all Token.processable.each do |token| process(token) @@ -114,6 +117,7 @@ def process_with_lock(token) node = extract_node(token) execute_task(node, token) + notify_long_elapsed_time_if_needed(token) rescue EngineError => e message = "#{e.message}\n" + e.backtrace.map { |trace| " #{trace}" }.join("\n") @@ -134,6 +138,32 @@ def extract_node(token) root = ScriptParser.new(token.script).parse(validate: false) root.find(token.path) end + + def expected_time(token) + token.context['EXPECTED_TIME'].present? ? + token.context['EXPECTED_TIME'].to_i : + DEFAULT_EXPECTED_TIME + end + + def available_notify_long_elapsed_time?(token) + return false if token.parent && expected_time(token) == expected_time(token.parent) + token.context['EXPECTED_TIME_NOTIFIED_AT'].nil? || Time.zone.parse(token.context['EXPECTED_TIME_NOTIFIED_AT']) < EXPECTED_TIME_NOTIFY_REMIND_TERM.ago + end + + def elapsed_expected_time?(token) + (token.created_at + expected_time(token).minutes).past? + end + + def notify_long_elapsed_time_if_needed(token) + if available_notify_long_elapsed_time?(token) && elapsed_expected_time?(token) + token.context['EXPECTED_TIME_NOTIFIED_AT'] = Time.current + Notifier.notify(:long_elapsed_time, token.job_instance) + + message = "(token #{token.uuid}) The running time is longer than #{expected_time(token)} minutes!" + token.job_instance.logs.info(message) + Kuroko2.logger.info(message) + end + end end end end diff --git a/lib/autoload/kuroko2/workflow/task/execute.rb b/lib/autoload/kuroko2/workflow/task/execute.rb index 8c6bee77..a81461f5 100644 --- a/lib/autoload/kuroko2/workflow/task/execute.rb +++ b/lib/autoload/kuroko2/workflow/task/execute.rb @@ -2,9 +2,6 @@ module Kuroko2 module Workflow module Task class Execute < Base - DEFAULT_EXPECTED_TIME = 60 * 24 # 24 hours - EXPECTED_TIME_NOTIFY_REMIND_TERM = 1.hours - def execute if (execution = Execution.of(token).take) update_execution(execution) @@ -70,7 +67,6 @@ def update_execution(execution) end else process_timeout_if_needed(execution) - notify_long_elapsed_time_if_needed(execution) :pass end end @@ -92,36 +88,6 @@ def process_timeout_if_needed(execution) end end end - - def expected_time - @expected_time ||= token.context['EXPECTED_TIME'].present? ? - token.context['EXPECTED_TIME'].to_i : - DEFAULT_EXPECTED_TIME - end - - def available_notify_long_elapsed_time?(execution) - if token.context['EXPECTED_TIME_NOTIFIED_AT'].present? - token.context['EXPECTED_TIME_NOTIFIED_AT'] < EXPECTED_TIME_NOTIFY_REMIND_TERM.ago && - execution.pid.present? - else - execution.pid.present? - end - end - - def elapsed_expected_time?(execution) - (execution.created_at + expected_time.minutes).past? - end - - def notify_long_elapsed_time_if_needed(execution) - if available_notify_long_elapsed_time?(execution) && elapsed_expected_time?(execution) - token.context['EXPECTED_TIME_NOTIFIED_AT'] = Time.current - Notifier.notify(:long_elapsed_time, token.job_instance) - - message = "(token #{token.uuid}) The running time is longer than #{expected_time} minutes!" - token.job_instance.logs.info(message) - Kuroko2.logger.info(message) - end - end end end end diff --git a/spec/workflow/engine_spec.rb b/spec/workflow/engine_spec.rb index 184bbe91..c7c6eaae 100644 --- a/spec/workflow/engine_spec.rb +++ b/spec/workflow/engine_spec.rb @@ -178,6 +178,109 @@ module Kuroko2::Workflow end end + context 'if passed EXPECTED_TIME' do + context 'without EXPECTED_TIME' do + let!(:definition) do + create(:job_definition_with_instances, script: <<-EOF.strip_heredoc) + noop: + EOF + end + + it 'notifies messages' do + expect(Kuroko2::Workflow::Notifier).to receive(:notify).with(:long_elapsed_time, token.job_instance) + Timecop.travel((24.hours + 1.second).since) { + subject.process(token) + expect(token.context['EXPECTED_TIME_NOTIFIED_AT']).to be_present + } + end + end + + context 'with EXPECTED_TIME' do + let!(:definition) do + create(:job_definition_with_instances, script: <<-EOF.strip_heredoc) + expected_time: 1m + noop: noop1 + noop: noop2 + noop: noop3 + noop: noop4 + EOF + end + + it 'notifies messages and wait notifing until EXPECTED_TIME_NOTIFY_REMIND_TERM' do + subject.process(token) + subject.process(token) + + expect(Kuroko2::Workflow::Notifier).to receive(:notify).with(:long_elapsed_time, token.job_instance).twice + + Timecop.travel((1.minutes + 1.second).since) { + subject.process(token) # notify + expect(token.context['EXPECTED_TIME_NOTIFIED_AT']).to be_present + subject.process(token) # do not notify until EXPECTED_TIME_NOTIFIED_AT + EXPECTED_TIME_NOTIFY_REMIND_TERM + + Timecop.travel((1.hours + 1.second).since) { + subject.process(token) # notify + } + } + end + end + + context 'with fork process' do + let(:engine) { Kuroko2::Workflow::Engine.new } + + context 'if expected_time sets root only' do + let!(:definition) do + create(:job_definition_with_instances, script: <<-EOF.strip_heredoc) + fork: + noop: noop_fork1 + noop: noop_fork2 + EOF + end + + it 'notifies once from the parent token only' do + expect(Kuroko2::Workflow::Notifier).to receive(:notify).with(:long_elapsed_time, token.job_instance).once + engine.process_all + engine.process_all + + Timecop.travel((24.hours + 1.second).since) { + engine.process_all + expect(token.reload.context['EXPECTED_TIME_NOTIFIED_AT']).to be_present + engine.process_all + } + end + end + + context 'if expected_time settings is different between root and children' do + let!(:definition) do + create(:job_definition_with_instances, script: <<-EOF.strip_heredoc) + expected_time: 1h + parallel_fork: 2 + expected_time: 1m + noop: noop_parallel_fork1 + noop: noop_parallel_fork2 + EOF + end + + it 'notifies from each tokens' do + expect(Kuroko2::Workflow::Notifier).to receive(:notify).with(:long_elapsed_time, token.job_instance).twice + engine.process_all + engine.process_all + engine.process_all + + Timecop.travel((1.minute + 1.second).since) { + engine.process_all + engine.process_all + engine.process_all + token.reload + expect(token.context['EXPECTED_TIME_NOTIFIED_AT']).not_to be_present + expect(token.children.map{|child| child.context['EXPECTED_TIME_NOTIFIED_AT']}).to all(be_present) + + engine.process_all + } + end + end + end + end + context 'retry' do let!(:definition) do create(:job_definition_with_instances, script: <<-EOF.strip_heredoc) diff --git a/spec/workflow/task/execute_spec.rb b/spec/workflow/task/execute_spec.rb index 912b7305..62e0e3cf 100644 --- a/spec/workflow/task/execute_spec.rb +++ b/spec/workflow/task/execute_spec.rb @@ -64,64 +64,5 @@ module Kuroko2::Workflow::Task expect { Execute.new(node, token).execute }.to change { Kuroko2::ProcessSignal.where(pid: execution.pid, hostname: hostname).count }.from(0).to(1) end end - - context 'if job passed EXPECTED_TIME' do - let(:shell) { 'sleep 5 && echo $NAME' } - - context 'Without EXPECTED_TIME_NOTIFIED_AT' do - around do |example| - Execute.new(node, token).execute - Kuroko2::Execution.of(token).take.update!(pid: 1) - - Timecop.travel((24.hours + 1.second).since) { example.run } - end - - it 'alerts warnings' do - expect(Kuroko2::Workflow::Notifier).to receive(:notify).with(:long_elapsed_time, token.job_instance) - - Execute.new(node, token).execute - is_expected.to eq :pass - Execute.new(node, token).execute - expect(token.context['EXPECTED_TIME_NOTIFIED_AT']).to be_present - end - end - - context 'With EXPECTED_TIME_NOTIFIED_AT' do - around do |example| - Execute.new(node, token).execute - Kuroko2::Execution.of(token).take.update!(pid: 1) - - Timecop.travel((24.hours + 1.second).since) do - token.context['EXPECTED_TIME_NOTIFIED_AT'] = notified_time - example.run - end - end - - context 'When EXPECTED_TIME_NOTIFIED_AT is now' do - let(:notified_time) { Time.current } - - it 'does not alert warnings' do - expect(Kuroko2::Workflow::Notifier).not_to receive(:notify) - Execute.new(node, token).execute - is_expected.to eq :pass - Execute.new(node, token).execute - expect(token.context['EXPECTED_TIME_NOTIFIED_AT']).to be_present - end - end - - context 'When EXPECTED_TIME_NOTIFIED_AT is 1 hours ago' do - let(:notified_time) { (1.hours + 1.second).ago } - - it 'alert warnings' do - expect(Kuroko2::Workflow::Notifier).to receive(:notify).with(:long_elapsed_time, token.job_instance) - - Execute.new(node, token).execute - is_expected.to eq :pass - Execute.new(node, token).execute - expect(token.context['EXPECTED_TIME_NOTIFIED_AT']).to be_present - end - end - end - end end end