diff --git a/src/script/hydra-eval-jobset b/src/script/hydra-eval-jobset index 793101654..4c5cbf742 100755 --- a/src/script/hydra-eval-jobset +++ b/src/script/hydra-eval-jobset @@ -17,6 +17,7 @@ use Hydra::Helper::Nix; use Hydra::Model::DB; use Hydra::Plugin; use Hydra::Schema; +use IPC::Run; use JSON::MaybeXS; use Net::Statsd; use Nix::Store; @@ -383,23 +384,33 @@ sub evalJobs { print STDERR "evaluator: @escaped\n"; } - (my $res, my $jobsJSONLines, my $stderr) = captureStdoutStderr(21600, @cmd); - die "nix-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8)) - . ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n") - if $res; - - print STDERR "$stderr"; + my $h = IPC::Run::start \@cmd, + '>', IPC::Run::new_chunker, \my $out, + '2>', \my $err; + + return sub { + while (1) { + $h->pump; + if (!defined $out && !defined $err) { + $h->finish; + if ($?) { + die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n"; + } + return; + } - # XXX: take advantage of nix-eval-jobs's streaming instead of parsing everything in one block at - # the end. - my @jobs; - foreach my $line (split(/\n/, $jobsJSONLines)) { - last if $line eq ""; + if (defined $err) { + print STDERR "$err"; + undef $err; + } - push(@jobs, decode_json($line)); + if (defined $out && $out ne '') { + my $job = decode_json($out); + undef $out; + return $job; + } + } }; - - return @jobs; } @@ -716,17 +727,11 @@ sub checkJobsetWrapped { # Evaluate the job expression. my $evalStart = clock_gettime(CLOCK_MONOTONIC); - my @jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef); - my $evalStop = clock_gettime(CLOCK_MONOTONIC); - - if ($jobsetsJobset) { - die "The .jobsets jobset must only have a single job named 'jobsets'" - unless (scalar @jobs) == 1 && $jobs[0]->{attr} eq "jobsets"; - } - Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000)); + my $evalStop; + my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef); if ($dryRun) { - foreach my $job (@jobs) { + while (defined(my $job = $jobsIter->())) { my $name = $job->{attr}; if (defined $job->{drvPath}) { print STDERR "good job $name: $job->{drvPath}\n"; @@ -737,31 +742,20 @@ sub checkJobsetWrapped { return; } - my $jobOutPathMap = {}; - my $jobsetChanged = 0; - my $dbStart = clock_gettime(CLOCK_MONOTONIC); - - # Store the error messages for jobs that failed to evaluate. my $evaluationErrorTime = time; my $evaluationErrorMsg = ""; - foreach my $job (@jobs) { - next unless defined $job->{error}; - $evaluationErrorMsg .= - ($job->{attr} ne "" ? "in job ‘$job->{attr}’" : "at top-level") . - ":\n" . $job->{error} . "\n\n"; - } - setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime); - my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create( { errormsg => $evaluationErrorMsg , errortime => $evaluationErrorTime } ); + my $jobOutPathMap = {}; + my $jobsetChanged = 0; my %buildMap; - $db->txn_do(sub { + $db->txn_do(sub { my $prevEval = getPrevJobsetEval($db, $jobset, 1); # Clear the "current" flag on all builds. Since we're in a @@ -774,7 +768,7 @@ sub checkJobsetWrapped { , evaluationerror => $evaluationErrorRecord , timestamp => time , checkouttime => abs(int($checkoutStop - $checkoutStart)) - , evaltime => abs(int($evalStop - $evalStart)) + , evaltime => 0 , hasnewbuilds => 0 , nrbuilds => 0 , flake => $flakeRef @@ -782,11 +776,18 @@ sub checkJobsetWrapped { , nixexprpath => $jobset->nixexprpath }); - # Schedule each successfully evaluated job. - foreach my $job (permute(@jobs)) { - next if defined $job->{error}; - #print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n"; - checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins); + while (defined(my $job = $jobsIter->())) { + if ($jobsetsJobset) { + die "The .jobsets jobset must only have a single job named 'jobsets'" + unless $job->{attr} eq "jobsets"; + } + + $evaluationErrorMsg .= + ($job->{attr} ne "" ? "in job ‘$job->{attr}’" : "at top-level") . + ":\n" . $job->{error} . "\n\n" if defined $job->{error}; + + checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins) + unless defined $job->{error}; } # Have any builds been added or removed since last time? @@ -846,11 +847,15 @@ sub checkJobsetWrapped { $jobset->update({ enabled => 0 }) if $jobset->enabled == 2; $jobset->update({ lastcheckedtime => time, forceeval => undef }); - }); - my $dbStop = clock_gettime(CLOCK_MONOTONIC); + $evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg }); + setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime); - Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000)); + $evalStop = clock_gettime(CLOCK_MONOTONIC); + $ev->update({ evaltime => abs(int($evalStop - $evalStart)) }); + }); + + Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000)); Net::Statsd::increment("hydra.evaluator.evals"); Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged; }