Skip to content

Commit

Permalink
[Improve][broker] Improve replicatorDispacherRate endpoint to pure …
Browse files Browse the repository at this point in the history
…async. (#17377)
  • Loading branch information
mattisonchao authored Sep 2, 2022
1 parent bab7faf commit 305758a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1431,46 +1431,6 @@ protected CompletableFuture<SubscribeRate> internalGetSubscribeRateAsync() {
.thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()));
}

protected void internalRemoveReplicatorDispatchRate() {
validateSuperUserAccess();
try {
updatePolicies(namespaceName, policies -> {
policies.replicatorDispatchRate.remove(pulsar().getConfiguration().getClusterName());
return policies;
});
log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected void internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) {
validateSuperUserAccess();
log.info("[{}] Set namespace replicator dispatch-rate {}/{}", clientAppId(), namespaceName, dispatchRate);
try {
updatePolicies(namespaceName, policies -> {
policies.replicatorDispatchRate.put(pulsar().getConfiguration().getClusterName(), dispatchRate);
return policies;
});
log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName);
} catch (Exception e) {
log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}", clientAppId(),
namespaceName, e);
throw new RestException(e);
}
}

protected CompletableFuture<DispatchRate> internalGetReplicatorDispatchRateAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(
policies -> policies.replicatorDispatchRate.get(pulsar().getConfiguration().getClusterName()));
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Expand Down Expand Up @@ -2741,5 +2701,71 @@ protected CompletableFuture<Void> internalSetEntryFiltersPerTopicAsync(EntryFilt
}));
}

/**
* Base method for setReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, DispatchRateImpl dispatchRate) {
validateSuperUserAccessAsync()
.thenAccept(__ -> {
log.info("[{}] Set namespace replicator dispatch-rate {}/{}",
clientAppId(), namespaceName, dispatchRate);
}).thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
String clusterName = pulsar().getConfiguration().getClusterName();
policies.replicatorDispatchRate.put(clusterName, dispatchRate);
return policies;
})).thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully updated the replicatorDispatchRate for cluster on namespace {}",
clientAppId(), namespaceName);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update the replicatorDispatchRate for cluster on namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
/**
* Base method for getReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION_RATE, PolicyOperation.READ)
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenApply(policiesOpt -> {
if (!policiesOpt.isPresent()) {
throw new RestException(Response.Status.NOT_FOUND, "Namespace policies does not exist");
}
String clusterName = pulsar().getConfiguration().getClusterName();
return policiesOpt.get().replicatorDispatchRate.get(clusterName);
}).thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
/**
* Base method for removeReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalRemoveReplicatorDispatchRate(AsyncResponse asyncResponse) {
validateSuperUserAccessAsync()
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
String clusterName = pulsar().getConfiguration().getClusterName();
policies.replicatorDispatchRate.remove(clusterName);
return policies;
})).thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully delete the replicatorDispatchRate for cluster on namespace {}",
clientAppId(), namespaceName);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to delete the replicatorDispatchRate for cluster on namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1100,13 +1100,14 @@ public void deleteSubscriptionDispatchRate(@Suspended AsyncResponse asyncRespons
@Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void setReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@ApiParam(value = "Replicator dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, cluster, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
internalSetReplicatorDispatchRate(asyncResponse, dispatchRate);
}

@GET
Expand All @@ -1122,13 +1123,7 @@ public void getReplicatorDispatchRate(
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, cluster, namespace);
internalGetReplicatorDispatchRateAsync().thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}",
clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
internalGetReplicatorDispatchRate(asyncResponse);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1103,23 +1103,24 @@ public void getSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam(
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Remove replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void removeReplicatorDispatchRate(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
public void removeReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalRemoveReplicatorDispatchRate();
internalRemoveReplicatorDispatchRate(asyncResponse);
}

@POST
@Path("/{tenant}/{namespace}/replicatorDispatchRate")
@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @ApiParam(value =
"Replicator dispatch rate for all topics of the specified namespace")
DispatchRateImpl dispatchRate) {
public void setReplicatorDispatchRate(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value =
"Replicator dispatch rate for all topics of the specified namespace") DispatchRateImpl dispatchRate) {
validateNamespaceName(tenant, namespace);
internalSetReplicatorDispatchRate(dispatchRate);
internalSetReplicatorDispatchRate(asyncResponse, dispatchRate);
}

@GET
Expand All @@ -1128,19 +1129,12 @@ public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
+ "dispatch-rate not configured, -1 means msg-dispatch-rate or byte-dispatch-rate not configured "
+ "in dispatch-rate yet")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void getReplicatorDispatchRate(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalGetReplicatorDispatchRateAsync().thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("[{}] Failed to get replicator dispatch-rate configured for the namespace {}",
clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
internalGetReplicatorDispatchRate(asyncResponse);
}

@GET
Expand Down

0 comments on commit 305758a

Please sign in to comment.