Skip to content

Commit

Permalink
Flink fix terminal streaming events
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Mar 15, 2024
1 parent 78a191b commit 5e63ebb
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.45.0...HEAD)

### Fixed

* Streaming API: fix behaviour for `COMPLETE`/`FAIL` events within streaming jobs [`#2768`](https://github.com/MarquezProject/marquez/pull/2768) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*New `job_version` is not created for a streaming job terminal event with no dataset information and existing version is kept.*

## [0.45.0](https://github.com/MarquezProject/marquez/compare/0.44.0...0.45.0) - 2024-03-07

### Added
Expand Down
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
}
Expand All @@ -390,7 +390,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
}
Expand Down Expand Up @@ -791,6 +791,10 @@ default void updateMarquezOnStreamingJob(
jobVersionDao.loadJobRowRunDetails(
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());

if (event.isTerminalEventForStreamingJobWithNoDatasets()) {
return;
}

if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
// need to insert new job version
BagOfJobVersionInfo bagOfJobVersionInfo =
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ public class LineageEvent extends BaseEvent {
@Valid @NotNull private String producer;
@Valid private URI schemaURL;

@JsonIgnore
public boolean isTerminalEvent() {
return (eventType != null)
&& (eventType.equalsIgnoreCase("COMPLETE") || eventType.equalsIgnoreCase("FAIL"));
}

@JsonIgnore
public boolean isTerminalEventForStreamingJobWithNoDatasets() {
return isTerminalEvent()
&& (job != null && job.isStreamingJob())
&& (outputs == null || outputs.isEmpty())
&& (inputs == null || inputs.isEmpty());
}

@AllArgsConstructor
@NoArgsConstructor
@Setter
Expand Down
35 changes: 35 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,41 @@ public void testGetLineageForRunningStreamingJob() {
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
}

@Test
public void testGetLineageForCompleteStreamingJob() {
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();

LineageTestUtils.createLineageRow(
openLineageDao,
"streamingjob",
"RUNNING",
JobFacet.builder()
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
.build(),
Arrays.asList(input),
Arrays.asList(output));

LineageTestUtils.createLineageRow(
openLineageDao,
"streamingjob",
"COMPLETE",
JobFacet.builder()
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
.build(),
Collections.emptyList(),
Collections.emptyList());

Lineage lineage =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);

assertThat(lineage.getGraph()).hasSize(3); // 1 job + 2 datasets
}

@Test
public void testLineageForOrphanedDataset() {
UpdateLineageRow writeJob =
Expand Down
58 changes: 58 additions & 0 deletions api/src/test/java/marquez/service/models/LineageEventTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package marquez.service.models;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -21,10 +23,12 @@
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import marquez.common.Utils;
import marquez.common.models.FlexibleDateTimeDeserializer;
import marquez.service.models.LineageEvent.JobTypeJobFacet;
import marquez.service.models.LineageEvent.LineageEventBuilder;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -109,4 +113,58 @@ public void testJobTypeJobFacetSerialization() throws IOException {
assertThat(facet.getIntegration()).isEqualTo("FLINK");
assertThat(facet.getProcessingType()).isEqualTo("STREAMING");
}

@Test
public void testIsTerminalEvent() {
LineageEventBuilder builder = LineageEvent.builder();

assertThat(builder.eventType("compleTe").build().isTerminalEvent()).isTrue();
assertThat(builder.eventType("Fail").build().isTerminalEvent()).isTrue();
assertThat(builder.eventType("start").build().isTerminalEvent()).isFalse();
}

@Test
public void testSsTerminalEventForStreamingJobWithNoDatasets() {
LineageEvent.Job streamingJob = mock(LineageEvent.Job.class);
when(streamingJob.isStreamingJob()).thenReturn(true);
LineageEventBuilder builder = LineageEvent.builder().job(streamingJob);

assertThat(builder.eventType("complete").build().isTerminalEventForStreamingJobWithNoDatasets())
.isTrue();

assertThat(builder.eventType("start").build().isTerminalEventForStreamingJobWithNoDatasets())
.isFalse();

assertThat(
builder
.eventType("complete")
.inputs(Collections.emptyList())
.build()
.isTerminalEventForStreamingJobWithNoDatasets())
.isTrue();

assertThat(
builder
.eventType("complete")
.outputs(Collections.emptyList())
.build()
.isTerminalEventForStreamingJobWithNoDatasets())
.isTrue();

assertThat(
builder
.eventType("complete")
.outputs(Collections.singletonList(mock(LineageEvent.Dataset.class)))
.build()
.isTerminalEventForStreamingJobWithNoDatasets())
.isFalse();

assertThat(
builder
.eventType("complete")
.job(mock(LineageEvent.Job.class))
.build()
.isTerminalEventForStreamingJobWithNoDatasets())
.isFalse();
}
}

0 comments on commit 5e63ebb

Please sign in to comment.