Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure job data in lineage query is not null or empty #2253

Merged
merged 9 commits into from
Dec 13, 2022
32 changes: 32 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import marquez.api.exceptions.RunAlreadyExistsException;
import marquez.api.exceptions.RunNotFoundException;
import marquez.api.exceptions.SourceNotFoundException;
import marquez.common.models.DatasetFieldId;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.FieldName;
import marquez.common.models.JobId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
Expand All @@ -37,6 +39,7 @@
import marquez.service.ServiceFactory;
import marquez.service.SourceService;
import marquez.service.TagService;
import marquez.service.models.NodeId;
import marquez.service.models.Run;

public class BaseResource {
Expand Down Expand Up @@ -74,6 +77,10 @@ void throwIfNotExists(@NonNull NamespaceName namespaceName) {
}
}

void throwIfNotExists(@NonNull DatasetId datasetId) {
throwIfNotExists(datasetId.getNamespace(), datasetId.getName());
}

void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull DatasetName datasetName) {
if (!datasetService.exists(namespaceName.getValue(), datasetName.getValue())) {
throw new DatasetNotFoundException(datasetName);
Expand All @@ -86,6 +93,13 @@ void throwIfSourceNotExists(SourceName sourceName) {
}
}

void throwIfNotExists(@NonNull DatasetFieldId datasetFieldId) {
throwIfNotExists(
datasetFieldId.getDatasetId().getNamespace(),
datasetFieldId.getDatasetId().getName(),
datasetFieldId.getFieldName());
}

void throwIfNotExists(
@NonNull NamespaceName namespaceName,
@NonNull DatasetName datasetName,
Expand All @@ -96,6 +110,10 @@ void throwIfNotExists(
}
}

void throwIfNotExists(@NonNull JobId jobId) {
throwIfNotExists(jobId.getNamespace(), jobId.getName());
}

void throwIfNotExists(@NonNull NamespaceName namespaceName, @NonNull JobName jobName) {
if (!jobService.exists(namespaceName.getValue(), jobName.getValue())) {
throw new JobNotFoundException(jobName);
Expand Down Expand Up @@ -137,6 +155,20 @@ void throwIfDatasetsNotExist(ImmutableSet<DatasetId> datasets) {
}
}

void throwIfNotExists(@NonNull NodeId nodeId) {
if (!nodeId.hasVersion()) {
if (nodeId.isDatasetType()) {
throwIfNotExists(nodeId.asDatasetId());
} else if (nodeId.isDatasetFieldType()) {
throwIfNotExists(nodeId.asDatasetFieldId());
} else if (nodeId.isJobType()) {
throwIfNotExists(nodeId.asJobId());
} else if (nodeId.isRunType()) {
throwIfNotExists(nodeId.asRunId());
}
}
}

URI locationFor(@NonNull UriInfo uriInfo, @NonNull Run run) {
return uriInfo
.getBaseUriBuilder()
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private int determineStatusCode(Throwable e) {
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
throwIfNotExists(nodeId);
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

Expand Down
44 changes: 33 additions & 11 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
Expand Down Expand Up @@ -49,14 +50,30 @@ public LineageService(LineageDao delegate, JobDao jobDao) {

// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth);
Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
log.warn(
"Failed to get job associated with node '{}', returning orphan graph...",
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}
UUID job = optionalUUID.get();

log.debug("Attempting to get lineage for job '{}'", job);
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);

// Ensure job data is not empty, an empty set cannot be passed to LineageDao.getCurrentRuns() or
// LineageDao.getCurrentRunsWithFacets().
if (jobData.isEmpty()) {
// Log warning, then return an orphan lineage graph; a graph should contain at most one
// job->dataset relationship.
log.warn(
"Failed to get lineage for job '{}' associated with node '{}', returning orphan graph...",
job,
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

List<Run> runs =
withRunFacets
? getCurrentRunsWithFacets(
Expand Down Expand Up @@ -85,19 +102,23 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
if (nodeId.isDatasetType()
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
log.warn(
"Found jobs {} which no longer share lineage with dataset {} - discarding",
jobData.stream().map(JobData::getId).toList());
DatasetId datasetId = nodeId.asDatasetId();
DatasetData datasetData =
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
return new Lineage(
ImmutableSortedSet.of(
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

return toLineage(jobData, datasets);
}

private Lineage toLineageWithOrphanDataset(@NonNull DatasetId datasetId) {
final DatasetData datasetData =
getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
return new Lineage(
ImmutableSortedSet.of(
Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
}

private Lineage toLineage(Set<JobData> jobData, Set<DatasetData> datasets) {
Set<Node> nodes = new LinkedHashSet<>();
// build mapping for later
Expand Down Expand Up @@ -227,7 +248,8 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
return getJobFromInputOrOutput(
datasetId.getName().getValue(), datasetId.getNamespace().getValue());
} else {
throw new NodeIdNotFoundException("Node must be a dataset node or job node");
throw new NodeIdNotFoundException(
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
}
}
}
9 changes: 7 additions & 2 deletions api/src/test/java/marquez/api/OpenLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -20,6 +21,7 @@
import javax.ws.rs.core.Response;
import marquez.common.Utils;
import marquez.db.OpenLineageDao;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.ServiceFactory;
import marquez.service.models.Lineage;
Expand All @@ -36,6 +38,8 @@ class OpenLineageResourceTest {
static {
LineageService lineageService = mock(LineageService.class);
OpenLineageDao openLineageDao = mock(OpenLineageDao.class);
JobService jobService = mock(JobService.class);
when(jobService.exists(anyString(), anyString())).thenReturn(true);

Node testNode =
Utils.fromJson(
Expand All @@ -45,7 +49,8 @@ class OpenLineageResourceTest {
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);

ServiceFactory serviceFactory =
ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService));
ApiTestUtils.mockServiceFactory(
Map.of(LineageService.class, lineageService, JobService.class, jobService));

UNDER_TEST =
ResourceExtension.builder()
Expand All @@ -58,7 +63,7 @@ public void testGetLineage() {
final Lineage lineage =
UNDER_TEST
.target("/api/v1/lineage")
.queryParam("nodeId", "job:test")
.queryParam("nodeId", "job:test-namespace:test-job")
.request()
.get()
.readEntity(Lineage.class);
Expand Down