diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 6b09f4142f..75ce7f4b70 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -64,8 +64,8 @@ @RegisterRowMapper(LineageEventMapper.class) public interface OpenLineageDao extends BaseDao { - public String DEFAULT_SOURCE_NAME = "default"; - public String DEFAULT_NAMESPACE_OWNER = "anonymous"; + String DEFAULT_SOURCE_NAME = "default"; + String DEFAULT_NAMESPACE_OWNER = "anonymous"; @SqlUpdate( "INSERT INTO lineage_events (" @@ -370,7 +370,10 @@ private JobRow findParentJobRow( .findJobRowByRunUuid(uuid) .map( j -> { - String parentJobName = Utils.parseParentJobName(facet.getJob().getName()); + String parentJobName = + facet.getJob().getName().equals(event.getJob().getName()) + ? Utils.parseParentJobName(facet.getJob().getName()) + : facet.getJob().getName(); if (j.getNamespaceName().equals(facet.getJob().getNamespace()) && j.getName().equals(parentJobName)) { return j; @@ -432,6 +435,10 @@ private JobRow createParentJobRunRecord( PGobject inputs) { Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); Logger log = LoggerFactory.getLogger(OpenLineageDao.class); + String parentJobName = + facet.getJob().getName().equals(event.getJob().getName()) + ? Utils.parseParentJobName(facet.getJob().getName()) + : facet.getJob().getName(); JobRow newParentJobRow = createJobDao() .upsertJob( @@ -440,7 +447,7 @@ private JobRow createParentJobRunRecord( now, namespace.getUuid(), namespace.getName(), - Utils.parseParentJobName(facet.getJob().getName()), + parentJobName, null, jobContext.getUuid(), location, diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 3bf575c07a..4233ebe603 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -199,11 +199,127 @@ public void testOpenLineageJobHierarchyAirflowIntegration() String dagName = "the_dag"; RunEvent airflowTask1 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task1Name, + NAMESPACE_NAME); + + RunEvent airflowTask2 = + createAirflowRunEvent( + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task2Name, + NAMESPACE_NAME); + + CompletableFuture future = sendAllEvents(airflowTask1, airflowTask2); + future.get(5, TimeUnit.SECONDS); + + Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name); + assertThat(job) + .isNotNull() + .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name)) + .hasFieldOrPropertyWithValue("parentJobName", dagName); + + Job parentJob = client.getJob(NAMESPACE_NAME, dagName); + assertThat(parentJob) + .isNotNull() + .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName)) + .hasFieldOrPropertyWithValue("parentJobName", null); + List runsList = client.listRuns(NAMESPACE_NAME, dagName); + assertThat(runsList).isNotEmpty().hasSize(1); + } + + @Test + public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot() + throws ExecutionException, InterruptedException, TimeoutException { + OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/")); + ZonedDateTime startOfHour = + Instant.now() + .atZone(LineageTestUtils.LOCAL_ZONE) + .with(ChronoField.MINUTE_OF_HOUR, 0) + .with(ChronoField.SECOND_OF_MINUTE, 0); + ZonedDateTime endOfHour = startOfHour.plusHours(1); + String airflowParentRunId = UUID.randomUUID().toString(); + String task1Name = "task1"; + String task2Name = "task2"; + String dagName = "the.dag"; + RunEvent airflowTask1 = + createAirflowRunEvent( + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task1Name, + NAMESPACE_NAME); + + RunEvent airflowTask2 = + createAirflowRunEvent( + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task2Name, + NAMESPACE_NAME); + + CompletableFuture future = sendAllEvents(airflowTask1, airflowTask2); + future.get(5, TimeUnit.SECONDS); + + Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name); + assertThat(job) + .isNotNull() + .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name)) + .hasFieldOrPropertyWithValue("parentJobName", dagName); + + Job parentJob = client.getJob(NAMESPACE_NAME, dagName); + assertThat(parentJob) + .isNotNull() + .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName)) + .hasFieldOrPropertyWithValue("parentJobName", null); + List runsList = client.listRuns(NAMESPACE_NAME, dagName); + assertThat(runsList).isNotEmpty().hasSize(1); + } + + @Test + public void testOpenLineageJobHierarchyAirflowIntegrationWithTaskGroup() + throws ExecutionException, InterruptedException, TimeoutException { + OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/")); + ZonedDateTime startOfHour = + Instant.now() + .atZone(LineageTestUtils.LOCAL_ZONE) + .with(ChronoField.MINUTE_OF_HOUR, 0) + .with(ChronoField.SECOND_OF_MINUTE, 0); + ZonedDateTime endOfHour = startOfHour.plusHours(1); + String airflowParentRunId = UUID.randomUUID().toString(); + String task1Name = "task_group.task1"; + String task2Name = "task_group.task2"; + String dagName = "dag_with_task_group"; + RunEvent airflowTask1 = + createAirflowRunEvent( + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task1Name, + NAMESPACE_NAME); RunEvent airflowTask2 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task2Name, + NAMESPACE_NAME); CompletableFuture future = sendAllEvents(airflowTask1, airflowTask2); future.get(5, TimeUnit.SECONDS); @@ -242,13 +358,27 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration() String task1Name = "task1"; String task2Name = "task2"; String dagName = "the_dag"; + + // the old integration also used the fully qualified task name as the parent job name RunEvent airflowTask1 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName + "." + task1Name, + dagName + "." + task1Name, + NAMESPACE_NAME); RunEvent airflowTask2 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName + "." + task2Name, + dagName + "." + task2Name, + NAMESPACE_NAME); CompletableFuture future = sendAllEvents(airflowTask1, airflowTask2); future.get(5, TimeUnit.SECONDS); @@ -291,12 +421,24 @@ public void testOpenLineageJobHierarchyAirflowIntegrationConflictingRunUuid() // two dag runs with different namespaces - should result in two distinct jobs RunEvent airflowTask1 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task1Name, + NAMESPACE_NAME); String secondNamespace = "another_namespace"; RunEvent airflowTask2 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, secondNamespace); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task1Name, + secondNamespace); CompletableFuture future = sendAllEvents(airflowTask1, airflowTask2); future.get(5, TimeUnit.SECONDS); @@ -332,7 +474,13 @@ public void testOpenLineageJobHierarchySparkAndAirflow() String dagName = "the_dag"; RunEvent airflowTask1 = createAirflowRunEvent( - ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME); + ol, + startOfHour, + endOfHour, + airflowParentRunId, + dagName, + dagName + "." + task1Name, + NAMESPACE_NAME); RunEvent sparkTask = createRunEvent( @@ -340,8 +488,8 @@ public void testOpenLineageJobHierarchySparkAndAirflow() startOfHour, endOfHour, airflowTask1.getRun().getRunId().toString(), - sparkTaskName, dagName + "." + task1Name, + dagName + "." + task1Name + "." + sparkTaskName, Optional.empty(), NAMESPACE_NAME); @@ -609,8 +757,8 @@ private RunEvent createAirflowRunEvent( ZonedDateTime startOfHour, ZonedDateTime endOfHour, String airflowParentRunId, - String taskName, String dagName, + String taskName, String namespace) { RunFacet airflowVersionFacet = ol.newRunFacet(); airflowVersionFacet @@ -622,8 +770,8 @@ private RunEvent createAirflowRunEvent( startOfHour, endOfHour, airflowParentRunId, - taskName, dagName, + taskName, Optional.of(airflowVersionFacet), namespace); } @@ -634,8 +782,8 @@ private RunEvent createRunEvent( ZonedDateTime startOfHour, ZonedDateTime endOfHour, String airflowParentRunId, - String taskName, String dagName, + String taskName, Optional airflowVersionFacet, String namespace) { // The Java SDK requires parent run ids to be a UUID, but the python SDK doesn't. In order to @@ -650,7 +798,7 @@ private RunEvent createRunEvent( "run", ImmutableMap.of("runId", airflowParentRunId), "job", - ImmutableMap.of("namespace", namespace, "name", dagName + "." + taskName))); + ImmutableMap.of("namespace", namespace, "name", dagName))); RunFacetsBuilder runFacetBuilder = ol.newRunFacetsBuilder() .nominalTime(ol.newNominalTimeRunFacet(startOfHour, endOfHour)) @@ -663,7 +811,7 @@ private RunEvent createRunEvent( .job( ol.newJob( namespace, - dagName + "." + taskName, + taskName, ol.newJobFacetsBuilder() .documentation(ol.newDocumentationJobFacet("the job docs")) .sql(ol.newSQLJobFacet("SELECT * FROM the_table"))