Skip to content

Commit

Permalink
fix broken lineage for repeated runs (#2710)
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski authored Dec 20, 2023
1 parent 83608bb commit 40d51f1
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.43.0...HEAD)

### Fixed:
* API: fix broken lineage graph for multiple runs of the same job.[`#2710`](https://github.com/MarquezProject/marquez/pull/2710) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Problem: lineage graph was not available for jobs run multiple times of the same job as a result of bug introduced with recent release. In order to fix the inconsistent data, [this query](https://github.com/MarquezProject/marquez/blob/83608bb13bd4dc235c065f95bebf8a88dcb53c61/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java#L19) should be run. This is not required when upgrading directly to this version.*

## [0.43.0](https://github.com/MarquezProject/marquez/compare/0.42.0...0.43.0) - 2023-12-15
### Added
* API: refactor the `RunDao` SQL query [`#2685`](https://github.com/MarquezProject/marquez/pull/2685) [@sophiely](https://github.com/sophiely)
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ ExtendedJobVersionRow upsertJobVersion(
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW())
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_current_job_version = TRUE
""")
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid,
Expand Down
37 changes: 36 additions & 1 deletion api/src/test/java/marquez/db/LineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,48 @@ public static UpdateLineageRow createLineageRow(
List<Dataset> outputs,
@Valid LineageEvent.ParentRunFacet parentRunFacet,
ImmutableMap<String, Object> runFacets) {
return createLineageRow(
dao,
jobName,
UUID.randomUUID(),
status,
jobFacet,
inputs,
outputs,
parentRunFacet,
runFacets);
}

/**
* Create an {@link UpdateLineageRow} from the input job details and datasets.
*
* @param dao
* @param jobName
* @param runId
* @param status
* @param jobFacet
* @param inputs
* @param outputs
* @param parentRunFacet
* @param runFacets
* @return
*/
public static UpdateLineageRow createLineageRow(
OpenLineageDao dao,
String jobName,
UUID runId,
String status,
JobFacet jobFacet,
List<Dataset> inputs,
List<Dataset> outputs,
@Valid LineageEvent.ParentRunFacet parentRunFacet,
ImmutableMap<String, Object> runFacets) {
NominalTimeRunFacet nominalTimeRunFacet = new NominalTimeRunFacet();
nominalTimeRunFacet.setNominalStartTime(
Instant.now().atZone(LOCAL_ZONE).truncatedTo(ChronoUnit.HOURS));
nominalTimeRunFacet.setNominalEndTime(
nominalTimeRunFacet.getNominalStartTime().plus(1, ChronoUnit.HOURS));

UUID runId = UUID.randomUUID();
LineageEvent event =
LineageEvent.builder()
.eventType(status)
Expand Down
73 changes: 73 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.api.JdbiUtils;
import marquez.common.models.DatasetId;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class LineageServiceTest {
Expand Down Expand Up @@ -427,6 +429,77 @@ public void testLineageWithWithCycle() {
.matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob"));
}

@Test
public void testGetLineageJobRunTwice() {
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();
UUID runId = UUID.randomUUID();

// (1) Run batch job which outputs input-dataset
LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"START",
jobFacet,
Arrays.asList(input),
Collections.emptyList(),
null,
ImmutableMap.of());

LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"COMPLETE",
jobFacet,
Collections.emptyList(),
Arrays.asList(output),
null,
ImmutableMap.of());

// (2) Rerun it
LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"START",
jobFacet,
Arrays.asList(input),
Collections.emptyList(),
null,
ImmutableMap.of());

LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"COMPLETE",
jobFacet,
Collections.emptyList(),
Arrays.asList(output),
null,
ImmutableMap.of());

// (4) lineage on output dataset shall be same as lineage on input dataset
Lineage lineageFromInput =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))),
5,
true);

Lineage lineageFromOutput =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);

assertThat(lineageFromInput.getGraph()).hasSize(3); // 2 datasets + 1 job
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
}

@Test
public void testGetLineageForRunningStreamingJob() {
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
Expand Down

0 comments on commit 40d51f1

Please sign in to comment.