Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink fix terminal streaming events #2768

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.45.0...HEAD)

### Fixed

* Streaming API: fix behaviour for `COMPLETE`/`FAIL` events within streaming jobs [`#2768`](https://github.com/MarquezProject/marquez/pull/2768) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*A new `job_version` is no longer created for a streaming job's terminal event that carries no dataset information; the existing version is kept instead.*

## [0.45.0](https://github.com/MarquezProject/marquez/compare/0.44.0...0.45.0) - 2024-03-07

### Added
Expand Down
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
}
Expand All @@ -390,7 +390,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
}
Expand Down Expand Up @@ -791,6 +791,10 @@ default void updateMarquezOnStreamingJob(
jobVersionDao.loadJobRowRunDetails(
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());

if (event.isTerminalEventForStreamingJobWithNoDatasets()) {
return;
}

if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
// need to insert new job version
BagOfJobVersionInfo bagOfJobVersionInfo =
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ public class LineageEvent extends BaseEvent {
@Valid @NotNull private String producer;
@Valid private URI schemaURL;

@JsonIgnore
public boolean isTerminalEvent() {
  // An event with no type cannot be terminal; guard first so the
  // case-insensitive comparisons below never see a null receiver.
  if (eventType == null) {
    return false;
  }
  // COMPLETE and FAIL are the two terminal OpenLineage event types.
  return "COMPLETE".equalsIgnoreCase(eventType) || "FAIL".equalsIgnoreCase(eventType);
}

@JsonIgnore
public boolean isTerminalEventForStreamingJobWithNoDatasets() {
  // Only terminal (COMPLETE/FAIL) events from a streaming job qualify.
  if (!isTerminalEvent() || job == null || !job.isStreamingJob()) {
    return false;
  }
  // "No datasets" means both sides are absent or empty.
  boolean noInputs = inputs == null || inputs.isEmpty();
  boolean noOutputs = outputs == null || outputs.isEmpty();
  return noInputs && noOutputs;
}

@AllArgsConstructor
@NoArgsConstructor
@Setter
Expand Down
35 changes: 35 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,41 @@ public void testGetLineageForRunningStreamingJob() {
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
}

@Test
public void testGetLineageForCompleteStreamingJob() {
  // Datasets attached to the streaming job while it is RUNNING.
  Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
  Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();

  // Both events describe the same STREAMING job type.
  JobFacet streamingJobFacet =
      JobFacet.builder()
          .jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
          .build();

  // RUNNING event registers the job together with its input and output datasets.
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "streamingjob",
      "RUNNING",
      streamingJobFacet,
      Arrays.asList(input),
      Arrays.asList(output));

  // Terminal COMPLETE event carries no datasets; the lineage graph must survive it.
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "streamingjob",
      "COMPLETE",
      streamingJobFacet,
      Collections.emptyList(),
      Collections.emptyList());

  Lineage lineage =
      lineageService.lineage(
          NodeId.of(
              new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
          5,
          true);

  // 1 job node + 2 dataset nodes: the dataset-less terminal event kept the graph intact.
  assertThat(lineage.getGraph()).hasSize(3);
}

@Test
public void testLineageForOrphanedDataset() {
UpdateLineageRow writeJob =
Expand Down
80 changes: 80 additions & 0 deletions api/src/test/java/marquez/service/models/LineageEventTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package marquez.service.models;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -21,10 +23,12 @@
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import marquez.common.Utils;
import marquez.common.models.FlexibleDateTimeDeserializer;
import marquez.service.models.LineageEvent.JobTypeJobFacet;
import marquez.service.models.LineageEvent.LineageEventBuilder;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -109,4 +113,80 @@ public void testJobTypeJobFacetSerialization() throws IOException {
assertThat(facet.getIntegration()).isEqualTo("FLINK");
assertThat(facet.getProcessingType()).isEqualTo("STREAMING");
}

@Test
public void testIsTerminalEvent() {
  // Build a fresh event per case: Lombok builders are mutable, and reusing one
  // instance lets state from an earlier case leak into a later one.
  // COMPLETE and FAIL are terminal, matched case-insensitively.
  assertThat(LineageEvent.builder().eventType("compleTe").build().isTerminalEvent()).isTrue();
  assertThat(LineageEvent.builder().eventType("Fail").build().isTerminalEvent()).isTrue();

  // Any other event type is non-terminal.
  assertThat(LineageEvent.builder().eventType("start").build().isTerminalEvent()).isFalse();

  // A missing event type must not throw and is non-terminal (null guard).
  assertThat(LineageEvent.builder().build().isTerminalEvent()).isFalse();
}

@Test
public void testIsTerminalEventForStreamingJobWithNoDatasets() {
  // A job whose isStreamingJob() reports true; used by every streaming case.
  LineageEvent.Job streamingJob = mock(LineageEvent.Job.class);
  when(streamingJob.isStreamingJob()).thenReturn(true);

  // Terminal event + streaming job + no datasets at all -> true.
  assertThat(
          LineageEvent.builder()
              .job(streamingJob)
              .eventType("complete")
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isTrue();

  // Non-terminal event -> false, even with a streaming job and no datasets.
  assertThat(
          LineageEvent.builder()
              .job(streamingJob)
              .eventType("start")
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isFalse();

  // An explicitly empty input list still counts as "no datasets" -> true.
  assertThat(
          LineageEvent.builder()
              .job(streamingJob)
              .eventType("complete")
              .inputs(Collections.emptyList())
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isTrue();

  // Any input dataset present -> false.
  assertThat(
          LineageEvent.builder()
              .job(streamingJob)
              .eventType("complete")
              .inputs(Collections.singletonList(mock(LineageEvent.Dataset.class)))
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isFalse();

  // An explicitly empty output list still counts as "no datasets" -> true.
  assertThat(
          LineageEvent.builder()
              .job(streamingJob)
              .eventType("complete")
              .outputs(Collections.emptyList())
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isTrue();

  // Any output dataset present -> false.
  assertThat(
          LineageEvent.builder()
              .job(streamingJob)
              .eventType("complete")
              .outputs(Collections.singletonList(mock(LineageEvent.Dataset.class)))
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isFalse();

  // Non-streaming job (a bare mock's isStreamingJob() defaults to false) -> false.
  // NOTE: the original case also called .job(streamingJob) first, which the second
  // .job(...) silently overwrote — the dead call is removed here.
  assertThat(
          LineageEvent.builder()
              .eventType("complete")
              .job(mock(LineageEvent.Job.class))
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isFalse();

  // No job at all -> false (null-job guard).
  assertThat(
          LineageEvent.builder()
              .eventType("complete")
              .build()
              .isTerminalEventForStreamingJobWithNoDatasets())
      .isFalse();
}
}
Loading