delete: add possibility to soft-delete datasets
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
mobuchowski committed Jul 7, 2022
1 parent 7a7a2a8 commit cf50ad0
Showing 11 changed files with 326 additions and 58 deletions.
22 changes: 22 additions & 0 deletions api/src/main/java/marquez/api/DatasetResource.java
@@ -17,6 +17,7 @@
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -150,6 +151,27 @@ public Response list(
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@DELETE
@Path("{dataset}")
@Produces(APPLICATION_JSON)
public Response delete(
@PathParam("namespace") NamespaceName namespaceName,
@PathParam("dataset") DatasetName datasetName) {
throwIfNotExists(namespaceName);

datasetService
.softDelete(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
Dataset dataset =
datasetService
.findDatasetByName(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
return Response.ok(dataset).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
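For orientation, here is a minimal sketch of how a client might exercise the new endpoint, patterned on the integration tests further down. Only deleteDataset and listDatasets appear in this diff; the builder usage, base URL, and exact HTTP route are assumptions, not anything this commit confirms.

import java.util.List;

import marquez.client.MarquezClient;
import marquez.client.models.Dataset;

public class SoftDeleteExample {
  public static void main(String[] args) {
    // Hypothetical local Marquez instance; the builder call is an assumption.
    MarquezClient client =
        MarquezClient.builder().baseUrl("http://localhost:5000").build();

    // Presumably routed as DELETE /api/v1/namespaces/{namespace}/datasets/{dataset};
    // per the resource above, the response body is the soft-deleted dataset.
    client.deleteDataset("my_namespace", "my_table");

    // Soft-deleted datasets no longer appear in listings.
    List<Dataset> datasets = client.listDatasets("my_namespace");
    System.out.println(datasets.isEmpty()); // true, until a new version arrives
  }
}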
102 changes: 57 additions & 45 deletions api/src/main/java/marquez/db/DatasetDao.java
@@ -138,51 +138,53 @@ default void setFields(Dataset ds) {
Optional<DatasetRow> getUuid(String namespaceName, String datasetName);

@SqlQuery(
"WITH selected_datasets AS (\n"
+ " SELECT d.*\n"
+ " FROM datasets d\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " ORDER BY d.name\n"
+ " LIMIT :limit OFFSET :offset\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
+ " FROM tags AS t\n"
+ " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
+ " GROUP BY m.dataset_uuid\n"
+ ") t ON t.dataset_uuid = d.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets\n"
+ " FROM dataset_runs d2,\n"
+ " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
+ " WHERE d2.run_uuid = d2.run_uuid\n"
+ " AND ds -> 'facets' IS NOT NULL\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
+ " GROUP BY d2.uuid\n"
+ ") f ON f.dataset_uuid = d.uuid\n"
+ "ORDER BY d.name")
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets d
WHERE d.namespace_name = :namespaceName
AND d.is_deleted is false
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
ORDER BY d.name""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets")
@@ -284,6 +286,16 @@ DatasetRow upsert(
String name,
String physicalName);

@SqlQuery(
"""
UPDATE datasets
SET is_deleted = true
WHERE namespace_name = :namespaceName
AND name = :name
RETURNING *
""")
Optional<DatasetRow> softDelete(String namespaceName, String name);

@Transaction
default Dataset upsertDatasetMeta(
NamespaceName namespaceName, DatasetName datasetName, DatasetMeta datasetMeta) {
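Both the filtered findAll query and the new softDelete statement rely on an is_deleted column on datasets. The migration introducing that column is not among the hunks rendered here (the diff is truncated further down), but it would presumably be a Flyway-style DDL change along these lines:

-- Hypothetical sketch only; the real migration is in one of the changed
-- files not shown. A false default keeps every existing row visible.
ALTER TABLE datasets
    ADD COLUMN is_deleted BOOLEAN NOT NULL DEFAULT FALSE;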
13 changes: 8 additions & 5 deletions api/src/main/java/marquez/db/LineageDao.java
@@ -67,11 +67,14 @@ public interface LineageDao {
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

@SqlQuery(
"SELECT ds.*, dv.fields, dv.lifecycle_state\n"
+ "FROM datasets ds\n"
+ "LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid\n"
+ "WHERE ds.uuid IN (<dsUuids>);")
Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.uuid IN (<dsUuids>)
AND ds.is_deleted is false
""")
Set<DatasetData> getNonDeletedDatasetData(@BindList Set<UUID> dsUuids);

@SqlQuery(
"select j.uuid from jobs j\n"
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/service/LineageService.java
@@ -75,7 +75,7 @@ public Lineage lineage(NodeId nodeId, int depth) {
.collect(Collectors.toSet());
Set<DatasetData> datasets = new HashSet<>();
if (!datasetIds.isEmpty()) {
datasets.addAll(getDatasetData(datasetIds));
datasets.addAll(this.getNonDeletedDatasetData(datasetIds));
}

return toLineage(jobData, datasets);
83 changes: 83 additions & 0 deletions api/src/test/java/marquez/DatasetIntegrationTest.java
@@ -11,7 +11,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
@@ -357,4 +360,84 @@ public void testApp_upsertDescription() {
// Description stays
assertThat(dataset2.getDescription()).isEqualTo(DESCRIPTION.getDescription());
}

@Test
public void testApp_doesNotShowDeletedDataset() throws IOException {
String namespace = "namespace";
String name = "table";
LineageEvent event =
new LineageEvent(
"COMPLETE",
Instant.now().atZone(ZoneId.systemDefault()),
new LineageEvent.Run(UUID.randomUUID().toString(), null),
new LineageEvent.Job("namespace", "job_name", null),
List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())),
Collections.emptyList(),
"the_producer");

final CompletableFuture<Integer> resp =
this.sendLineage(Utils.toJson(event))
.thenApply(HttpResponse::statusCode)
.whenComplete(
(val, error) -> {
if (error != null) {
Assertions.fail("Could not complete request");
}
});

// Ensure the event was accepted and a 201 response code was returned.
assertThat(resp.join()).isEqualTo(201);

client.deleteDataset(namespace, name);

List<Dataset> datasets = client.listDatasets(namespace);
assertThat(datasets).hasSize(0);
}

@Test
public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOException {
String namespace = "namespace";
String name = "anotherTable";
LineageEvent event =
new LineageEvent(
"COMPLETE",
Instant.now().atZone(ZoneId.systemDefault()),
new LineageEvent.Run(UUID.randomUUID().toString(), null),
new LineageEvent.Job("namespace", "job_name", null),
List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())),
Collections.emptyList(),
"the_producer");

CompletableFuture<Integer> resp =
this.sendLineage(Utils.toJson(event))
.thenApply(HttpResponse::statusCode)
.whenComplete(
(val, error) -> {
if (error != null) {
Assertions.fail("Could not complete request");
}
});

// Ensure the event was accepted and a 201 response code was returned.
assertThat(resp.join()).isEqualTo(201);

client.deleteDataset(namespace, name);

List<Dataset> datasets = client.listDatasets(namespace);
assertThat(datasets).hasSize(0);
    resp =
        this.sendLineage(Utils.toJson(event))
            .thenApply(HttpResponse::statusCode)
            .whenComplete(
                (val, error) -> {
                  if (error != null) {
                    Assertions.fail("Could not complete request");
                  }
                });

assertThat(resp.join()).isEqualTo(201);

datasets = client.listDatasets(namespace);
assertThat(datasets).hasSize(1);
  }
}
36 changes: 32 additions & 4 deletions api/src/test/java/marquez/db/DatasetDaoTest.java
@@ -287,20 +287,22 @@ public void testGetDatasets() {
ImmutableMap.of("writeFacet", new CustomValueFacet("firstWriteValue")))));

String secondDatasetName = "secondDataset";
String deletedDatasetName = "deletedDataset";
createLineageRow(
openLineageDao,
"secondWriteJob",
"COMPLETE",
jobFacet,
Collections.emptyList(),
Collections.singletonList(
List.of(
new Dataset(
NAMESPACE,
secondDatasetName,
newDatasetFacet(
ImmutableMap.of("writeFacet", new CustomValueFacet("secondWriteValue")),
new SchemaField("age", "int", "the age"),
new SchemaField("address", "string", "the address")))));
new SchemaField("address", "string", "the address"))),
new Dataset(NAMESPACE, deletedDatasetName, newDatasetFacet())));

createLineageRow(
openLineageDao,
@@ -319,6 +321,11 @@
Collections.emptyList());

List<marquez.service.models.Dataset> datasets = datasetDao.findAll(NAMESPACE, 5, 0);
assertThat(datasets).hasSize(3);

datasetDao.softDelete(NAMESPACE, deletedDatasetName);

datasets = datasetDao.findAll(NAMESPACE, 5, 0);
assertThat(datasets).hasSize(2);

// datasets sorted alphabetically, so commonDataset is first
@@ -357,8 +364,7 @@ public void testGetDatasets() {
InstanceOfAssertFactories.map(String.class, Object.class))
.isNotEmpty()
.hasSize(6)
.containsKeys(
"documentation", "description", "schema", "dataSource", "writeFacet", "inputFacet")
.containsKeys("documentation", "description", "schema", "dataSource", "inputFacet")
.containsEntry(
"writeFacet",
ImmutableMap.of(
Expand All @@ -379,6 +385,28 @@ public void testGetDatasets() {
"http://test.schema/"));
}

@Test
public void testGetSpecificDatasetReturnsDatasetIfDeleted() {
createLineageRow(
openLineageDao,
"writeJob",
"COMPLETE",
jobFacet,
Collections.emptyList(),
Collections.singletonList(newCommonDataset(Collections.emptyMap())));

    // Soft-delete first; a direct lookup by name should still return the dataset.
    datasetDao.softDelete(NAMESPACE, DATASET);

    marquez.service.models.Dataset dataset =
        datasetDao.findDatasetByName(NAMESPACE, DATASET).get();

assertThat(dataset)
.matches(ds -> ds.getName().getValue().equals(DATASET))
.extracting(
marquez.service.models.Dataset::getFacets,
InstanceOfAssertFactories.map(String.class, Object.class))
.isNotEmpty()
.hasSize(4)
.containsKeys("documentation", "description", "schema", "dataSource");
}

@Test
public void testGetDatasetsWithMultipleVersions() {
String secondDatasetName = "secondDataset";
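Taken together, the DAO tests pin down the visibility contract: softDelete hides a dataset from findAll, while findDatasetByName still returns it. A condensed sketch of that contract, reusing the names from the tests above (setup omitted):

// Hidden from listings once soft-deleted...
datasetDao.softDelete(NAMESPACE, deletedDatasetName);
List<marquez.service.models.Dataset> visible = datasetDao.findAll(NAMESPACE, 5, 0);
// visible no longer contains deletedDatasetName

// ...but still reachable directly by name.
Optional<marquez.service.models.Dataset> direct =
    datasetDao.findDatasetByName(NAMESPACE, deletedDatasetName);
// direct.isPresent() stays true

The integration test above also shows that a fresh lineage event for the same dataset makes it reappear in listings, which suggests the write path resets is_deleted, presumably in one of the hunks not shown here.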
[Diff truncated: the remaining 5 of 11 changed files are not shown.]
