Skip to content

Commit

Permalink
Merge branch 'main' into fix/handle_airflow_dags_and_groups
Browse files Browse the repository at this point in the history
  • Loading branch information
wslulciuc authored Sep 19, 2022
2 parents 47d0b22 + 9bb877d commit f82331e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
15 changes: 15 additions & 0 deletions api/src/main/java/marquez/service/OpenLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@
import marquez.common.models.JobVersionId;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.db.BaseDao;
import marquez.db.DatasetDao;
import marquez.db.DatasetVersionDao;
import marquez.db.models.ExtendedDatasetVersionRow;
import marquez.db.models.JobRow;
import marquez.db.models.RunArgsRow;
import marquez.db.models.RunRow;
import marquez.db.models.RunStateRow;
import marquez.db.models.UpdateLineageRow;
import marquez.service.RunTransitionListener.JobInputUpdate;
import marquez.service.RunTransitionListener.JobOutputUpdate;
import marquez.service.RunTransitionListener.RunInput;
import marquez.service.RunTransitionListener.RunOutput;
import marquez.service.RunTransitionListener.RunTransition;
import marquez.service.models.LineageEvent;
import marquez.service.models.RunMeta;

Expand Down Expand Up @@ -91,6 +94,7 @@ public CompletableFuture<Void> createAsync(LineageEvent event) {
buildJobOutputUpdate(update).ifPresent(runService::notify);
}
buildJobInputUpdate(update).ifPresent(runService::notify);
buildRunTransition(update).ifPresent(runService::notify);
}
});

Expand Down Expand Up @@ -222,4 +226,15 @@ private DatasetVersionId buildDatasetVersionId(ExtendedDatasetVersionRow ds) {
.name(DatasetName.of(ds.getDatasetName()))
.build();
}

private Optional<RunTransition> buildRunTransition(UpdateLineageRow record) {
RunId runId = RunId.of(record.getRun().getUuid());
RunStateRow runStateRow = record.getRunState();
if (runStateRow == null) {
return Optional.empty();
}
RunState newState = RunState.valueOf(runStateRow.getState());
RunState oldState = newState.isStarting() ? null : RunState.RUNNING;
return Optional.of(new RunTransition(runId, oldState, newState));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.RunTransitionListener.JobInputUpdate;
import marquez.service.RunTransitionListener.JobOutputUpdate;
import marquez.service.RunTransitionListener.RunTransition;
import marquez.service.models.Dataset;
import marquez.service.models.Job;
import marquez.service.models.LineageEvent;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class OpenLineageServiceIntegrationTest {
private DatasetVersionDao datasetVersionDao;
private ArgumentCaptor<JobInputUpdate> runInputListener;
private ArgumentCaptor<JobOutputUpdate> runOutputListener;
private ArgumentCaptor<RunTransition> runTransitionListener;
private OpenLineageService lineageService;

public static String EVENT_REQUIRED_ONLY = "open_lineage/event_required_only.json";
Expand Down Expand Up @@ -145,6 +147,8 @@ public void setup(Jdbi jdbi) throws SQLException {
doNothing().when(runService).notify(runInputListener.capture());
runOutputListener = ArgumentCaptor.forClass(JobOutputUpdate.class);
doNothing().when(runService).notify(runOutputListener.capture());
runTransitionListener = ArgumentCaptor.forClass(RunTransition.class);
doNothing().when(runService).notify(runTransitionListener.capture());
lineageService = new OpenLineageService(openLineageDao, runService);
datasetDao = jdbi.onDemand(DatasetDao.class);

Expand Down Expand Up @@ -243,6 +247,19 @@ public void testRunListenerOutput(List<URI> uris, ExpectedResults expectedResult
}
}

@ParameterizedTest
@MethodSource("getData")
public void testRunTransition(List<URI> uris, ExpectedResults expectedResults) {
initEvents(uris);

if (expectedResults.inputEventCount > 0) {
Assertions.assertEquals(
uris.size(),
runTransitionListener.getAllValues().size(),
"RunTransition happens once for each run");
}
}

@ParameterizedTest
@MethodSource({"getData"})
public void serviceCalls(List<URI> uris, ExpectedResults expectedResults) {
Expand Down

0 comments on commit f82331e

Please sign in to comment.