Skip to content

Commit

Permalink
deletes: add views instead of filtering
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Sep 6, 2022
1 parent ca204dd commit 17aaa8f
Show file tree
Hide file tree
Showing 23 changed files with 589 additions and 362 deletions.
7 changes: 4 additions & 3 deletions api/src/main/java/marquez/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,14 @@ public Response delete(
@PathParam("dataset") DatasetName datasetName) {
throwIfNotExists(namespaceName);

datasetService
.softDelete(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
Dataset dataset =
datasetService
.findDatasetByName(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));

datasetService
.delete(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
return Response.ok(dataset).build();
}

Expand Down
20 changes: 20 additions & 0 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -160,6 +161,25 @@ public Response list(
return Response.ok(new ResultsPage<>("jobs", jobs, totalCount)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@DELETE
@Path("/namespaces/{namespace}/jobs/{job}")
@Produces(APPLICATION_JSON)
public Response delete(
@PathParam("namespace") NamespaceName namespaceName, @PathParam("job") JobName jobName) {
throwIfNotExists(namespaceName);

Job job =
jobService
.findJobByName(namespaceName.getValue(), jobName.getValue())
.orElseThrow(() -> new JobNotFoundException(jobName));

jobService.delete(namespaceName.getValue(), jobName.getValue());
return Response.ok(job).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
Expand Down
178 changes: 92 additions & 86 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public interface DatasetDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
+ "SELECT 1 FROM datasets AS d "
+ "SELECT 1 FROM datasets_view AS d "
+ "WHERE d.name = :datasetName AND d.namespace_name = :namespaceName)")
boolean exists(String namespaceName, String datasetName);

Expand All @@ -69,49 +69,50 @@ void updateLastModifiedAt(
void updateVersion(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid);

@SqlQuery(
"WITH selected_datasets AS (\n"
+ " SELECT d.*\n"
+ " FROM datasets d\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " AND d.name = :datasetName\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
+ " FROM tags AS t\n"
+ " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
+ " GROUP BY m.dataset_uuid\n"
+ ") t ON t.dataset_uuid = d.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n"
+ " FROM dataset_runs d2,\n"
+ " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
+ " WHERE d2.run_uuid = d2.run_uuid\n"
+ " AND ds -> 'facets' IS NOT NULL\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
+ " GROUP BY d2.uuid\n"
+ ") f ON f.dataset_uuid = d.uuid")
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.name = :datasetName
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);

default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
Expand All @@ -131,19 +132,19 @@ default void setFields(Dataset ds) {
}

@SqlQuery(
"SELECT d.* FROM datasets AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
"SELECT d.* FROM datasets_view AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
Optional<DatasetRow> findDatasetAsRow(String namespaceName, String datasetName);

@SqlQuery("SELECT * FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName")
@SqlQuery(
"SELECT * FROM datasets_view WHERE name = :datasetName AND namespace_name = :namespaceName")
Optional<DatasetRow> getUuid(String namespaceName, String datasetName);

@SqlQuery(
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets d
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.is_deleted is false
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
Expand Down Expand Up @@ -187,10 +188,10 @@ SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS f
ORDER BY d.name""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets")
@SqlQuery("SELECT count(*) FROM datasets_view")
int count();

@SqlQuery("SELECT count(*) FROM datasets AS j WHERE j.namespace_name = :namespaceName")
@SqlQuery("SELECT count(*) FROM datasets_view AS j WHERE j.namespace_name = :namespaceName")
int countFor(String namespaceName);

default List<Dataset> findAllWithTags(String namespaceName, int limit, int offset) {
Expand All @@ -199,40 +200,45 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
}

@SqlQuery(
"INSERT INTO datasets ("
+ "uuid, "
+ "type, "
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "source_uuid, "
+ "source_name, "
+ "name, "
+ "physical_name, "
+ "description, "
+ "is_deleted "
+ ") VALUES ( "
+ ":uuid, "
+ ":type, "
+ ":now, "
+ ":now, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":sourceUuid, "
+ ":sourceName, "
+ ":name, "
+ ":physicalName, "
+ ":description, "
+ ":isDeleted) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
+ "physical_name = EXCLUDED.physical_name, "
+ "description = EXCLUDED.description, "
+ "is_deleted = EXCLUDED.is_deleted "
+ "RETURNING *")
"""
INSERT INTO datasets (
uuid,
type,
created_at,
updated_at,
namespace_uuid,
namespace_name,
source_uuid,
source_name,
name,
physical_name,
description,
is_deleted,
is_hidden
) VALUES (
:uuid,
:type,
:now,
:now,
:namespaceUuid,
:namespaceName,
:sourceUuid,
:sourceName,
:name,
:physicalName,
:description,
:isDeleted,
false
) ON CONFLICT (namespace_uuid, name)
DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
physical_name = EXCLUDED.physical_name,
description = EXCLUDED.description,
is_deleted = EXCLUDED.is_deleted,
is_hidden = EXCLUDED.is_hidden
RETURNING *
""")
DatasetRow upsert(
UUID uuid,
DatasetType type,
Expand Down Expand Up @@ -289,12 +295,12 @@ DatasetRow upsert(
@SqlQuery(
"""
UPDATE datasets
SET is_deleted = true
SET is_hidden = true
WHERE namespace_name = :namespaceName
AND name = :name
RETURNING *
""")
Optional<DatasetRow> softDelete(String namespaceName, String name);
Optional<DatasetRow> delete(String namespaceName, String name);

@Transaction
default Dataset upsertDatasetMeta(
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
+ "SELECT 1 FROM dataset_fields AS df "
+ "INNER JOIN datasets AS d "
+ "INNER JOIN datasets_view AS d "
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
+ "WHERE df.name = :name)")
boolean exists(String namespaceName, String datasetName, String name);
Expand Down
Loading

0 comments on commit 17aaa8f

Please sign in to comment.