diff --git a/lib/miga/cli/action/doctor/base.rb b/lib/miga/cli/action/doctor/base.rb index b1fb2e1..b15edec 100644 --- a/lib/miga/cli/action/doctor/base.rb +++ b/lib/miga/cli/action/doctor/base.rb @@ -37,11 +37,12 @@ def check_dist_fix(cli, p, fix) return if fix.empty? cli.say("- Fixing #{fix.size} datasets") - MiGA::Parallel.distribute(fix, cli[:threads]) do |d_n, idx, thr| + o = MiGA::Parallel.distribute(fix, cli[:threads]) do |d_n, idx, thr| cli.advance(' > Fixing', idx + 1, fix.size, false) if thr == 0 p.dataset(d_n).cleanup_distances! end cli.say + MiGA::Parallel.assess_success(o) end ## diff --git a/lib/miga/cli/action/doctor/distances.rb b/lib/miga/cli/action/doctor/distances.rb index 6ef0b58..598a17c 100644 --- a/lib/miga/cli/action/doctor/distances.rb +++ b/lib/miga/cli/action/doctor/distances.rb @@ -103,7 +103,7 @@ def partial_bidir_tmp(project, ref_ds) # Read data (threaded) FileUtils.mkdir_p(tmp) chunks_e = 0 .. chunks - 1 - MiGA::Parallel.distribute(chunks_e, cli[:threads]) do |chunk, k, thr| + o = MiGA::Parallel.distribute(chunks_e, cli[:threads]) do |chunk, k, thr| cli.advance('Reading:', k, chunks, false) if thr == 0 dist = {} [:aai, :ani].each do |metric| @@ -121,6 +121,7 @@ def partial_bidir_tmp(project, ref_ds) end cli.advance('Reading:', chunks, chunks, false) cli.say + MiGA::Parallel.assess_success(o) # Save information to indicate that the run is complete and return File.open(tmp_chunks, 'w') { |fh| fh.puts(chunks, n) } @@ -135,12 +136,12 @@ def merge_bidir_tmp(tmp) tmp_done = File.join(tmp, 'chunks.txt') chunks = File.readlines(tmp_done)[0].chomp.to_i - lower_triangle = [] + lower_tr = [] chunks.times.each do |i| - (0 .. i).to_a.each { |j| lower_triangle << [i, j] } + (0 .. i).to_a.each { |j| lower_tr << [i, j] } end - MiGA::Parallel.distribute(lower_triangle, cli[:threads]) do |cell, k, thr| - cli.advance('Writing:', k, lower_triangle.size, false) if thr == 0 + o = MiGA::Parallel.distribute(lower_tr, cli[:threads]) do |cell, k, thr| + cli.advance('Writing:', k, lower_tr.size, false) if thr == 0 done_f = File.join(tmp, "#{cell[0]}-#{cell[1]}.txt") next if File.exist?(done_f) @@ -148,9 +149,11 @@ def merge_bidir_tmp(tmp) File.open("#{done_f}.tmp", 'w') { |fh| fixed_ds.each { |ds| fh.puts ds } } File.rename("#{done_f}.tmp", done_f) end - cli.advance('Writing:', lower_triangle.size, lower_triangle.size, false) + cli.advance('Writing:', lower_tr.size, lower_tr.size, false) cli.say - lower_triangle.map do |cell| + MiGA::Parallel.assess_success(o) + + lower_tr.map do |cell| Set.new.tap do |y| file = File.join(tmp, "#{cell[0]}-#{cell[1]}.txt") raise MiGA::Error.new( diff --git a/lib/miga/common.rb b/lib/miga/common.rb index 1e139a3..144de7c 100644 --- a/lib/miga/common.rb +++ b/lib/miga/common.rb @@ -106,13 +106,13 @@ def advance(step, n = 0, total = nil, bin = true) '' else left_time = @_advance_time[:avg] * (total - n) / 60 # <- in minutes - left_time < 0.01 ? ' ' : - left_time < 1 ? ('%.0fs left' % (left_time * 60)) : + left_time < 0.01 ? '' : + left_time < 1 ? ('%.0fs left' % (left_time * 60)) : left_time > 1440 ? ('%.1fd left' % (left_time / 1440)) : - left_time > 60 ? ('%.1fh left' % (left_time / 60)) : + left_time > 60 ? ('%.1fh left' % (left_time / 60)) : ('%.1fm left' % left_time) end - $stderr.print("[%s] %s %s %s \r" % [Time.now, step, adv, left]) + $stderr.print("[%s] %s %s %-12s \r" % [Time.now, step, adv, left]) end ## diff --git a/lib/miga/parallel.rb b/lib/miga/parallel.rb index 809f150..d167d5d 100644 --- a/lib/miga/parallel.rb +++ b/lib/miga/parallel.rb @@ -8,10 +8,9 @@ class << self # Executes the passed block with the thread number as argument (0-numbered) # in +threads+ processes def process(threads) - threads.times do |i| - Process.fork { yield(i) } - end - Process.waitall + threads.times + .map { |i| Process.fork { yield(i) } } + .map { |pid| Process.waitpid2(pid) } end ## @@ -21,7 +20,6 @@ def process(threads) # 3. Index of the acting thread def distribute(enum, threads, &blk) process(threads) { |thr| thread_enum(enum, threads, thr, &blk) } - Process.waitall # <- Just to double-check, but `process` should suffice end ## @@ -33,5 +31,23 @@ def thread_enum(enum, threads, thr) yield(obj, idx, thr) if idx % threads == thr end end + + ## + # Assesses the success of all thread exit codes and raises an error if + # any of the children status in +status+ failed. It can be used as: + # + # status = MiGA::Parallel.process(3) { |i| 1/i } + # MiGA::Parallel.assess_success(status) + # + # Or in conjunction with +MiGA::Parallel.distribute+ + def assess_success(status) + failed = status.map { |i| i[1].success? ? 0 : 1 }.inject(:+) + return if failed.zero? + + raise MiGA::Error.new( + "Child threads failed: #{failed}/#{status.size}. " \ + "Maximum exit status: #{status.map { |i| i[1].exitstatus || 0 }.max}" + ) + end end end diff --git a/lib/miga/version.rb b/lib/miga/version.rb index e1636b0..b98e4ec 100644 --- a/lib/miga/version.rb +++ b/lib/miga/version.rb @@ -12,7 +12,7 @@ module MiGA # - String indicating release status: # - rc* release candidate, not released as gem # - [0-9]+ stable release, released as gem - VERSION = [1.3, 20, 10].freeze + VERSION = [1.3, 20, 11].freeze ## # Nickname for the current major.minor version. @@ -20,7 +20,7 @@ module MiGA ## # Date of the current gem relese. - VERSION_DATE = Date.new(2024, 9, 12) + VERSION_DATE = Date.new(2024, 9, 13) ## # References of MiGA diff --git a/test/parallel_test.rb b/test/parallel_test.rb index 21b6275..ab7eaad 100644 --- a/test/parallel_test.rb +++ b/test/parallel_test.rb @@ -19,6 +19,18 @@ def test_distribute assert_equal([0, 0, 1, 1], t.sort) end + def test_assess_success + declare_forks + + o = MiGA::Parallel.process(1) { |i| i / 2 } + assert(o.all? { |i| i[1].success? }) + assert_nil(MiGA::Parallel.assess_success(o)) + + o = MiGA::Parallel.process(1) { |i| $stderr = StringIO.new ; 1 / i } + assert_equal(1, o.map { |i| i[1].success? ? 0 : 1 }.inject(:+)) + assert_raise { MiGA::Parallel.assess_success(o) } + end + def test_thread_enum MiGA::Parallel.thread_enum(%w[a b c d], 3, 1) do |o, _k, _t| assert_equal('b', o)