Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fix and tests for handling Airflow dags with dots and task groups #2126

Merged
merged 2 commits
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@

@RegisterRowMapper(LineageEventMapper.class)
public interface OpenLineageDao extends BaseDao {
public String DEFAULT_SOURCE_NAME = "default";
public String DEFAULT_NAMESPACE_OWNER = "anonymous";
String DEFAULT_SOURCE_NAME = "default";
String DEFAULT_NAMESPACE_OWNER = "anonymous";

@SqlUpdate(
"INSERT INTO lineage_events ("
Expand Down Expand Up @@ -370,7 +370,10 @@ private JobRow findParentJobRow(
.findJobRowByRunUuid(uuid)
.map(
j -> {
String parentJobName = Utils.parseParentJobName(facet.getJob().getName());
String parentJobName =
facet.getJob().getName().equals(event.getJob().getName())
? Utils.parseParentJobName(facet.getJob().getName())
: facet.getJob().getName();
if (j.getNamespaceName().equals(facet.getJob().getNamespace())
&& j.getName().equals(parentJobName)) {
return j;
Expand Down Expand Up @@ -432,6 +435,10 @@ private JobRow createParentJobRunRecord(
PGobject inputs) {
Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
String parentJobName =
facet.getJob().getName().equals(event.getJob().getName())
? Utils.parseParentJobName(facet.getJob().getName())
: facet.getJob().getName();
JobRow newParentJobRow =
createJobDao()
.upsertJob(
Expand All @@ -440,7 +447,7 @@ private JobRow createParentJobRunRecord(
now,
namespace.getUuid(),
namespace.getName(),
Utils.parseParentJobName(facet.getJob().getName()),
parentJobName,
null,
jobContext.getUuid(),
location,
Expand Down
174 changes: 161 additions & 13 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,127 @@ public void testOpenLineageJobHierarchyAirflowIntegration()
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task2Name,
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);

Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
.hasFieldOrPropertyWithValue("parentJobName", dagName);

Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
assertThat(parentJob)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot()
    throws ExecutionException, InterruptedException, TimeoutException {
  // Regression test: a DAG whose own name contains a dot ("the.dag") must still
  // resolve into one parent (DAG) job plus fully-qualified child task jobs.
  OpenLineage openLineage = new OpenLineage(URI.create("http://openlineage.test.com/"));

  // Align the nominal-time window to the top of the current hour.
  ZonedDateTime windowStart =
      Instant.now()
          .atZone(LineageTestUtils.LOCAL_ZONE)
          .with(ChronoField.MINUTE_OF_HOUR, 0)
          .with(ChronoField.SECOND_OF_MINUTE, 0);
  ZonedDateTime windowEnd = windowStart.plusHours(1);

  String parentRunId = UUID.randomUUID().toString();
  String dagName = "the.dag";
  String firstTaskName = "task1";
  String secondTaskName = "task2";

  // Both task events share the same parent run id and report the DAG as parent,
  // with the task job named "<dag>.<task>".
  RunEvent firstTaskEvent =
      createAirflowRunEvent(
          openLineage,
          windowStart,
          windowEnd,
          parentRunId,
          dagName,
          dagName + "." + firstTaskName,
          NAMESPACE_NAME);

  RunEvent secondTaskEvent =
      createAirflowRunEvent(
          openLineage,
          windowStart,
          windowEnd,
          parentRunId,
          dagName,
          dagName + "." + secondTaskName,
          NAMESPACE_NAME);

  sendAllEvents(firstTaskEvent, secondTaskEvent).get(5, TimeUnit.SECONDS);

  // The child job is stored under its fully qualified name and linked to the DAG.
  Job childJob = client.getJob(NAMESPACE_NAME, dagName + "." + firstTaskName);
  assertThat(childJob)
      .isNotNull()
      .hasFieldOrPropertyWithValue(
          "id", new JobId(NAMESPACE_NAME, dagName + "." + firstTaskName))
      .hasFieldOrPropertyWithValue("parentJobName", dagName);

  // The DAG job itself has no parent, and both task events collapse into one DAG run.
  Job dagJob = client.getJob(NAMESPACE_NAME, dagName);
  assertThat(dagJob)
      .isNotNull()
      .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
      .hasFieldOrPropertyWithValue("parentJobName", null);
  List<Run> dagRuns = client.listRuns(NAMESPACE_NAME, dagName);
  assertThat(dagRuns).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationWithTaskGroup()
throws ExecutionException, InterruptedException, TimeoutException {
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
ZonedDateTime startOfHour =
Instant.now()
.atZone(LineageTestUtils.LOCAL_ZONE)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0);
ZonedDateTime endOfHour = startOfHour.plusHours(1);
String airflowParentRunId = UUID.randomUUID().toString();
String task1Name = "task_group.task1";
String task2Name = "task_group.task2";
String dagName = "dag_with_task_group";
RunEvent airflowTask1 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task2Name,
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -242,13 +358,27 @@ public void testOpenLineageJobHierarchyOldAirflowIntegration()
String task1Name = "task1";
String task2Name = "task2";
String dagName = "the_dag";

// the old integration also used the fully qualified task name as the parent job name
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName + "." + task1Name,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task2Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName + "." + task2Name,
dagName + "." + task2Name,
NAMESPACE_NAME);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -291,12 +421,24 @@ public void testOpenLineageJobHierarchyAirflowIntegrationConflictingRunUuid()
// two dag runs with different namespaces - should result in two distinct jobs
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

String secondNamespace = "another_namespace";
RunEvent airflowTask2 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, secondNamespace);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
secondNamespace);

CompletableFuture<Integer> future = sendAllEvents(airflowTask1, airflowTask2);
future.get(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -332,16 +474,22 @@ public void testOpenLineageJobHierarchySparkAndAirflow()
String dagName = "the_dag";
RunEvent airflowTask1 =
createAirflowRunEvent(
ol, startOfHour, endOfHour, airflowParentRunId, task1Name, dagName, NAMESPACE_NAME);
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);

RunEvent sparkTask =
createRunEvent(
ol,
startOfHour,
endOfHour,
airflowTask1.getRun().getRunId().toString(),
sparkTaskName,
dagName + "." + task1Name,
dagName + "." + task1Name + "." + sparkTaskName,
Optional.empty(),
NAMESPACE_NAME);

Expand Down Expand Up @@ -609,8 +757,8 @@ private RunEvent createAirflowRunEvent(
ZonedDateTime startOfHour,
ZonedDateTime endOfHour,
String airflowParentRunId,
String taskName,
String dagName,
String taskName,
String namespace) {
RunFacet airflowVersionFacet = ol.newRunFacet();
airflowVersionFacet
Expand All @@ -622,8 +770,8 @@ private RunEvent createAirflowRunEvent(
startOfHour,
endOfHour,
airflowParentRunId,
taskName,
dagName,
taskName,
Optional.of(airflowVersionFacet),
namespace);
}
Expand All @@ -634,8 +782,8 @@ private RunEvent createRunEvent(
ZonedDateTime startOfHour,
ZonedDateTime endOfHour,
String airflowParentRunId,
String taskName,
String dagName,
String taskName,
Optional<RunFacet> airflowVersionFacet,
String namespace) {
// The Java SDK requires parent run ids to be a UUID, but the python SDK doesn't. In order to
Expand All @@ -650,7 +798,7 @@ private RunEvent createRunEvent(
"run",
ImmutableMap.of("runId", airflowParentRunId),
"job",
ImmutableMap.of("namespace", namespace, "name", dagName + "." + taskName)));
ImmutableMap.of("namespace", namespace, "name", dagName)));
RunFacetsBuilder runFacetBuilder =
ol.newRunFacetsBuilder()
.nominalTime(ol.newNominalTimeRunFacet(startOfHour, endOfHour))
Expand All @@ -663,7 +811,7 @@ private RunEvent createRunEvent(
.job(
ol.newJob(
namespace,
dagName + "." + taskName,
taskName,
ol.newJobFacetsBuilder()
.documentation(ol.newDocumentationJobFacet("the job docs"))
.sql(ol.newSQLJobFacet("SELECT * FROM the_table"))
Expand Down