diff --git a/tests/converter/test_workflow_xml_parser.py b/tests/converter/test_workflow_xml_parser.py index 8708638a8..9666f6a56 100644 --- a/tests/converter/test_workflow_xml_parser.py +++ b/tests/converter/test_workflow_xml_parser.py @@ -14,7 +14,7 @@ # limitations under the License. """Tests workflow xml parser""" from os import path -from typing import NamedTuple, Set, Dict +from typing import Dict, List, NamedTuple, Optional import unittest from unittest import mock @@ -311,9 +311,14 @@ def test_parse_node_decision(self, decision_mock): decision_mock.assert_called_once_with(decision) -class WorkflowTestCase(NamedTuple): +class NodeExpectedResult(NamedTuple): + downstream_names: List[str] + error_xml: Optional[str] = None + + +class WorkflowExpectedResult(NamedTuple): name: str - node_names: Set[str] + nodes: Dict[str, NodeExpectedResult] job_properties: Dict[str, str] config: Dict[str, str] @@ -322,112 +327,164 @@ class TestOozieExamples(unittest.TestCase): @parameterized.expand( [ ( - WorkflowTestCase( + WorkflowExpectedResult( name="decision", - node_names={"start_node_1234", "decision-node", "first", "end", "kill"}, + nodes={ + "decision-node": NodeExpectedResult(downstream_names=["first", "end", "kill"]), + "kill": NodeExpectedResult(downstream_names=[]), + "end": NodeExpectedResult(downstream_names=[]), + "start_node_1234": NodeExpectedResult(downstream_names=["decision-node"]), + "first": NodeExpectedResult(downstream_names=["end"], error_xml="end"), + }, job_properties={"nameNode": "hdfs://"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="demo", - node_names={ - "start_node_1234", - "fork-node", - "pig-node", - "subworkflow-node", - "shell-node", - "join-node", - "decision-node", - "hdfs-node", - "end", - "fail", + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["fork-node"]), + "fork-node": NodeExpectedResult( + downstream_names=["pig-node", "subworkflow-node", "shell-node"] + ), + "pig-node": NodeExpectedResult(downstream_names=["join-node"], error_xml="fail"), + "subworkflow-node": NodeExpectedResult( + downstream_names=["join-node"], error_xml="fail" + ), + "shell-node": NodeExpectedResult(downstream_names=["join-node"], error_xml="fail"), + "join-node": NodeExpectedResult(downstream_names=["decision-node"]), + "decision-node": NodeExpectedResult(downstream_names=["hdfs-node", "end"]), + "hdfs-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), }, job_properties={"nameNode": "hdfs://"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="el", - node_names={"start_node_1234", "ssh", "end", "fail"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["ssh"]), + "ssh": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + }, job_properties={"hostname": "user@BBB", "nameNode": "hdfs://"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="fs", - node_names={ - "start_node_1234", - "end", - "fail", - "chmod", - "mkdir", - "fs-node", - "delete", - "move", - "touchz", - "chgrp", - "join", + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["fs-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "chmod": NodeExpectedResult(downstream_names=["join"], error_xml="fail"), + "mkdir": NodeExpectedResult(downstream_names=["join"], error_xml="fail"), + "delete": NodeExpectedResult(downstream_names=["join"], error_xml="fail"), + "move": NodeExpectedResult(downstream_names=["join"], error_xml="fail"), + "touchz": NodeExpectedResult(downstream_names=["join"], error_xml="fail"), + "chgrp": NodeExpectedResult(downstream_names=["join"], error_xml="fail"), + "join": NodeExpectedResult(downstream_names=["end"]), + "fs-node": NodeExpectedResult( + downstream_names=["mkdir", "delete", "move", "chmod", "touchz", "chgrp"] + ), }, job_properties={"hostname": "user@BBB", "nameNode": "hdfs://localhost:8020/"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="mapreduce", - node_names={"start_node_1234", "end", "fail", "mr-node"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["mr-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "mr-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={"dataproc_cluster": "A", "nameNode": "hdfs://"}, config={"gcp_region": "B"}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="pig", - node_names={"start_node_1234", "end", "fail", "pig-node"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["pig-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "pig-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={"oozie.wf.application.path": "hdfs://", "nameNode": "hdfs://"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="shell", - node_names={"start_node_1234", "end", "fail", "shell-node"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["shell-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "shell-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={"nameNode": "hdfs://"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="spark", - node_names={"start_node_1234", "end", "fail", "spark-node"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["spark-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "spark-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={"nameNode": "hdfs://"}, config={"dataproc_cluster": "A", "gcp_region": "B"}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="ssh", - node_names={"start_node_1234", "end", "fail", "ssh"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["ssh"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "ssh": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={"hostname": "user@BBB", "nameNode": "hdfs://"}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="subwf", - node_names={"start_node_1234", "end", "fail", "subworkflow-node"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["subworkflow-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "subworkflow-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={}, config={}, ), ), ( - WorkflowTestCase( + WorkflowExpectedResult( name="distcp", - node_names={"start_node_1234", "end", "fail", "distcp-node"}, + nodes={ + "start_node_1234": NodeExpectedResult(downstream_names=["distcp-node"]), + "end": NodeExpectedResult(downstream_names=[]), + "fail": NodeExpectedResult(downstream_names=[]), + "distcp-node": NodeExpectedResult(downstream_names=["end"], error_xml="fail"), + }, job_properties={ "hostname": "AAAA@BBB", "nameNode": "hdfs://", @@ -441,7 +498,7 @@ class TestOozieExamples(unittest.TestCase): name_func=lambda func, num, p: f"{func.__name__}_{num}_{p.args[0].name}", ) @mock.patch("uuid.uuid4", return_value="1234") - def test_parse_workflow_examples(self, case: WorkflowTestCase, _): + def test_parse_workflow_examples(self, case: WorkflowExpectedResult, _): workflow = Workflow( input_directory_path=path.join(EXAMPLES_PATH, case.name), output_directory_path="/tmp", @@ -453,6 +510,11 @@ def test_parse_workflow_examples(self, case: WorkflowTestCase, _): action_mapper=ACTION_MAP, renderer=mock.MagicMock(), ) + current_parser.parse_workflow() - self.assertEqual(case.node_names, set(current_parser.workflow.nodes.keys())) self.assertEqual(set(), current_parser.workflow.relations) + self.assertEqual(case.nodes.keys(), set(current_parser.workflow.nodes.keys())) + for node_name, expected_node in case.nodes.items(): + node = workflow.nodes[node_name] + self.assertEqual(expected_node.downstream_names, node.downstream_names) + self.assertEqual(expected_node.error_xml, node.error_xml)