Skip to content

Commit

Permalink
Add fix and tests for handling Airflow dags with dots and task groups (#2126)
Browse files Browse the repository at this point in the history

Signed-off-by: Michael Collado <collado.mike@gmail.com>

Signed-off-by: Michael Collado <collado.mike@gmail.com>
Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
collado-mike and wslulciuc authored Sep 19, 2022
1 parent 9bb877d commit e61fe48
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 17 deletions.
15 changes: 11 additions & 4 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@

@RegisterRowMapper(LineageEventMapper.class)
public interface OpenLineageDao extends BaseDao {
public String DEFAULT_SOURCE_NAME = "default";
public String DEFAULT_NAMESPACE_OWNER = "anonymous";
String DEFAULT_SOURCE_NAME = "default";
String DEFAULT_NAMESPACE_OWNER = "anonymous";

@SqlUpdate(
"INSERT INTO lineage_events ("
Expand Down Expand Up @@ -370,7 +370,10 @@ private JobRow findParentJobRow(
.findJobRowByRunUuid(uuid)
.map(
j -> {
String parentJobName = Utils.parseParentJobName(facet.getJob().getName());
String parentJobName =
facet.getJob().getName().equals(event.getJob().getName())
? Utils.parseParentJobName(facet.getJob().getName())
: facet.getJob().getName();
if (j.getNamespaceName().equals(facet.getJob().getNamespace())
&& j.getName().equals(parentJobName)) {
return j;
Expand Down Expand Up @@ -432,6 +435,10 @@ private JobRow createParentJobRunRecord(
PGobject inputs) {
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
String parentJobName =
facet.getJob().getName().equals(event.getJob().getName())
? Utils.parseParentJobName(facet.getJob().getName())
: facet.getJob().getName();
JobRow newParentJobRow =
createJobDao()
.upsertJob(
Expand All @@ -440,7 +447,7 @@ private JobRow createParentJobRunRecord(
now,
namespace.getUuid(),
namespace.getName(),
Utils.parseParentJobName(facet.getJob().getName()),
parentJobName,
null,
jobContext.getUuid(),
location,
Expand Down
174 changes: 161 additions & 13 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,127 @@ public void testOpenLineageJobHierarchyAirflowIntegration()
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task2Name,
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);

Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
.hasFieldOrPropertyWithValue("parentJobName", dagName);

Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
assertThat(parentJob)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
// Regression test for PR #2126: a DAG whose own name contains a dot ("the.dag")
// must still resolve correctly as the parent job of its tasks. The parent job
// name must not be truncated at the first dot when it is derived from the
// fully qualified task name (e.g. "the.dag.task1").
public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot()
throws ExecutionException, InterruptedException, TimeoutException {
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
// Pin event times to the top of the current hour so the nominal-time run facet
// built by createAirflowRunEvent is deterministic within the test run.
ZonedDateTime startOfHour =
Instant.now()
.atZone(LineageTestUtils.LOCAL_ZONE)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0);
ZonedDateTime endOfHour = startOfHour.plusHours(1);
// Both task events share the same parent run id, simulating one DAG run.
String airflowParentRunId = UUID.randomUUID().toString();
String task1Name = "task1";
String task2Name = "task2";
// The dot inside the DAG name is the case under test.
String dagName = "the.dag";
// Each event carries the parent job name (the DAG) and the fully qualified
// task name ("the.dag.task1" / "the.dag.task2").
RunEvent airflowTask1 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task2Name,
NAMESPACE_NAME);

// Post both events to the API and wait for them to be ingested.
CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);

// The task job must exist under its fully qualified name and point at the
// full DAG name as its parent — not at a dot-truncated prefix ("the").
Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
.hasFieldOrPropertyWithValue("parentJobName", dagName);

// The DAG itself must exist as a top-level job with no parent.
Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
assertThat(parentJob)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
// Both task events shared one parent run id, so the DAG has exactly one run.
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationWithTaskGroup()
throws ExecutionException, InterruptedException, TimeoutException {
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
ZonedDateTime startOfHour =
Instant.now()
.atZone(LineageTestUtils.LOCAL_ZONE)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0);
ZonedDateTime endOfHour = startOfHour.plusHours(1);
String airflowParentRunId = UUID.randomUUID().toString();
String task1Name = "task_group.task1";
String task2Name = "task_group.task2";
String dagName = "dag_with_task_group";
RunEvent airflowTask1 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task2Name,
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -242,13 +358,27 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
String task1Name = "task1";
String task2Name = "task2";
String dagName = "the_dag";

// the old integration also used the fully qualified task name as the parent job name
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName + "." + task1Name,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName + "." + task2Name,
dagName + "." + task2Name,
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -291,12 +421,24 @@ public void testOpenLineageJobHierarchyAirflowIntegrationConflictingRunUuid()
// two dag runs with different namespaces - should result in two distinct jobs
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

String secondNamespace = "another_namespace";
RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, secondNamespace);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
secondNamespace);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -332,16 +474,22 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent sparkTask =
createRunEvent(
ol,
startOfHour,
endOfHour,
airflowTask1.getRun().getRunId().toString(),
sparkTaskName,
dagName + "." + task1Name,
dagName + "." + task1Name + "." + sparkTaskName,
Optional.empty(),
NAMESPACE_NAME);

Expand Down Expand Up @@ -609,8 +757,8 @@ private RunEvent createAirflowRunEvent(
ZonedDateTime startOfHour,
ZonedDateTime endOfHour,
String airflowParentRunId,
String taskName,
String dagName,
String taskName,
String namespace) {
RunFacet airflowVersionFacet = ol.newRunFacet();
airflowVersionFacet
Expand All @@ -622,8 +770,8 @@ private RunEvent createAirflowRunEvent(
startOfHour,
endOfHour,
airflowParentRunId,
taskName,
dagName,
taskName,
Optional.of(airflowVersionFacet),
namespace);
}
Expand All @@ -634,8 +782,8 @@ private RunEvent createRunEvent(
ZonedDateTime startOfHour,
ZonedDateTime endOfHour,
String airflowParentRunId,
String taskName,
String dagName,
String taskName,
Optional<RunFacet> airflowVersionFacet,
String namespace) {
// The Java SDK requires parent run ids to be a UUID, but the python SDK doesn't. In order to
Expand All @@ -650,7 +798,7 @@ private RunEvent createRunEvent(
"run",
ImmutableMap.of("runId", airflowParentRunId),
"job",
ImmutableMap.of("namespace", namespace, "name", dagName + "." + taskName)));
ImmutableMap.of("namespace", namespace, "name", dagName)));
RunFacetsBuilder runFacetBuilder =
ol.newRunFacetsBuilder()
.nominalTime(ol.newNominalTimeRunFacet(startOfHour, endOfHour))
Expand All @@ -663,7 +811,7 @@ private RunEvent createRunEvent(
.job(
ol.newJob(
namespace,
dagName + "." + taskName,
taskName,
ol.newJobFacetsBuilder()
.documentation(ol.newDocumentationJobFacet("the job docs"))
.sql(ol.newSQLJobFacet("SELECT * FROM the_table"))
Expand Down

0 comments on commit e61fe48

Please sign in to comment.