Skip to content

Commit

Permalink
Improve parallel execution security
Browse files Browse the repository at this point in the history
  • Loading branch information
lmrodriguezr committed Sep 13, 2024
1 parent 183fdab commit e7c9523
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 19 deletions.
3 changes: 2 additions & 1 deletion lib/miga/cli/action/doctor/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ def check_dist_fix(cli, p, fix)
return if fix.empty?

cli.say("- Fixing #{fix.size} datasets")
MiGA::Parallel.distribute(fix, cli[:threads]) do |d_n, idx, thr|
o = MiGA::Parallel.distribute(fix, cli[:threads]) do |d_n, idx, thr|
cli.advance(' > Fixing', idx + 1, fix.size, false) if thr == 0
p.dataset(d_n).cleanup_distances!
end
cli.say
MiGA::Parallel.assess_success(o)
end

##
Expand Down
17 changes: 10 additions & 7 deletions lib/miga/cli/action/doctor/distances.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def partial_bidir_tmp(project, ref_ds)
# Read data (threaded)
FileUtils.mkdir_p(tmp)
chunks_e = 0 .. chunks - 1
MiGA::Parallel.distribute(chunks_e, cli[:threads]) do |chunk, k, thr|
o = MiGA::Parallel.distribute(chunks_e, cli[:threads]) do |chunk, k, thr|
cli.advance('Reading:', k, chunks, false) if thr == 0
dist = {}
[:aai, :ani].each do |metric|
Expand All @@ -121,6 +121,7 @@ def partial_bidir_tmp(project, ref_ds)
end
cli.advance('Reading:', chunks, chunks, false)
cli.say
MiGA::Parallel.assess_success(o)

# Save information to indicate that the run is complete and return
File.open(tmp_chunks, 'w') { |fh| fh.puts(chunks, n) }
Expand All @@ -135,22 +136,24 @@ def merge_bidir_tmp(tmp)
tmp_done = File.join(tmp, 'chunks.txt')
chunks = File.readlines(tmp_done)[0].chomp.to_i

lower_triangle = []
lower_tr = []
chunks.times.each do |i|
(0 .. i).to_a.each { |j| lower_triangle << [i, j] }
(0 .. i).to_a.each { |j| lower_tr << [i, j] }
end
MiGA::Parallel.distribute(lower_triangle, cli[:threads]) do |cell, k, thr|
cli.advance('Writing:', k, lower_triangle.size, false) if thr == 0
o = MiGA::Parallel.distribute(lower_tr, cli[:threads]) do |cell, k, thr|
cli.advance('Writing:', k, lower_tr.size, false) if thr == 0
done_f = File.join(tmp, "#{cell[0]}-#{cell[1]}.txt")
next if File.exist?(done_f)

fixed_ds = merge_bidir_tmp_pair(tmp, cell[0], cell[1])
File.open("#{done_f}.tmp", 'w') { |fh| fixed_ds.each { |ds| fh.puts ds } }
File.rename("#{done_f}.tmp", done_f)
end
cli.advance('Writing:', lower_triangle.size, lower_triangle.size, false)
cli.advance('Writing:', lower_tr.size, lower_tr.size, false)
cli.say
lower_triangle.map do |cell|
MiGA::Parallel.assess_success(o)

lower_tr.map do |cell|
Set.new.tap do |y|
file = File.join(tmp, "#{cell[0]}-#{cell[1]}.txt")
raise MiGA::Error.new(
Expand Down
8 changes: 4 additions & 4 deletions lib/miga/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ def advance(step, n = 0, total = nil, bin = true)
''
else
left_time = @_advance_time[:avg] * (total - n) / 60 # <- in minutes
left_time < 0.01 ? ' ' :
left_time < 1 ? ('%.0fs left' % (left_time * 60)) :
left_time < 0.01 ? '' :
left_time < 1 ? ('%.0fs left' % (left_time * 60)) :
left_time > 1440 ? ('%.1fd left' % (left_time / 1440)) :
left_time > 60 ? ('%.1fh left' % (left_time / 60)) :
left_time > 60 ? ('%.1fh left' % (left_time / 60)) :
('%.1fm left' % left_time)
end
$stderr.print("[%s] %s %s %s \r" % [Time.now, step, adv, left])
$stderr.print("[%s] %s %s %-12s \r" % [Time.now, step, adv, left])
end

##
Expand Down
26 changes: 21 additions & 5 deletions lib/miga/parallel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ class << self
# Executes the passed block with the thread number as argument (0-numbered)
# in +threads+ processes
def process(threads)
threads.times do |i|
Process.fork { yield(i) }
end
Process.waitall
threads.times
.map { |i| Process.fork { yield(i) } }
.map { |pid| Process.waitpid2(pid) }
end

##
Expand All @@ -21,7 +20,6 @@ def process(threads)
# 3. Index of the acting thread
def distribute(enum, threads, &blk)
process(threads) { |thr| thread_enum(enum, threads, thr, &blk) }
Process.waitall # <- Just to double-check, but `process` should suffice
end

##
Expand All @@ -33,5 +31,23 @@ def thread_enum(enum, threads, thr)
yield(obj, idx, thr) if idx % threads == thr
end
end

##
# Assesses the exit statuses of all child processes and raises an error if
# any of the children in +status+ failed. +status+ is an Array of
# <tt>[pid, Process::Status]</tt> pairs, such as the one built by
# +MiGA::Parallel.process+ from +Process.waitpid2+. It can be used as:
#
#   status = MiGA::Parallel.process(3) { |i| 1/i }
#   MiGA::Parallel.assess_success(status)
#
# Or in conjunction with +MiGA::Parallel.distribute+.
#
# Returns +nil+ when no child failed, including when +status+ is empty.
# Raises MiGA::Error reporting the number of failed children and the
# maximum exit status otherwise.
def assess_success(status)
  # +count+ yields 0 for an empty list, whereas summing with +inject(:+)+
  # yields +nil+ and would break the +zero?+ check below
  failed = status.count { |i| !i[1].success? }
  return if failed.zero?

  # +exitstatus+ may be +nil+ (child did not exit normally), so fall back
  # to 0 to keep the values comparable
  max_exit = status.map { |i| i[1].exitstatus || 0 }.max
  raise MiGA::Error.new(
    "Child threads failed: #{failed}/#{status.size}. " \
    "Maximum exit status: #{max_exit}"
  )
end
end
end
4 changes: 2 additions & 2 deletions lib/miga/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ module MiGA
# - String indicating release status:
# - rc* release candidate, not released as gem
# - [0-9]+ stable release, released as gem
VERSION = [1.3, 20, 10].freeze
VERSION = [1.3, 20, 11].freeze

##
# Nickname for the current major.minor version.
VERSION_NAME = 'mezzotint'

##
# Date of the current gem release.
VERSION_DATE = Date.new(2024, 9, 12)
VERSION_DATE = Date.new(2024, 9, 13)

##
# References of MiGA
Expand Down
12 changes: 12 additions & 0 deletions test/parallel_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ def test_distribute
assert_equal([0, 0, 1, 1], t.sort)
end

def test_assess_success
  declare_forks

  # A child whose block completes normally: every status reports success
  # and the assessment returns nil
  clean_run = MiGA::Parallel.process(1) { |thr| thr / 2 }
  all_ok = clean_run.all? { |pair| pair[1].success? }
  assert(all_ok)
  assert_nil(MiGA::Parallel.assess_success(clean_run))

  # A child whose block divides by zero (thread index 0): exactly one
  # failure is recorded and the assessment raises
  broken_run = MiGA::Parallel.process(1) do |thr|
    # presumably keeps the child's error trace out of the test output
    $stderr = StringIO.new
    1 / thr
  end
  failures = broken_run.count { |pair| !pair[1].success? }
  assert_equal(1, failures)
  assert_raise { MiGA::Parallel.assess_success(broken_run) }
end

def test_thread_enum
MiGA::Parallel.thread_enum(%w[a b c d], 3, 1) do |o, _k, _t|
assert_equal('b', o)
Expand Down

0 comments on commit e7c9523

Please sign in to comment.