Fix bug that caused a single run event to create multiple jobs (#2162)
Signed-off-by: Michael Collado <collado.mike@gmail.com>

collado-mike authored Oct 5, 2022
1 parent 67e9249 commit b9abb19
Showing 2 changed files with 161 additions and 78 deletions.
183 changes: 105 additions & 78 deletions api/src/main/java/marquez/db/OpenLineageDao.java
@@ -58,6 +58,7 @@
 import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
 import marquez.service.models.LineageEvent.NominalTimeRunFacet;
 import marquez.service.models.LineageEvent.ParentRunFacet;
+import marquez.service.models.LineageEvent.Run;
 import marquez.service.models.LineageEvent.RunFacet;
 import marquez.service.models.LineageEvent.SchemaDatasetFacet;
 import marquez.service.models.LineageEvent.SchemaField;
@@ -150,24 +151,12 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
             DEFAULT_NAMESPACE_OWNER);
     bag.setNamespace(namespace);
 
-    String description =
-        Optional.ofNullable(event.getJob().getFacets())
-            .map(JobFacet::getDocumentation)
-            .map(DocumentationJobFacet::getDescription)
-            .orElse(null);
-
     Map<String, String> context = buildJobContext(event);
     JobContextRow jobContext =
         jobContextDao.upsert(
             UUID.randomUUID(), now, Utils.toJson(context), Utils.checksumFor(context));
     bag.setJobContext(jobContext);
 
-    String location =
-        Optional.ofNullable(event.getJob().getFacets())
-            .flatMap(f -> Optional.ofNullable(f.getSourceCodeLocation()))
-            .flatMap(s -> Optional.ofNullable(s.getUrl()))
-            .orElse(null);
-
     Instant nominalStartTime =
         Optional.ofNullable(event.getRun().getFacets())
             .flatMap(f -> Optional.ofNullable(f.getNominalTime()))
@@ -181,75 +170,26 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
             .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
             .orElse(null);
 
-    Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
     Optional<ParentRunFacet> parentRun =
-        Optional.ofNullable(event.getRun())
-            .map(LineageEvent.Run::getFacets)
-            .map(RunFacet::getParent);
+        Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent);
     Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
-    Optional<JobRow> parentJob =
-        parentUuid.map(
-            uuid ->
-                findParentJobRow(
-                    event,
-                    namespace,
-                    jobContext,
-                    location,
-                    nominalStartTime,
-                    nominalEndTime,
-                    log,
-                    parentRun.get(),
-                    uuid));
 
-    // construct the simple name of the job by removing the parent prefix plus the dot '.' separator
-    String jobName =
-        parentJob
-            .map(
-                p -> {
-                  if (event.getJob().getName().startsWith(p.getName() + '.')) {
-                    return event.getJob().getName().substring(p.getName().length() + 1);
-                  } else {
-                    return event.getJob().getName();
-                  }
-                })
-            .orElse(event.getJob().getName());
-    log.debug(
-        "Calculated job name {} from job {} with parent {}",
-        jobName,
-        event.getJob().getName(),
-        parentJob.map(JobRow::getName));
     JobRow job =
-        parentJob
-            .map(
-                parent ->
-                    jobDao.upsertJob(
-                        UUID.randomUUID(),
-                        parent.getUuid(),
-                        getJobType(event.getJob()),
-                        now,
-                        namespace.getUuid(),
-                        namespace.getName(),
-                        jobName,
-                        description,
-                        jobContext.getUuid(),
-                        location,
-                        null,
-                        jobDao.toJson(toDatasetId(event.getInputs()), mapper)))
+        runDao
+            .findJobRowByRunUuid(runToUuid(event.getRun().getRunId()))
             .orElseGet(
                 () ->
-                    jobDao.upsertJob(
-                        UUID.randomUUID(),
-                        getJobType(event.getJob()),
+                    buildJobFromEvent(
+                        event,
+                        mapper,
+                        jobDao,
                         now,
-                        namespace.getUuid(),
-                        namespace.getName(),
-                        jobName,
-                        description,
-                        jobContext.getUuid(),
-                        location,
-                        null,
-                        jobDao.toJson(toDatasetId(event.getInputs()), mapper)));
+                        namespace,
+                        jobContext,
+                        nominalStartTime,
+                        nominalEndTime,
+                        parentRun));
 
     bag.setJob(job);
 
     Map<String, String> runArgsMap = createRunArgs(event);
@@ -277,8 +217,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
               runStateType,
               now,
               namespace.getName(),
-              jobName,
-              location,
+              job.getName(),
+              job.getLocation(),
               jobContext.getUuid());
     } else {
       run =
@@ -294,8 +234,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
               nominalEndTime,
               namespace.getUuid(),
               namespace.getName(),
-              jobName,
-              location,
+              job.getName(),
+              job.getLocation(),
               jobContext.getUuid());
     }
     bag.setRun(run);
@@ -363,6 +303,93 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
return bag;
}

private JobRow buildJobFromEvent(
LineageEvent event,
ObjectMapper mapper,
JobDao jobDao,
Instant now,
NamespaceRow namespace,
JobContextRow jobContext,
Instant nominalStartTime,
Instant nominalEndTime,
Optional<ParentRunFacet> parentRun) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
String description =
Optional.ofNullable(event.getJob().getFacets())
.map(JobFacet::getDocumentation)
.map(DocumentationJobFacet::getDescription)
.orElse(null);

String location =
Optional.ofNullable(event.getJob().getFacets())
.flatMap(f -> Optional.ofNullable(f.getSourceCodeLocation()))
.flatMap(s -> Optional.ofNullable(s.getUrl()))
.orElse(null);

Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
Optional<JobRow> parentJob =
parentUuid.map(
uuid ->
findParentJobRow(
event,
namespace,
jobContext,
location,
nominalStartTime,
nominalEndTime,
log,
parentRun.get(),
uuid));

// construct the simple name of the job by removing the parent prefix plus the dot '.' separator
String jobName =
parentJob
.map(
p -> {
if (event.getJob().getName().startsWith(p.getName() + '.')) {
return event.getJob().getName().substring(p.getName().length() + 1);
} else {
return event.getJob().getName();
}
})
.orElse(event.getJob().getName());
log.debug(
"Calculated job name {} from job {} with parent {}",
jobName,
event.getJob().getName(),
parentJob.map(JobRow::getName));
return parentJob
.map(
parent ->
jobDao.upsertJob(
UUID.randomUUID(),
parent.getUuid(),
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
jobName,
description,
jobContext.getUuid(),
location,
null,
jobDao.toJson(toDatasetId(event.getInputs()), mapper)))
.orElseGet(
() ->
jobDao.upsertJob(
UUID.randomUUID(),
getJobType(event.getJob()),
now,
namespace.getUuid(),
namespace.getName(),
jobName,
description,
jobContext.getUuid(),
location,
null,
jobDao.toJson(toDatasetId(event.getInputs()), mapper)));
}

private JobRow findParentJobRow(
LineageEvent event,
NamespaceRow namespace,
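
In short, the DAO change above stops deriving a (possibly different) job row from each individual event. updateBaseMarquezModel now asks runDao.findJobRowByRunUuid(...) for the job already linked to the run and only falls back to the new buildJobFromEvent helper when the run has not been seen before, so a START event carrying a parent facet and a later completion event without one resolve to the same job instead of creating two. The snippet below is a minimal, self-contained sketch of that look-up-before-create pattern only; JobRegistry, Job, and resolveJob are hypothetical stand-ins, not Marquez APIs.

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the DAO layer: it resolves the job for a run id,
// creating a job only when no job has been linked to that run yet.
public class JobRegistry {

  public record Job(UUID uuid, String name) {}

  // Simulates the jobs table (keyed by name) and the run -> job association.
  private final Map<String, Job> jobsByName = new ConcurrentHashMap<>();
  private final Map<UUID, Job> jobsByRunId = new ConcurrentHashMap<>();

  // Look the job up by run UUID first; only fall back to creating one when the
  // run is new. Two events for the same run that derive different job names
  // (e.g. one resolved against a parent job, one not) then share a single job.
  public Job resolveJob(UUID runId, String jobNameFromEvent) {
    return Optional.ofNullable(jobsByRunId.get(runId))
        .orElseGet(
            () -> {
              Job job =
                  jobsByName.computeIfAbsent(
                      jobNameFromEvent, name -> new Job(UUID.randomUUID(), name));
              jobsByRunId.put(runId, job);
              return job;
            });
  }

  public static void main(String[] args) {
    JobRegistry registry = new JobRegistry();
    UUID runId = UUID.randomUUID();
    Job fromStart = registry.resolveJob(runId, "task1"); // START event
    Job fromComplete = registry.resolveJob(runId, "the_dag.task1"); // COMPLETE event
    System.out.println(fromStart.equals(fromComplete)); // true
  }
}

The new integration test below drives the same scenario end to end: a START event carrying the Airflow parent facet followed by a completion event without it.
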
56 changes: 56 additions & 0 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
@@ -12,6 +12,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.ImmutableMap;
import io.dropwizard.util.Resources;
import io.openlineage.client.OpenLineage;
@@ -369,6 +371,60 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationWithParentOnStartEventOnly()
throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
ZonedDateTime startOfHour =
Instant.now()
.atZone(LineageTestUtils.LOCAL_ZONE)
.with(ChronoField.MINUTE_OF_HOUR, 0)
.with(ChronoField.SECOND_OF_MINUTE, 0);
ZonedDateTime endOfHour = startOfHour.plusHours(1);
String airflowParentRunId = UUID.randomUUID().toString();
String task1Name = "task1";
String dagName = "the_dag";
RunEvent event1 =
createAirflowRunEvent(
ol,
startOfHour,
endOfHour,
airflowParentRunId,
dagName,
dagName + "." + task1Name,
NAMESPACE_NAME);
ObjectMapper mapper = Utils.newObjectMapper();
JsonNode eventOneJson = mapper.valueToTree(event1);
((ObjectNode) eventOneJson).set("eventType", new TextNode("START"));

event1.getRun().getFacets().getAdditionalProperties().remove("parent");
CompletableFuture.allOf(
sendLineage(mapper.writeValueAsString(eventOneJson))
.thenCompose(
r -> {
try {
return sendLineage(mapper.writeValueAsString(event1));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}))
.get(5, TimeUnit.SECONDS);

Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
.hasFieldOrPropertyWithValue("parentJobName", dagName);

Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
assertThat(parentJob)
.isNotNull()
.hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
.hasFieldOrPropertyWithValue("parentJobName", null);
List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
assertThat(runsList).isNotEmpty().hasSize(1);
}

@Test
public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot()
throws ExecutionException, InterruptedException, TimeoutException {
