From 42a309f80b383f3cee8cbcddffdc20e887c6452f Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 31 Aug 2022 22:33:30 +0800 Subject: [PATCH 1/4] [Improve][broker] Improve `replicatorDispacherRate` endpoint to pure async. --- .../broker/admin/impl/NamespacesBase.java | 42 +----------- .../pulsar/broker/admin/v1/Namespaces.java | 35 ++++++++-- .../pulsar/broker/admin/v2/Namespaces.java | 68 ++++++++++++++----- 3 files changed, 82 insertions(+), 63 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 0b25599c1263d..cd11ef58a9856 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1422,46 +1422,6 @@ protected CompletableFuture internalGetSubscribeRateAsync() { .thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName())); } - protected void internalRemoveReplicatorDispatchRate() { - validateSuperUserAccess(); - try { - updatePolicies(namespaceName, policies -> { - policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName()); - return policies; - }); - log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(), - namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(), - namespaceName, e); - throw new RestException(e); - } - } - - protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) { - validateSuperUserAccess(); - log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate); - try { - updatePolicies(namespaceName, policies -> { - policies.replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate); - return policies; - }); - log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(), - namespaceName); - } catch (Exception e) { - log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(), - namespaceName, e); - throw new RestException(e); - } - } - - protected CompletableFuture internalGetReplicatorDispatchRateAsync() { - return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenApply( - policies -> policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName())); - } - protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) { validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); @@ -2673,7 +2633,7 @@ private CompletableFuture updatePoliciesAsync(NamespaceName ns, Function

updateFunction) { + protected void updatePolicies(NamespaceName ns, Function updateFunction) { // Force to read the data s.t. the watch to the cache content is setup. try { updatePoliciesAsync(ns, updateFunction).get(namespaceResources().getOperationTimeoutSec(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 20c200318f392..cd747587c849e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1100,13 +1100,31 @@ public void deleteSubscriptionDispatchRate(@Suspended AsyncResponse asyncRespons @Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate") @ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")}) - public void setReplicatorDispatchRate( - @PathParam("tenant") String tenant, - @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, @ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace") - DispatchRateImpl dispatchRate) { + DispatchRateImpl dispatchRate) { validateNamespaceName(tenant, cluster, namespace); - internalSetReplicatorDispatchRate(dispatchRate); + validateSuperUserAccessAsync() + .thenAccept(__ -> { + log.info("[{}] Set namespace replicator dispatch-rate {}/{}", + clientAppId(), namespaceName, dispatchRate); + }).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + policies.replicatorDispatchRate.put(clusterName, dispatchRate); + return policies; + })).thenAccept(__ -> { + log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -1122,7 +1140,12 @@ public void getReplicatorDispatchRate( @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, cluster, namespace); - internalGetReplicatorDispatchRateAsync().thenAccept(asyncResponse::resume) + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + return policies.replicatorDispatchRate.get(clusterName); + }).thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", clientAppId(), namespaceName, ex); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 7fae0a5709e8e..3a90fb108f212 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1103,23 +1103,55 @@ public void getSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam( @Path("/{tenant}/{namespace}/replicatorDispatchRate") @ApiOperation(value = "Remove replicator dispatch-rate throttling for all topics of the namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")}) - public void removeReplicatorDispatchRate( - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { - validateNamespaceName(tenant, namespace); - internalRemoveReplicatorDispatchRate(); + public void removeReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateSuperUserAccessAsync() + .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + policies.replicatorDispatchRate.remove(clusterName); + return policies; + })).thenAccept(__ -> { + log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @Path("/{tenant}/{namespace}/replicatorDispatchRate") @ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace") @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")}) - public void setReplicatorDispatchRate(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, @ApiParam(value = - "Replicator dispatch rate for all topics of the specified namespace") - DispatchRateImpl dispatchRate) { + public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = + "Replicator dispatch rate for all topics of the specified namespace") DispatchRateImpl dispatchRate) { validateNamespaceName(tenant, namespace); - internalSetReplicatorDispatchRate(dispatchRate); + validateSuperUserAccessAsync() + .thenAccept(__ -> { + log.info("[{}] Set namespace replicator dispatch-rate {}/{}", + clientAppId(), namespaceName, dispatchRate); + }).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + policies.replicatorDispatchRate.put(clusterName, dispatchRate); + return policies; + })).thenAccept(__ -> { + log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName); + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(ex -> { + log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET @@ -1128,13 +1160,17 @@ public void setReplicatorDispatchRate(@PathParam("tenant") String tenant, + "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured " + "in dispatch-rate yet") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void getReplicatorDispatchRate( - @Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace) { + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalGetReplicatorDispatchRateAsync().thenAccept(asyncResponse::resume) + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + return policies.replicatorDispatchRate.get(clusterName); + }).thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", clientAppId(), namespaceName, ex); From 492565fab1d6042d9b034e11d9c0935768497e12 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Wed, 31 Aug 2022 22:49:09 +0800 Subject: [PATCH 2/4] Avoid useless change --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index cd11ef58a9856..800767817e780 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2633,7 +2633,7 @@ private CompletableFuture updatePoliciesAsync(NamespaceName ns, Function

updateFunction) { + private void updatePolicies(NamespaceName ns, Function updateFunction) { // Force to read the data s.t. the watch to the cache content is setup. try { updatePoliciesAsync(ns, updateFunction).get(namespaceResources().getOperationTimeoutSec(), From cce9c36fd53d18823aa1fea1a6bfe6b43c916e67 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Thu, 1 Sep 2022 08:40:34 +0800 Subject: [PATCH 3/4] Avoid reuse complex method. --- .../org/apache/pulsar/broker/admin/v1/Namespaces.java | 9 ++++++--- .../org/apache/pulsar/broker/admin/v2/Namespaces.java | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index cd747587c849e..9694bf16016a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1141,10 +1141,13 @@ public void getReplicatorDispatchRate( @PathParam("namespace") String namespace) { validateNamespaceName(tenant, cluster, namespace); validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenApply(policies -> { + .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)) + .thenApply(policiesOpt -> { + if (!policiesOpt.isPresent()) { + throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist"); + } String clusterName = pulsar().getConfiguration().getClusterName(); - return policies.replicatorDispatchRate.get(clusterName); + return policiesOpt.get().replicatorDispatchRate.get(clusterName); }).thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 3a90fb108f212..c77a1691e8ce3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1166,10 +1166,13 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenApply(policies -> { + .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)) + .thenApply(policiesOpt -> { + if (!policiesOpt.isPresent()) { + throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist"); + } String clusterName = pulsar().getConfiguration().getClusterName(); - return policies.replicatorDispatchRate.get(clusterName); + return policiesOpt.get().replicatorDispatchRate.get(clusterName); }).thenAccept(asyncResponse::resume) .exceptionally(ex -> { log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", From dfce37fd215bad0f880756f2f59990d5bed1022f Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Fri, 2 Sep 2022 11:25:46 +0800 Subject: [PATCH 4/4] Apply comment --- .../broker/admin/impl/NamespacesBase.java | 66 +++++++++++++++++++ .../pulsar/broker/admin/v1/Namespaces.java | 35 +--------- .../pulsar/broker/admin/v2/Namespaces.java | 51 +------------- 3 files changed, 71 insertions(+), 81 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 800767817e780..a65843303bea2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2692,5 +2692,71 @@ protected CompletableFuture internalSetEntryFiltersPerTopicAsync(EntryFilt })); } + /** + * Base method for setReplicatorDispatchRate v1 and v2. + * Notion: don't re-use this logic. + */ + protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, DispatchRateImpl dispatchRate) { + validateSuperUserAccessAsync() + .thenAccept(__ -> { + log.info("[{}] Set namespace replicator dispatch-rate {}/{}", + clientAppId(), namespaceName, dispatchRate); + }).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + policies.replicatorDispatchRate.put(clusterName, dispatchRate); + return policies; + })).thenAccept(__ -> { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName); + }).exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName, ex); + return null; + }); + } + /** + * Base method for getReplicatorDispatchRate v1 and v2. + * Notion: don't re-use this logic. + */ + protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse) { + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) + .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)) + .thenApply(policiesOpt -> { + if (!policiesOpt.isPresent()) { + throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist"); + } + String clusterName = pulsar().getConfiguration().getClusterName(); + return policiesOpt.get().replicatorDispatchRate.get(clusterName); + }).thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", + clientAppId(), namespaceName, ex); + return null; + }); + } + /** + * Base method for removeReplicatorDispatchRate v1 and v2. + * Notion: don't re-use this logic. + */ + protected void internalRemoveReplicatorDispatchRate(AsyncResponse asyncResponse) { + validateSuperUserAccessAsync() + .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { + String clusterName = pulsar().getConfiguration().getClusterName(); + policies.replicatorDispatchRate.remove(clusterName); + return policies; + })).thenAccept(__ -> { + asyncResponse.resume(Response.noContent().build()); + log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName); + }).exceptionally(ex -> { + resumeAsyncResponseExceptionally(asyncResponse, ex); + log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", + clientAppId(), namespaceName, ex); + return null; + }); + } private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 9694bf16016a1..b93db5900fc81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1107,24 +1107,7 @@ public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace") DispatchRateImpl dispatchRate) { validateNamespaceName(tenant, cluster, namespace); - validateSuperUserAccessAsync() - .thenAccept(__ -> { - log.info("[{}] Set namespace replicator dispatch-rate {}/{}", - clientAppId(), namespaceName, dispatchRate); - }).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { - String clusterName = pulsar().getConfiguration().getClusterName(); - policies.replicatorDispatchRate.put(clusterName, dispatchRate); - return policies; - })).thenAccept(__ -> { - log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", - clientAppId(), namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", - clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalSetReplicatorDispatchRate(asyncResponse, dispatchRate); } @GET @@ -1140,21 +1123,7 @@ public void getReplicatorDispatchRate( @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, cluster, namespace); - validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) - .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)) - .thenApply(policiesOpt -> { - if (!policiesOpt.isPresent()) { - throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist"); - } - String clusterName = pulsar().getConfiguration().getClusterName(); - return policiesOpt.get().replicatorDispatchRate.get(clusterName); - }).thenAccept(asyncResponse::resume) - .exceptionally(ex -> { - log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", - clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalGetReplicatorDispatchRate(asyncResponse); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index c77a1691e8ce3..d5cfed37c00e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1107,21 +1107,7 @@ public void removeReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateSuperUserAccessAsync() - .thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { - String clusterName = pulsar().getConfiguration().getClusterName(); - policies.replicatorDispatchRate.remove(clusterName); - return policies; - })).thenAccept(__ -> { - log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", - clientAppId(), namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", - clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalRemoveReplicatorDispatchRate(asyncResponse); } @POST @@ -1134,24 +1120,7 @@ public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace") DispatchRateImpl dispatchRate) { validateNamespaceName(tenant, namespace); - validateSuperUserAccessAsync() - .thenAccept(__ -> { - log.info("[{}] Set namespace replicator dispatch-rate {}/{}", - clientAppId(), namespaceName, dispatchRate); - }).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> { - String clusterName = pulsar().getConfiguration().getClusterName(); - policies.replicatorDispatchRate.put(clusterName, dispatchRate); - return policies; - })).thenAccept(__ -> { - log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", - clientAppId(), namespaceName); - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(ex -> { - log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", - clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalSetReplicatorDispatchRate(asyncResponse, dispatchRate); } @GET @@ -1165,21 +1134,7 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ) - .thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName)) - .thenApply(policiesOpt -> { - if (!policiesOpt.isPresent()) { - throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist"); - } - String clusterName = pulsar().getConfiguration().getClusterName(); - return policiesOpt.get().replicatorDispatchRate.get(clusterName); - }).thenAccept(asyncResponse::resume) - .exceptionally(ex -> { - log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}", - clientAppId(), namespaceName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; - }); + internalGetReplicatorDispatchRate(asyncResponse); } @GET