From 2769bc204ffab05881f6599eb8459e4f4c805498 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Sat, 14 May 2022 11:11:06 +0800 Subject: [PATCH 1/3] Make some operation messageTTL methods in Namespaces async. --- .../broker/admin/impl/NamespacesBase.java | 22 +++++----- .../pulsar/broker/admin/v1/Namespaces.java | 43 +++++++++++++------ .../pulsar/broker/admin/v2/Namespaces.java | 37 +++++++++++----- .../pulsar/broker/admin/NamespacesTest.java | 28 ++++++++++++ 4 files changed, 98 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 39a4eb1306083..a4d0a7b5aa9d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -791,16 +791,18 @@ protected void internalSetNamespaceReplicationClusters(List clusterIds) }); } - protected void internalSetNamespaceMessageTTL(Integer messageTTL) { - validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - if (messageTTL != null && messageTTL < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); - } - updatePolicies(namespaceName, policies -> { - policies.message_ttl_in_seconds = messageTTL; - return policies; - }); + protected CompletableFuture internalSetNamespaceMessageTTLAsync(Integer messageTTL) { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.TTL, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenAccept(__ -> { + if (messageTTL != null && messageTTL < 0) { + throw new RestException(Status.PRECONDITION_FAILED, + "Invalid value for message TTL"); + } + }).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.message_ttl_in_seconds = messageTTL; + return policies; + })); } protected void internalSetSubscriptionExpirationTime(Integer expirationTime) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 0ccc505d5e84b..7ae02fae9a2fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -390,13 +390,19 @@ public void setNamespaceReplicationClusters(@PathParam("property") String proper @ApiOperation(hidden = true, value = "Get the message TTL for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public Integer getNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.TTL, PolicyOperation.READ); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.message_ttl_in_seconds; + validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace), PolicyName.TTL, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds)) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -405,10 +411,16 @@ public Integer getNamespaceMessageTTL(@PathParam("property") String property, @P @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) - public void setNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, int messageTTL) { + public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + int messageTTL) { validateNamespaceName(property, cluster, namespace); - internalSetNamespaceMessageTTL(messageTTL); + internalSetNamespaceMessageTTLAsync(messageTTL) + .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -417,10 +429,17 @@ public void setNamespaceMessageTTL(@PathParam("property") String property, @Path @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) - public void removeNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace) { + public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, + @PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); - internalSetNamespaceMessageTTL(null); + internalSetNamespaceMessageTTLAsync(null) + .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 2769b5b3bc343..e0ea1379202ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -333,13 +333,17 @@ public void setNamespaceReplicationClusters(@PathParam("tenant") String tenant, @ApiOperation(value = "Get the message TTL for the namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public Integer getNamespaceMessageTTL(@PathParam("tenant") String tenant, + public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.TTL, PolicyOperation.READ); - - Policies policies = getNamespacePolicies(namespaceName); - return policies.message_ttl_in_seconds; + validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, namespace), PolicyName.TTL, + PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds)) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -348,10 +352,17 @@ public Integer getNamespaceMessageTTL(@PathParam("tenant") String tenant, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) - public void setNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @ApiParam(value = "TTL in seconds for the specified namespace", required = true) int messageTTL) { + public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "TTL in seconds for the specified namespace", required = true) + int messageTTL) { validateNamespaceName(tenant, namespace); - internalSetNamespaceMessageTTL(messageTTL); + internalSetNamespaceMessageTTLAsync(messageTTL) + .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -360,10 +371,16 @@ public void setNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathPara @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL")}) - public void removeNamespaceMessageTTL(@PathParam("tenant") String tenant, + public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalSetNamespaceMessageTTL(null); + internalSetNamespaceMessageTTLAsync(null) + .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index a02f7d6c93483..bf83d1702ecd2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -31,6 +31,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; @@ -1325,6 +1326,33 @@ public void close() { } } + @Test + public void testOperationNamespaceMessageTTL() throws Exception { + String namespace = "ttlnamespace"; + + asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster, + namespace, BundlesData.builder().build())); + + asyncRequests(response -> namespaces.setNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, + namespace, 100)); + + int namespaceMessageTTL = (Integer) asyncRequests(response -> namespaces.getNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, + namespace)); + assertEquals(100, namespaceMessageTTL); + + asyncRequests(response -> namespaces.removeNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, namespace)); + assertNull(asyncRequests(response -> namespaces.getNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, + namespace))); + + try { + asyncRequests(response -> namespaces.setNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, + namespace, -1)); + fail("should have failed"); + } catch (RestException e) { + assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); + } + } + @Test public void testSetOffloadThreshold() throws Exception { TopicName topicName = TopicName.get("persistent", this.testTenant, "offload", "offload-topic"); From 59528262871acb01351b1f201b290e675b5d98d1 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Sat, 14 May 2022 21:49:45 +0800 Subject: [PATCH 2/3] Improve log print --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- .../java/org/apache/pulsar/broker/admin/v1/Namespaces.java | 5 ++++- .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 5 ++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index a4d0a7b5aa9d9..dc4c4cd883d15 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -797,7 +797,7 @@ protected CompletableFuture internalSetNamespaceMessageTTLAsync(Integer me .thenAccept(__ -> { if (messageTTL != null && messageTTL < 0) { throw new RestException(Status.PRECONDITION_FAILED, - "Invalid value for message TTL"); + "Invalid value for message TTL, message TTL must >= 0"); } }).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { policies.message_ttl_in_seconds = messageTTL; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 7ae02fae9a2fb..3e285426b56b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -400,6 +400,7 @@ public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds)) .exceptionally(ex -> { + log.error("Failed to get namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -418,6 +419,7 @@ public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path internalSetNamespaceMessageTTLAsync(messageTTL) .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) .exceptionally(ex -> { + log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -425,7 +427,7 @@ public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path @DELETE @Path("/{property}/{cluster}/{namespace}/messageTTL") - @ApiOperation(value = "Set message TTL in seconds for namespace") + @ApiOperation(value = "Remove message TTL in seconds for namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL") }) @@ -437,6 +439,7 @@ public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, internalSetNamespaceMessageTTLAsync(null) .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) .exceptionally(ex -> { + log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index e0ea1379202ad..ed358c5fecf34 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -341,6 +341,7 @@ public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds)) .exceptionally(ex -> { + log.error("Failed to get namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -360,6 +361,7 @@ public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path internalSetNamespaceMessageTTLAsync(messageTTL) .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) .exceptionally(ex -> { + log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -367,7 +369,7 @@ public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path @DELETE @Path("/{tenant}/{namespace}/messageTTL") - @ApiOperation(value = "Set message TTL in seconds for namespace") + @ApiOperation(value = "Remove message TTL in seconds for namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), @ApiResponse(code = 412, message = "Invalid TTL")}) @@ -378,6 +380,7 @@ public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, internalSetNamespaceMessageTTLAsync(null) .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) .exceptionally(ex -> { + log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); From bde93aaef8758449587840bbcdbb029a3c166371 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 16 May 2022 15:48:09 +0800 Subject: [PATCH 3/3] change reponse content --- .../java/org/apache/pulsar/broker/admin/v1/Namespaces.java | 4 ++-- .../java/org/apache/pulsar/broker/admin/v2/Namespaces.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 3e285426b56b6..ded685020f647 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -417,7 +417,7 @@ public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path int messageTTL) { validateNamespaceName(property, cluster, namespace); internalSetNamespaceMessageTTLAsync(messageTTL) - .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -437,7 +437,7 @@ public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("namespace") String namespace) { validateNamespaceName(property, cluster, namespace); internalSetNamespaceMessageTTLAsync(null) - .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index ed358c5fecf34..ffcabfba2bf45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -359,7 +359,7 @@ public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @Path int messageTTL) { validateNamespaceName(tenant, namespace); internalSetNamespaceMessageTTLAsync(messageTTL) - .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); @@ -378,7 +378,7 @@ public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); internalSetNamespaceMessageTTLAsync(null) - .thenAccept(__ -> asyncResponse.resume(Response.ok().build())) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex); resumeAsyncResponseExceptionally(asyncResponse, ex);