From 05573b87b62eb08a75aacc9225e13f1db518287a Mon Sep 17 00:00:00 2001
From: Michael Collado
Date: Tue, 4 Oct 2022 17:39:07 -0700
Subject: [PATCH] Fix bug that caused a single run event to create multiple jobs

Signed-off-by: Michael Collado
---
 .../main/java/marquez/db/OpenLineageDao.java  | 183 ++++++++++--------
 .../marquez/OpenLineageIntegrationTest.java   |  56 ++++++
 2 files changed, 161 insertions(+), 78 deletions(-)

diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java
index 4e94ea1748..96611f9e65 100644
--- a/api/src/main/java/marquez/db/OpenLineageDao.java
+++ b/api/src/main/java/marquez/db/OpenLineageDao.java
@@ -58,6 +58,7 @@ import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
 import marquez.service.models.LineageEvent.NominalTimeRunFacet;
 import marquez.service.models.LineageEvent.ParentRunFacet;
+import marquez.service.models.LineageEvent.Run;
 import marquez.service.models.LineageEvent.RunFacet;
 import marquez.service.models.LineageEvent.SchemaDatasetFacet;
 import marquez.service.models.LineageEvent.SchemaField;
@@ -150,24 +151,12 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
             DEFAULT_NAMESPACE_OWNER);
     bag.setNamespace(namespace);
 
-    String description =
-        Optional.ofNullable(event.getJob().getFacets())
-            .map(JobFacet::getDocumentation)
-            .map(DocumentationJobFacet::getDescription)
-            .orElse(null);
-
     Map<String, String> context = buildJobContext(event);
     JobContextRow jobContext =
         jobContextDao.upsert(
             UUID.randomUUID(), now, Utils.toJson(context), Utils.checksumFor(context));
     bag.setJobContext(jobContext);
 
-    String location =
-        Optional.ofNullable(event.getJob().getFacets())
-            .flatMap(f -> Optional.ofNullable(f.getSourceCodeLocation()))
-            .flatMap(s -> Optional.ofNullable(s.getUrl()))
-            .orElse(null);
-
     Instant nominalStartTime =
         Optional.ofNullable(event.getRun().getFacets())
             .flatMap(f -> Optional.ofNullable(f.getNominalTime()))
@@ -181,75 +170,26 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
             .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant())
             .orElse(null);
 
-    Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
     Optional<ParentRunFacet> parentRun =
-        Optional.ofNullable(event.getRun())
-            .map(LineageEvent.Run::getFacets)
-            .map(RunFacet::getParent);
-
+        Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent);
     Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
 
-    Optional<JobRow> parentJob =
-        parentUuid.map(
-            uuid ->
-                findParentJobRow(
-                    event,
-                    namespace,
-                    jobContext,
-                    location,
-                    nominalStartTime,
-                    nominalEndTime,
-                    log,
-                    parentRun.get(),
-                    uuid));
-    // construct the simple name of the job by removing the parent prefix plus the dot '.' separator
-    String jobName =
-        parentJob
-            .map(
-                p -> {
-                  if (event.getJob().getName().startsWith(p.getName() + '.')) {
-                    return event.getJob().getName().substring(p.getName().length() + 1);
-                  } else {
-                    return event.getJob().getName();
-                  }
-                })
-            .orElse(event.getJob().getName());
-    log.debug(
-        "Calculated job name {} from job {} with parent {}",
-        jobName,
-        event.getJob().getName(),
-        parentJob.map(JobRow::getName));
     JobRow job =
-        parentJob
-            .map(
-                parent ->
-                    jobDao.upsertJob(
-                        UUID.randomUUID(),
-                        parent.getUuid(),
-                        getJobType(event.getJob()),
-                        now,
-                        namespace.getUuid(),
-                        namespace.getName(),
-                        jobName,
-                        description,
-                        jobContext.getUuid(),
-                        location,
-                        null,
-                        jobDao.toJson(toDatasetId(event.getInputs()), mapper)))
+        runDao
+            .findJobRowByRunUuid(runToUuid(event.getRun().getRunId()))
             .orElseGet(
                 () ->
-                    jobDao.upsertJob(
-                        UUID.randomUUID(),
-                        getJobType(event.getJob()),
+                    buildJobFromEvent(
+                        event,
+                        mapper,
+                        jobDao,
                         now,
-                        namespace.getUuid(),
-                        namespace.getName(),
-                        jobName,
-                        description,
-                        jobContext.getUuid(),
-                        location,
-                        null,
-                        jobDao.toJson(toDatasetId(event.getInputs()), mapper)));
+                        namespace,
+                        jobContext,
+                        nominalStartTime,
+                        nominalEndTime,
+                        parentRun));
+
     bag.setJob(job);
 
     Map<String, String> runArgsMap = createRunArgs(event);
@@ -277,8 +217,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
               runStateType,
               now,
               namespace.getName(),
-              jobName,
-              location,
+              job.getName(),
+              job.getLocation(),
               jobContext.getUuid());
     } else {
       run =
@@ -294,8 +234,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
               nominalEndTime,
               namespace.getUuid(),
               namespace.getName(),
-              jobName,
-              location,
+              job.getName(),
+              job.getLocation(),
               jobContext.getUuid());
     }
     bag.setRun(run);
@@ -363,6 +303,93 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
     return bag;
   }
 
+  private JobRow buildJobFromEvent(
+      LineageEvent event,
+      ObjectMapper mapper,
+      JobDao jobDao,
+      Instant now,
+      NamespaceRow namespace,
+      JobContextRow jobContext,
+      Instant nominalStartTime,
+      Instant nominalEndTime,
+      Optional<ParentRunFacet> parentRun) {
+    Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
+    String description =
+        Optional.ofNullable(event.getJob().getFacets())
+            .map(JobFacet::getDocumentation)
+            .map(DocumentationJobFacet::getDescription)
+            .orElse(null);
+
+    String location =
+        Optional.ofNullable(event.getJob().getFacets())
+            .flatMap(f -> Optional.ofNullable(f.getSourceCodeLocation()))
+            .flatMap(s -> Optional.ofNullable(s.getUrl()))
+            .orElse(null);
+
+    Optional<UUID> parentUuid = parentRun.map(Utils::findParentRunUuid);
+    Optional<JobRow> parentJob =
+        parentUuid.map(
+            uuid ->
+                findParentJobRow(
+                    event,
+                    namespace,
+                    jobContext,
+                    location,
+                    nominalStartTime,
+                    nominalEndTime,
+                    log,
+                    parentRun.get(),
+                    uuid));
+
+    // construct the simple name of the job by removing the parent prefix plus the dot '.' separator
+    String jobName =
+        parentJob
+            .map(
+                p -> {
+                  if (event.getJob().getName().startsWith(p.getName() + '.')) {
+                    return event.getJob().getName().substring(p.getName().length() + 1);
+                  } else {
+                    return event.getJob().getName();
+                  }
+                })
+            .orElse(event.getJob().getName());
+    log.debug(
+        "Calculated job name {} from job {} with parent {}",
+        jobName,
+        event.getJob().getName(),
+        parentJob.map(JobRow::getName));
+    return parentJob
+        .map(
+            parent ->
+                jobDao.upsertJob(
+                    UUID.randomUUID(),
+                    parent.getUuid(),
+                    getJobType(event.getJob()),
+                    now,
+                    namespace.getUuid(),
+                    namespace.getName(),
+                    jobName,
+                    description,
+                    jobContext.getUuid(),
+                    location,
+                    null,
+                    jobDao.toJson(toDatasetId(event.getInputs()), mapper)))
+        .orElseGet(
+            () ->
+                jobDao.upsertJob(
+                    UUID.randomUUID(),
+                    getJobType(event.getJob()),
+                    now,
+                    namespace.getUuid(),
+                    namespace.getName(),
+                    jobName,
+                    description,
+                    jobContext.getUuid(),
+                    location,
+                    null,
+                    jobDao.toJson(toDatasetId(event.getInputs()), mapper)));
+  }
+
   private JobRow findParentJobRow(
       LineageEvent event,
       NamespaceRow namespace,
diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java
index 8e3f8ce0a3..77cddb62ad 100644
--- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java
+++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java
@@ -12,6 +12,8 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
 import com.google.common.collect.ImmutableMap;
 import io.dropwizard.util.Resources;
 import io.openlineage.client.OpenLineage;
@@ -369,6 +371,60 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF
     assertThat(runsList).isNotEmpty().hasSize(1);
   }
 
+  @Test
+  public void testOpenLineageJobHierarchyAirflowIntegrationWithParentOnStartEventOnly()
+      throws ExecutionException, InterruptedException, TimeoutException, JsonProcessingException {
+    OpenLineage ol = new OpenLineage(URI.create("http://openlineage.test.com/"));
+    ZonedDateTime startOfHour =
+        Instant.now()
+            .atZone(LineageTestUtils.LOCAL_ZONE)
+            .with(ChronoField.MINUTE_OF_HOUR, 0)
+            .with(ChronoField.SECOND_OF_MINUTE, 0);
+    ZonedDateTime endOfHour = startOfHour.plusHours(1);
+    String airflowParentRunId = UUID.randomUUID().toString();
+    String task1Name = "task1";
+    String dagName = "the_dag";
+    RunEvent event1 =
+        createAirflowRunEvent(
+            ol,
+            startOfHour,
+            endOfHour,
+            airflowParentRunId,
+            dagName,
+            dagName + "." + task1Name,
+            NAMESPACE_NAME);
+    ObjectMapper mapper = Utils.newObjectMapper();
+    JsonNode eventOneJson = mapper.valueToTree(event1);
+    ((ObjectNode) eventOneJson).set("eventType", new TextNode("START"));
+
+    event1.getRun().getFacets().getAdditionalProperties().remove("parent");
+    CompletableFuture.allOf(
+            sendLineage(mapper.writeValueAsString(eventOneJson))
+                .thenCompose(
+                    r -> {
+                      try {
+                        return sendLineage(mapper.writeValueAsString(event1));
+                      } catch (JsonProcessingException e) {
+                        throw new RuntimeException(e);
+                      }
+                    }))
+        .get(5, TimeUnit.SECONDS);
+
+    Job job = client.getJob(NAMESPACE_NAME, dagName + "." + task1Name);
+    assertThat(job)
+        .isNotNull()
+        .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName + "." + task1Name))
+        .hasFieldOrPropertyWithValue("parentJobName", dagName);
+
+    Job parentJob = client.getJob(NAMESPACE_NAME, dagName);
+    assertThat(parentJob)
+        .isNotNull()
+        .hasFieldOrPropertyWithValue("id", new JobId(NAMESPACE_NAME, dagName))
+        .hasFieldOrPropertyWithValue("parentJobName", null);
+    List<Run> runsList = client.listRuns(NAMESPACE_NAME, dagName);
+    assertThat(runsList).isNotEmpty().hasSize(1);
+  }
+
   @Test
   public void testOpenLineageJobHierarchyAirflowIntegrationWithDagNameWithDot()
       throws ExecutionException, InterruptedException, TimeoutException {
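For illustration (not part of the patch above): the core of the change is a find-or-create lookup. On every run event the DAO first asks for the job row already linked to the run UUID and only builds a job from the event payload when no such row exists, so repeated events for the same run can no longer create duplicate jobs. The sketch below is a minimal, self-contained illustration of that pattern; JobStore, JobRow, and RunEvent here are hypothetical stand-ins for the example, not Marquez classes.

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the find-or-create pattern behind the fix.
// JobStore, JobRow, and RunEvent are illustrative stand-ins, not Marquez types.
public class JobStore {
  record JobRow(UUID uuid, String name) {}

  record RunEvent(UUID runId, String jobName) {}

  private final Map<UUID, JobRow> jobByRunId = new ConcurrentHashMap<>();

  // Reuse the job row already associated with this run; create one only on the first event.
  JobRow upsertJobForRun(RunEvent event) {
    return Optional.ofNullable(jobByRunId.get(event.runId()))
        .orElseGet(
            () ->
                jobByRunId.computeIfAbsent(
                    event.runId(), id -> new JobRow(UUID.randomUUID(), event.jobName())));
  }

  public static void main(String[] args) {
    JobStore store = new JobStore();
    UUID runId = UUID.randomUUID();
    JobRow first = store.upsertJobForRun(new RunEvent(runId, "the_dag.task1"));
    JobRow second = store.upsertJobForRun(new RunEvent(runId, "the_dag.task1"));
    // Both events for the same run resolve to a single job row.
    System.out.println(first.uuid().equals(second.uuid())); // prints: true
  }
}

The integration test in the patch exercises the same idea end to end: it sends a START event and a COMPLETE event for one run and then asserts that only one job (and one run) exists for the task.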