Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix broken lineage for repeated runs #2710

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.43.0...HEAD)

### Fixed:
* API: fix broken lineage graph for multiple runs of the same job.[`#2710`](https://github.com/MarquezProject/marquez/pull/2710) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Problem: lineage graph was not available for jobs run multiple times of the same job as a result of bug introduced with recent release. In order to fix the inconsistent data, [this query](https://github.com/MarquezProject/marquez/blob/83608bb13bd4dc235c065f95bebf8a88dcb53c61/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java#L19) should be run. This is not required when upgrading directly to this version.*

## [0.43.0](https://github.com/MarquezProject/marquez/compare/0.42.0...0.43.0) - 2023-12-15
### Added
* API: refactor the `RunDao` SQL query [`#2685`](https://github.com/MarquezProject/marquez/pull/2685) [@sophiely](https://github.com/sophiely)
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ ExtendedJobVersionRow upsertJobVersion(
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW())
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_current_job_version = TRUE
""")
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid,
Expand Down
37 changes: 36 additions & 1 deletion api/src/test/java/marquez/db/LineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,48 @@ public static UpdateLineageRow createLineageRow(
List<Dataset> outputs,
@Valid LineageEvent.ParentRunFacet parentRunFacet,
ImmutableMap<String, Object> runFacets) {
return createLineageRow(
dao,
jobName,
UUID.randomUUID(),
status,
jobFacet,
inputs,
outputs,
parentRunFacet,
runFacets);
}

/**
* Create an {@link UpdateLineageRow} from the input job details and datasets.
*
* @param dao
* @param jobName
* @param runId
* @param status
* @param jobFacet
* @param inputs
* @param outputs
* @param parentRunFacet
* @param runFacets
* @return
*/
public static UpdateLineageRow createLineageRow(
OpenLineageDao dao,
String jobName,
UUID runId,
String status,
JobFacet jobFacet,
List<Dataset> inputs,
List<Dataset> outputs,
@Valid LineageEvent.ParentRunFacet parentRunFacet,
ImmutableMap<String, Object> runFacets) {
NominalTimeRunFacet nominalTimeRunFacet = new NominalTimeRunFacet();
nominalTimeRunFacet.setNominalStartTime(
Instant.now().atZone(LOCAL_ZONE).truncatedTo(ChronoUnit.HOURS));
nominalTimeRunFacet.setNominalEndTime(
nominalTimeRunFacet.getNominalStartTime().plus(1, ChronoUnit.HOURS));

UUID runId = UUID.randomUUID();
LineageEvent event =
LineageEvent.builder()
.eventType(status)
Expand Down
73 changes: 73 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.api.JdbiUtils;
import marquez.common.models.DatasetId;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class LineageServiceTest {
Expand Down Expand Up @@ -427,6 +429,77 @@ public void testLineageWithWithCycle() {
.matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob"));
}

@Test
public void testGetLineageJobRunTwice() {
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();
UUID runId = UUID.randomUUID();

// (1) Run batch job which outputs input-dataset
LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"START",
jobFacet,
Arrays.asList(input),
Collections.emptyList(),
null,
ImmutableMap.of());

LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"COMPLETE",
jobFacet,
Collections.emptyList(),
Arrays.asList(output),
null,
ImmutableMap.of());

// (2) Rerun it
LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"START",
jobFacet,
Arrays.asList(input),
Collections.emptyList(),
null,
ImmutableMap.of());

LineageTestUtils.createLineageRow(
openLineageDao,
"someJob",
runId,
"COMPLETE",
jobFacet,
Collections.emptyList(),
Arrays.asList(output),
null,
ImmutableMap.of());

// (4) lineage on output dataset shall be same as lineage on input dataset
Lineage lineageFromInput =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))),
5,
true);

Lineage lineageFromOutput =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);

assertThat(lineageFromInput.getGraph()).hasSize(3); // 2 datasets + 1 job
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
}

@Test
public void testGetLineageForRunningStreamingJob() {
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
Expand Down
Loading