From c6a5a863c6f9afd636f867c77a9bdf9803b25abf Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Mon, 4 Jul 2022 17:34:42 +0200 Subject: [PATCH] delete: add possibility to soft-delete datasets Signed-off-by: Maciej Obuchowski --- .../java/marquez/api/DatasetResource.java | 22 ++++ api/src/main/java/marquez/db/DatasetDao.java | 102 ++++++++++-------- api/src/main/java/marquez/db/LineageDao.java | 13 ++- .../java/marquez/service/LineageService.java | 2 +- .../java/marquez/DatasetIntegrationTest.java | 83 ++++++++++++++ .../test/java/marquez/db/DatasetDaoTest.java | 36 ++++++- .../test/java/marquez/db/LineageDaoTest.java | 61 ++++++++++- build.gradle | 2 +- .../java/marquez/client/MarquezClient.java | 5 + .../main/java/marquez/client/MarquezHttp.java | 21 ++++ .../java/marquez/client/MarquezHttpTest.java | 37 +++++++ 11 files changed, 326 insertions(+), 58 deletions(-) diff --git a/api/src/main/java/marquez/api/DatasetResource.java b/api/src/main/java/marquez/api/DatasetResource.java index 6e90179fca..accfdaa044 100644 --- a/api/src/main/java/marquez/api/DatasetResource.java +++ b/api/src/main/java/marquez/api/DatasetResource.java @@ -17,6 +17,7 @@ import javax.validation.Valid; import javax.validation.constraints.Min; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -150,6 +151,27 @@ public Response list( return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build(); } + @Timed + @ResponseMetered + @ExceptionMetered + @DELETE + @Path("{dataset}") + @Produces(APPLICATION_JSON) + public Response delete( + @PathParam("namespace") NamespaceName namespaceName, + @PathParam("dataset") DatasetName datasetName) { + throwIfNotExists(namespaceName); + + datasetService + .softDelete(namespaceName.getValue(), datasetName.getValue()) + .orElseThrow(() -> new DatasetNotFoundException(datasetName)); + Dataset dataset = + datasetService + .findDatasetByName(namespaceName.getValue(), datasetName.getValue()) + .orElseThrow(() -> new DatasetNotFoundException(datasetName)); + return Response.ok(dataset).build(); + } + @Timed @ResponseMetered @ExceptionMetered diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 6036364b5b..9b50f1fdc9 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -138,51 +138,53 @@ default void setFields(Dataset ds) { Optional getUuid(String namespaceName, String datasetName); @SqlQuery( - "WITH selected_datasets AS (\n" - + " SELECT d.*\n" - + " FROM datasets d\n" - + " WHERE d.namespace_name = :namespaceName\n" - + " ORDER BY d.name\n" - + " LIMIT :limit OFFSET :offset\n" - + "), dataset_runs AS (\n" - + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n" - + " FROM selected_datasets d\n" - + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" - + " LEFT JOIN LATERAL (\n" - + " SELECT run_uuid, event_time, event FROM lineage_events\n" - + " WHERE run_uuid = dv.run_uuid\n" - + " ) e ON e.run_uuid = dv.run_uuid\n" - + " UNION\n" - + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n" - + " FROM selected_datasets d\n" - + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" - + " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n" - + " LEFT JOIN LATERAL (\n" - + " SELECT run_uuid, event_time, event FROM lineage_events\n" - + " WHERE run_uuid = rim.run_uuid\n" - + " ) e ON e.run_uuid = rim.run_uuid\n" - + ")\n" - + "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n" - + "FROM selected_datasets d\n" - + "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n" - + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n" - + "LEFT JOIN (\n" - + " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n" - + " FROM tags AS t\n" - + " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n" - + " GROUP BY m.dataset_uuid\n" - + ") t ON t.dataset_uuid = d.uuid\n" - + "LEFT JOIN (\n" - + " SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets\n" - + " FROM dataset_runs d2,\n" - + " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n" - + " WHERE d2.run_uuid = d2.run_uuid\n" - + " AND ds -> 'facets' IS NOT NULL\n" - + " AND ds ->> 'name' = d2.name\n" - + " AND ds ->> 'namespace' = d2.namespace_name\n" - + " GROUP BY d2.uuid\n" - + ") f ON f.dataset_uuid = d.uuid\n" - + "ORDER BY d.name") + """ + WITH selected_datasets AS ( + SELECT d.* + FROM datasets d + WHERE d.namespace_name = :namespaceName + AND d.is_deleted is false + ORDER BY d.name + LIMIT :limit OFFSET :offset + ), dataset_runs AS ( + SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event + FROM selected_datasets d + INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid + LEFT JOIN LATERAL ( + SELECT run_uuid, event_time, event FROM lineage_events + WHERE run_uuid = dv.run_uuid + ) e ON e.run_uuid = dv.run_uuid + UNION + SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event + FROM selected_datasets d + INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid + LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid + LEFT JOIN LATERAL ( + SELECT run_uuid, event_time, event FROM lineage_events + WHERE run_uuid = rim.run_uuid + ) e ON e.run_uuid = rim.run_uuid + ) + SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets + FROM selected_datasets d + LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid + LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid + LEFT JOIN ( + SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid + FROM tags AS t + INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid + GROUP BY m.dataset_uuid + ) t ON t.dataset_uuid = d.uuid + LEFT JOIN ( + SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets + FROM dataset_runs d2, + jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds + WHERE d2.run_uuid = d2.run_uuid + AND ds -> 'facets' IS NOT NULL + AND ds ->> 'name' = d2.name + AND ds ->> 'namespace' = d2.namespace_name + GROUP BY d2.uuid + ) f ON f.dataset_uuid = d.uuid + ORDER BY d.name""") List findAll(String namespaceName, int limit, int offset); @SqlQuery("SELECT count(*) FROM datasets") @@ -284,6 +286,16 @@ DatasetRow upsert( String name, String physicalName); + @SqlQuery( + """ + UPDATE datasets + SET is_deleted = true + WHERE namespace_name = :namespaceName + AND name = :name + RETURNING * + """) + Optional softDelete(String namespaceName, String name); + @Transaction default Dataset upsertDatasetMeta( NamespaceName namespaceName, DatasetName datasetName, DatasetMeta datasetMeta) { diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index a029bcd86b..d3984a8789 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -67,11 +67,14 @@ public interface LineageDao { Set getLineage(@BindList Set jobIds, int depth); @SqlQuery( - "SELECT ds.*, dv.fields, dv.lifecycle_state\n" - + "FROM datasets ds\n" - + "LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid\n" - + "WHERE ds.uuid IN ();") - Set getDatasetData(@BindList Set dsUuids); + """ + SELECT ds.*, dv.fields, dv.lifecycle_state + FROM datasets ds + LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid + WHERE ds.uuid IN () + AND ds.is_deleted is false + """) + Set getNonDeletedDatasetData(@BindList Set dsUuids); @SqlQuery( "select j.uuid from jobs j\n" diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 52af2e04c9..77878e14d9 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -75,7 +75,7 @@ public Lineage lineage(NodeId nodeId, int depth) { .collect(Collectors.toSet()); Set datasets = new HashSet<>(); if (!datasetIds.isEmpty()) { - datasets.addAll(getDatasetData(datasetIds)); + datasets.addAll(this.getNonDeletedDatasetData(datasetIds)); } return toLineage(jobData, datasets); diff --git a/api/src/test/java/marquez/DatasetIntegrationTest.java b/api/src/test/java/marquez/DatasetIntegrationTest.java index e0c1ee1e86..11abd5708d 100644 --- a/api/src/test/java/marquez/DatasetIntegrationTest.java +++ b/api/src/test/java/marquez/DatasetIntegrationTest.java @@ -11,7 +11,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import java.io.IOException; import java.net.http.HttpResponse; +import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Collections; import java.util.List; @@ -357,4 +360,84 @@ public void testApp_upsertDescription() { // Description stays assertThat(dataset2.getDescription()).isEqualTo(DESCRIPTION.getDescription()); } + + @Test + public void testApp_doesNotShowDeletedDataset() throws IOException { + String namespace = "namespace"; + String name = "table"; + LineageEvent event = + new LineageEvent( + "COMPLETE", + Instant.now().atZone(ZoneId.systemDefault()), + new LineageEvent.Run(UUID.randomUUID().toString(), null), + new LineageEvent.Job("namespace", "job_name", null), + List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), + Collections.emptyList(), + "the_producer"); + + final CompletableFuture resp = + this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + // Ensure the event was correctly rejected and a proper response code returned. + assertThat(resp.join()).isEqualTo(201); + + client.deleteDataset(namespace, name); + + List datasets = client.listDatasets(namespace); + assertThat(datasets).hasSize(0); + } + + @Test + public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOException { + String namespace = "namespace"; + String name = "anotherTable"; + LineageEvent event = + new LineageEvent( + "COMPLETE", + Instant.now().atZone(ZoneId.systemDefault()), + new LineageEvent.Run(UUID.randomUUID().toString(), null), + new LineageEvent.Job("namespace", "job_name", null), + List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), + Collections.emptyList(), + "the_producer"); + + CompletableFuture resp = + this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + // Ensure the event was correctly rejected and a proper response code returned. + assertThat(resp.join()).isEqualTo(201); + + client.deleteDataset(namespace, name); + + List datasets = client.listDatasets(namespace); + assertThat(datasets).hasSize(0); + resp = this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + assertThat(resp.join()).isEqualTo(201); + + datasets = client.listDatasets(namespace); + assertThat(datasets).hasSize(1); + } + } diff --git a/api/src/test/java/marquez/db/DatasetDaoTest.java b/api/src/test/java/marquez/db/DatasetDaoTest.java index f956f1fc00..ba6be1911d 100644 --- a/api/src/test/java/marquez/db/DatasetDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetDaoTest.java @@ -287,20 +287,22 @@ public void testGetDatasets() { ImmutableMap.of("writeFacet", new CustomValueFacet("firstWriteValue"))))); String secondDatasetName = "secondDataset"; + String deletedDatasetName = "deletedDataset"; createLineageRow( openLineageDao, "secondWriteJob", "COMPLETE", jobFacet, Collections.emptyList(), - Collections.singletonList( + List.of( new Dataset( NAMESPACE, secondDatasetName, newDatasetFacet( ImmutableMap.of("writeFacet", new CustomValueFacet("secondWriteValue")), new SchemaField("age", "int", "the age"), - new SchemaField("address", "string", "the address"))))); + new SchemaField("address", "string", "the address"))), + new Dataset(NAMESPACE, deletedDatasetName, newDatasetFacet()))); createLineageRow( openLineageDao, @@ -319,6 +321,11 @@ public void testGetDatasets() { Collections.emptyList()); List datasets = datasetDao.findAll(NAMESPACE, 5, 0); + assertThat(datasets).hasSize(3); + + datasetDao.softDelete(NAMESPACE, deletedDatasetName); + + datasets = datasetDao.findAll(NAMESPACE, 5, 0); assertThat(datasets).hasSize(2); // datasets sorted alphabetically, so commonDataset is first @@ -357,8 +364,7 @@ public void testGetDatasets() { InstanceOfAssertFactories.map(String.class, Object.class)) .isNotEmpty() .hasSize(6) - .containsKeys( - "documentation", "description", "schema", "dataSource", "writeFacet", "inputFacet") + .containsKeys("documentation", "description", "schema", "dataSource", "inputFacet") .containsEntry( "writeFacet", ImmutableMap.of( @@ -379,6 +385,28 @@ public void testGetDatasets() { "http://test.schema/")); } + @Test + public void testGetSpecificDatasetReturnsDatasetIfDeleted() { + createLineageRow( + openLineageDao, + "writeJob", + "COMPLETE", + jobFacet, + Collections.emptyList(), + Collections.singletonList(newCommonDataset(Collections.emptyMap()))); + + marquez.service.models.Dataset dataset = datasetDao.findDatasetByName(NAMESPACE, DATASET).get(); + + assertThat(dataset) + .matches(ds -> ds.getName().getValue().equals(DATASET)) + .extracting( + marquez.service.models.Dataset::getFacets, + InstanceOfAssertFactories.map(String.class, Object.class)) + .isNotEmpty() + .hasSize(4) + .containsKeys("documentation", "description", "schema", "dataSource"); + } + @Test public void testGetDatasetsWithMultipleVersions() { String secondDatasetName = "secondDataset"; diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 21a5ab0608..25d60e3236 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -48,6 +48,7 @@ @ExtendWith(MarquezJdbiExternalPostgresExtension.class) public class LineageDaoTest { + private static DatasetDao datasetDao; private static LineageDao lineageDao; private static OpenLineageDao openLineageDao; private final Dataset dataset = @@ -65,6 +66,7 @@ public class LineageDaoTest { @BeforeAll public static void setUpOnce(Jdbi jdbi) { LineageDaoTest.jdbi = jdbi; + datasetDao = jdbi.onDemand(DatasetDao.class); lineageDao = jdbi.onDemand(LineageDao.class); openLineageDao = jdbi.onDemand(OpenLineageDao.class); } @@ -531,7 +533,7 @@ public void testGetDatasetData() { jobFacet, dataset); Set datasetData = - lineageDao.getDatasetData( + lineageDao.getNonDeletedDatasetData( newRows.stream() .map(j -> j.getOutput().get().getDatasetRow().getUuid()) .collect(Collectors.toSet())); @@ -562,7 +564,7 @@ public void testGetDatasetDatalifecycleStateReturned() { Arrays.asList(dataset)); Set datasetData = - lineageDao.getDatasetData( + lineageDao.getNonDeletedDatasetData( Collections.singleton(row.getOutputs().get().get(0).getDatasetRow().getUuid())); assertThat(datasetData) @@ -570,6 +572,61 @@ public void testGetDatasetDatalifecycleStateReturned() { .anyMatch(str -> str.contains("CREATE")); } + @Test + public void testGetDatasetDataDoesNotReturnDeletedDataset() { + Dataset dataset = + new Dataset( + NAMESPACE, + DATASET, + LineageEvent.DatasetFacets.builder() + .lifecycleStateChange( + new LineageEvent.LifecycleStateChangeFacet(PRODUCER_URL, SCHEMA_URL, "CREATE")) + .build()); + + String deleteName = DATASET + "-delete"; + Dataset toDelete = + new Dataset( + NAMESPACE, + deleteName, + LineageEvent.DatasetFacets.builder() + .lifecycleStateChange( + new LineageEvent.LifecycleStateChangeFacet(PRODUCER_URL, SCHEMA_URL, "CREATE")) + .build()); + + UpdateLineageRow row = + LineageTestUtils.createLineageRow( + openLineageDao, + "writeJob", + "COMPLETE", + jobFacet, + Arrays.asList(), + Arrays.asList(dataset, toDelete)); + + Set datasetData = + lineageDao.getNonDeletedDatasetData( + Set.of( + row.getOutputs().get().get(0).getDatasetRow().getUuid(), + row.getOutputs().get().get(1).getDatasetRow().getUuid())); + + assertThat(datasetData) + .hasSize(2) + .extracting(ds -> ds.getName().getValue()) + .anyMatch(str -> str.contains(deleteName)); + + datasetDao.softDelete(NAMESPACE, deleteName); + + datasetData = + lineageDao.getNonDeletedDatasetData( + Set.of( + row.getOutputs().get().get(0).getDatasetRow().getUuid(), + row.getOutputs().get().get(1).getDatasetRow().getUuid())); + + assertThat(datasetData) + .hasSize(1) + .extracting(ds -> ds.getName().getValue()) + .allMatch(str -> str.contains(DATASET)); + } + @Test public void testGetCurrentRuns() { diff --git a/build.gradle b/build.gradle index 275fbab702..0e44e80550 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ buildscript { dependencies { classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0' classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' - classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.6.1' + classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.7.2' } } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index bc617a4f2a..3acd192408 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -150,6 +150,11 @@ public Dataset getDataset(@NonNull String namespaceName, @NonNull String dataset return Dataset.fromJson(bodyAsJson); } + public Dataset deleteDataset(@NonNull String namespaceName, @NonNull String datasetName) { + final String bodyAsJson = http.delete(url.toDatasetUrl(namespaceName, datasetName)); + return Dataset.fromJson(bodyAsJson); + } + public DatasetVersion getDatasetVersion( @NonNull String namespaceName, @NonNull String datasetName, @NonNull String version) { final String bodyAsJson = diff --git a/clients/java/src/main/java/marquez/client/MarquezHttp.java b/clients/java/src/main/java/marquez/client/MarquezHttp.java index 030585294d..ee9e90b0de 100644 --- a/clients/java/src/main/java/marquez/client/MarquezHttp.java +++ b/clients/java/src/main/java/marquez/client/MarquezHttp.java @@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; @@ -139,6 +140,26 @@ String get(URL url) { } } + String delete(URL url) { + log.debug("DELETE {}", url); + try { + final HttpDelete request = new HttpDelete(); + request.setURI(url.toURI()); + request.addHeader(ACCEPT, APPLICATION_JSON.toString()); + + addAuthToReqIfKeyPresent(request); + + final HttpResponse response = http.execute(request); + throwOnHttpError(response); + + final String bodyAsJson = EntityUtils.toString(response.getEntity(), UTF_8); + log.debug("Response: {}", bodyAsJson); + return bodyAsJson; + } catch (URISyntaxException | IOException e) { + throw new MarquezHttpException(); + } + } + private void throwOnHttpError(HttpResponse response) throws IOException { final int code = response.getStatusLine().getStatusCode(); if (code >= 400 && code < 600) { // non-2xx diff --git a/clients/java/src/test/java/marquez/client/MarquezHttpTest.java b/clients/java/src/test/java/marquez/client/MarquezHttpTest.java index 68bf6c9df8..4116b988b7 100644 --- a/clients/java/src/test/java/marquez/client/MarquezHttpTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezHttpTest.java @@ -7,6 +7,7 @@ import static marquez.client.MarquezPathV1.BASE_PATH; import static marquez.client.MarquezPathV1.path; +import static marquez.client.models.ModelGenerator.newDatasetId; import static marquez.client.models.ModelGenerator.newDescription; import static marquez.client.models.ModelGenerator.newJobName; import static marquez.client.models.ModelGenerator.newNamespace; @@ -30,6 +31,7 @@ import java.net.URL; import java.time.Instant; import javax.net.ssl.SSLContext; +import marquez.client.models.DatasetId; import marquez.client.models.JsonGenerator; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; @@ -38,6 +40,7 @@ import org.apache.http.HttpResponse; import org.apache.http.StatusLine; import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; @@ -242,6 +245,40 @@ public void testGet_throwsOnHttpError() throws Exception { assertThatExceptionOfType(MarquezHttpException.class).isThrownBy(() -> marquezHttp.get(url)); } + @Test + public void testDelete() throws Exception { + final Namespace namespace = newNamespace(); + final DatasetId datasetId = newDatasetId(); + + final String json = JsonGenerator.newJsonFor(namespace); + final ByteArrayInputStream stream = new ByteArrayInputStream(json.getBytes(UTF_8)); + when(httpEntity.getContent()).thenReturn(stream); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpResponse.getStatusLine()).thenReturn(mock(StatusLine.class)); + when(httpResponse.getStatusLine().getStatusCode()).thenReturn(HTTP_200); + + when(httpClient.execute(any(HttpDelete.class))).thenReturn(httpResponse); + + final URL url = + marquezUrl.from(path("/namespace/%s/dataset/%s", namespace.getName(), datasetId.getName())); + final String actual = marquezHttp.delete(url); + assertThat(actual).isEqualTo(json); + } + + @Test + public void testDelete_throwsOnHttpError() throws Exception { + final ByteArrayInputStream stream = new ByteArrayInputStream(HTTP_ERROR_AS_BYTES); + when(httpEntity.getContent()).thenReturn(stream); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpResponse.getStatusLine()).thenReturn(mock(StatusLine.class)); + when(httpResponse.getStatusLine().getStatusCode()).thenReturn(HTTP_500); + + when(httpClient.execute(any(HttpGet.class))).thenReturn(httpResponse); + + final URL url = marquezUrl.from(path("/namespace/%s", newNamespaceName())); + assertThatExceptionOfType(MarquezHttpException.class).isThrownBy(() -> marquezHttp.get(url)); + } + @Test public void testClose() throws Exception { // the sclient is not closeable