Skip to content

Commit

Permalink
fix an engine bug from the field
Browse files Browse the repository at this point in the history
  • Loading branch information
TyberiusPrime committed Nov 22, 2024
1 parent 36df2d1 commit bd9559e
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 16 deletions.
1 change: 0 additions & 1 deletion .ppg/errors/latest

This file was deleted.

1 change: 0 additions & 1 deletion .ppg/logs/latest

This file was deleted.

1 change: 0 additions & 1 deletion .ppg/logs/runtimes.tsv

This file was deleted.

27 changes: 21 additions & 6 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ impl<T: PPGEvaluatorStrategy> PPGEvaluator<T> {
};
self.already_started = StartStatus::Running;

self.prune_leave_ephemerals();
self.prune_leaf_ephemerals();

// this is not particularly fast.
self.topo = Some(petgraph::algo::toposort(&self.dag, None).unwrap());
Expand All @@ -910,9 +910,9 @@ impl<T: PPGEvaluatorStrategy> PPGEvaluator<T> {
Ok(())
}

fn prune_leave_ephemerals(&mut self) {
fn prune_leaf_ephemerals(&mut self) {
//option: speed up by looking at the edges once,
//finding those that have no-no-ephemeral downstreams,
//finding those that have no non-ephemeral downstreams,
let mut ephemerals: HashSet<NodeIndex> = self
.dag
.nodes()
Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl<T: PPGEvaluatorStrategy> PPGEvaluator<T> {
// (side-effect: if it changes in an output that's not used by any downstream,
// this would not trigger.
// It's an unlikely problem, and I currently don't see how to work around it.
// witohut turning the downstream_job_id into a magic value that means 'check all the
// without turning the downstream_job_id into a magic value that means 'check all the
// outputs (we need the upstream job to know what comparison to use)
//
let hist_key = &j.job_id;
Expand Down Expand Up @@ -2120,8 +2120,23 @@ impl<T: PPGEvaluatorStrategy> PPGEvaluator<T> {
} else {
match Self::downstream_requirement_status(dag, jobs, node_idx)? {
Required::Yes => {
// the Requirement will have been propagated upstream,
// do nothing
// 20241122, FF: this historical comment appears to be a lie:
// "the Requirement will have been propagated upstream,"
// We need to propagate the requirement upstream.
// python test case python/pypipegraph2/pypipegraph2.abi3.so
debug!(
"\tdownstream required {}, setting upstream edges to 'required'",
jobs[node_idx as usize].job_id
);
Self::set_upstream_edges(dag, node_idx, Required::Yes);
Self::reconsider_ephemeral_upstreams( // which is lazy.
dag,
jobs,
node_idx,
new_signals,
gen,
);

}
Required::Unknown => {
// let's try and update that in light of this node
Expand Down
55 changes: 48 additions & 7 deletions tests/tests_from_the_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def test_issue_20210726a(self, job_trace_log):
"""

job_0 = ppg.FileGeneratingJob("J0", dummy_fg, depend_on_function=False)
job_2 = ppg.DataLoadingJob("J2", lambda: None, depend_on_function=False)
job_3 = ppg.DataLoadingJob("J3", lambda: None, depend_on_function=False)
job_2 = ppg.DataLoadingJob("J2", lambda: 55, depend_on_function=False)
job_3 = ppg.DataLoadingJob("J3", lambda: 55, depend_on_function=False)
job_76 = ppg.FileGeneratingJob("J76", dummy_fg, depend_on_function=False)

edges = []
Expand All @@ -61,8 +61,8 @@ def test_issue_20210726a(self, job_trace_log):
def test_issue_20210726b(self, job_trace_log):
job_0 = ppg.FileGeneratingJob("0", dummy_fg, depend_on_function=False)
job_1 = ppg.FunctionInvariant("1", lambda: 55)
job_2 = ppg.DataLoadingJob("2", lambda: None, depend_on_function=False)
job_3 = ppg.DataLoadingJob("3", lambda: None, depend_on_function=False)
job_2 = ppg.DataLoadingJob("2", lambda: 55, depend_on_function=False)
job_3 = ppg.DataLoadingJob("3", lambda: 55, depend_on_function=False)
job_4 = ppg.FileGeneratingJob("4", dummy_fg, depend_on_function=False)
job_5 = ppg.SharedMultiFileGeneratingJob(
"5", ["url.txt"], dummy_smfg, depend_on_function=False
Expand Down Expand Up @@ -191,7 +191,7 @@ def test_issue_20210726b(self, job_trace_log):
job_77 = ppg.FunctionInvariant("77", lambda: 55)
job_78 = ppg.FunctionInvariant("78", lambda: 55)
job_79 = ppg.AttributeLoadingJob(
"79", DummyObject(), "attr_79", lambda: None, depend_on_function=False
"79", DummyObject(), "attr_79", lambda: 55, depend_on_function=False
)
job_80 = ppg.FileGeneratingJob("80", dummy_fg, depend_on_function=False)
job_81 = ppg.FileGeneratingJob("81", dummy_fg, depend_on_function=False)
Expand Down Expand Up @@ -1564,12 +1564,12 @@ def fail(): # fail on demand
if do_fail:
raise ValueError()

job_3 = ppg.DataLoadingJob("3", lambda: None, depend_on_function=False)
job_3 = ppg.DataLoadingJob("3", lambda: 55, depend_on_function=False)
job_48 = ppg.AttributeLoadingJob(
"48", DummyObject(), "attr_48", fail, depend_on_function=False
)
job_61 = ppg.FileGeneratingJob("61", dummy_fg, depend_on_function=False)
job_67 = ppg.JobGeneratingJob("67", lambda: None, depend_on_function=False)
job_67 = ppg.JobGeneratingJob("67", lambda: 55, depend_on_function=False)

edges = []

Expand Down Expand Up @@ -1633,6 +1633,18 @@ def test_20211221(self):
== ppg.enums.JobOutcome.Skipped
)

def test_20241121_evaluation_bug(self):
    """Regression test for the evaluation bug reported on 2024-11-21.

    Builds the graph from ``gen_20241121()`` and runs it, then calls
    ``ppg.new()``, rebuilds the same graph with ``gen_20241121(True)``
    (the ``FiE`` FunctionInvariant then wraps ``lambda: 56`` instead of
    ``lambda: 55``) and runs again. The test passes when the second
    ``ppg.run()`` returns without raising.
    """
    # First run: baseline graph.
    gen_20241121()
    ppg.run()
    # Second run: same jobs and edges, but with the changed FiE function.
    ppg.new()
    gen_20241121(True)
    try:
        ppg.run()
    except KeyboardInterrupt as e:
        # NOTE(review): presumably this is how the unpatched engine surfaced
        # the aborted run -- confirm. Converted to ValueError so the test
        # reports a failure instead of looking like a user interrupt.
        assert 'Run aborted' in str(e)
        raise ValueError("expected failure before engine patch")
    # if we get here, everything's fine.

def test_cleanup_already_decided_to_skip_upstream_failed(self):
# debugged job CleanUp:cache/lanes/test/input.fastq
# this makes the TF run.
Expand Down Expand Up @@ -1871,3 +1883,32 @@ def test_pandas_hashing():
assert hf(df_a) != hf(df_f)
assert hf(df_a) != hf(df_g)
# assert hf(df_a) != hf(df_h)


def gen_20241121(change_edge=False):
    """Build the five-job graph used by ``test_20241121_evaluation_bug``.

    Creates three DataLoadingJobs (``DlA``, ``DlC``, ``DlD``), one
    FileGeneratingJob (``FgB``) and one FunctionInvariant (``FiE``) through
    ``ppg``, then wires these dependencies (``X -> Y`` meaning
    ``X.depends_on(Y)``)::

        FgB -> DlC -> FiE
               DlC -> DlD -> DlA

    Args:
        change_edge: Despite the name, no edge is altered. When true, ``FiE``
            wraps ``lambda: 56`` instead of ``lambda: 55`` (presumably so the
            invariant is seen as changed on the second run -- confirm).

    Returns:
        None. The jobs are only created for their effect on ``ppg``.
    """
    job_DlA = ppg.DataLoadingJob("DlA", lambda: 35, depend_on_function=False)
    job_FgB = ppg.FileGeneratingJob("FgB", dummy_fg, depend_on_function=False)
    job_DlC = ppg.DataLoadingJob("DlC", lambda: 35, depend_on_function=False)
    job_DlD = ppg.DataLoadingJob("DlD", lambda: 35, depend_on_function=False)

    # Keep these as two separate lambda expressions: the two variants of FiE
    # must wrap different functions, not one function with a runtime branch.
    if change_edge:
        job_FiE = ppg.FunctionInvariant("FiE", lambda: 56)
    else:
        job_FiE = ppg.FunctionInvariant("FiE", lambda: 55)

    # Explicit name -> job table. This replaces scanning locals() for
    # "job_*" names, which iterated the locals mapping while the loop itself
    # was binding new local variables.
    jobs_by_name = {
        "DlA": job_DlA,
        "FgB": job_FgB,
        "DlC": job_DlC,
        "DlD": job_DlD,
        "FiE": job_FiE,
    }

    # (downstream, upstream) pairs, applied in this order.
    edges = [
        ("DlC", "FiE"),
        ("DlD", "DlA"),
        ("DlC", "DlD"),
        ("FgB", "DlC"),
    ]
    for downstream, upstream in edges:
        jobs_by_name[downstream].depends_on(jobs_by_name[upstream])

0 comments on commit bd9559e

Please sign in to comment.