Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Improve backlogQuota endpoint to pure async. #17383

Merged
merged 1 commit into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
Expand Down Expand Up @@ -110,9 +111,8 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public abstract class NamespacesBase extends AdminResource {

protected CompletableFuture<List<String>> internalGetTenantNamespaces(String tenant) {
Expand Down Expand Up @@ -1430,40 +1430,29 @@ protected CompletableFuture<SubscribeRate> internalGetSubscribeRateAsync() {
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()));
}

protected void internalSetBacklogQuota(BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateNamespacePolicyOperation(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
try {
Policies policies = namespaceResources().getPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace policies does not exist"));
RetentionPolicies r = policies.retention_policies;
if (r != null) {
Policies p = new Policies();
p.backlog_quota_map.put(quotaType, backlogQuota);
if (!checkQuotas(p, r)) {
log.warn(
"[{}] Failed to update backlog configuration"
+ " for namespace {}: conflicts with retention quota",
clientAppId(), namespaceName);
throw new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for namespace."
+ " Please increase retention quota and retry");
}
protected CompletableFuture<Void> setBacklogQuotaAsync(BacklogQuotaType backlogQuotaType,
BacklogQuota quota) {
return namespaceResources().setPoliciesAsync(namespaceName, policies -> {
nodece marked this conversation as resolved.
Show resolved Hide resolved
RetentionPolicies retentionPolicies = policies.retention_policies;
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
if (retentionPolicies == null) {
policies.backlog_quota_map.put(quotaType, quota);
return policies;
}
policies.backlog_quota_map.put(quotaType, backlogQuota);
namespaceResources().setPolicies(namespaceName, p -> policies);
log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(), namespaceName,
jsonMapper().writeValueAsString(backlogQuota));

} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update backlog quota map for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
// If we have retention policies, we have to check the conflict.
BacklogQuota needCheckQuota = null;
if (quotaType == BacklogQuotaType.destination_storage) {
needCheckQuota = quota;
}
boolean passCheck = checkBacklogQuota(needCheckQuota, retentionPolicies);
if (!passCheck) {
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for namespace."
+ " Please increase retention quota and retry");
}
return policies;
});
}

protected void internalRemoveBacklogQuota(BacklogQuotaType backlogQuotaType) {
Expand Down Expand Up @@ -2767,5 +2756,69 @@ protected void internalRemoveReplicatorDispatchRate(AsyncResponse asyncResponse)
return null;
});
}
private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);

/**
* Base method for getBackLogQuotaMap v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalGetBacklogQuotaMap(AsyncResponse asyncResponse) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG, PolicyOperation.READ)
.thenCompose(__ -> namespaceResources().getPoliciesAsync(namespaceName))
.thenAccept(policiesOpt -> {
Map<BacklogQuotaType, BacklogQuota> backlogQuotaMap = policiesOpt.orElseThrow(() ->
new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"))
.backlog_quota_map;
asyncResponse.resume(backlogQuotaMap);
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to get backlog quota map on namespace {}", clientAppId(), namespaceName, ex);
return null;
});
}

/**
* Base method for setBacklogQuota v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> setBacklogQuotaAsync(backlogQuotaType, backlogQuota))
.thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully updated backlog quota map: namespace={}, map={}", clientAppId(),
namespaceName, backlogQuota);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update backlog quota map for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}

/**
* Base method for removeBacklogQuota v1 and v2.
* Notion: don't re-use this logic.
*/
protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQuotaType backlogQuotaType) {
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
final BacklogQuotaType quotaType = backlogQuotaType != null ? backlogQuotaType
: BacklogQuotaType.destination_storage;
policies.backlog_quota_map.remove(quotaType);
return policies;
})).thenAccept(__ -> {
asyncResponse.resume(Response.noContent().build());
log.info("[{}] Successfully removed backlog namespace={}, quota={}", clientAppId(), namespaceName,
backlogQuotaType);
}).exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
log.error("[{}] Failed to update backlog quota map for namespace {}",
clientAppId(), namespaceName, ex);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1136,15 +1136,7 @@ public void getBacklogQuotaMap(
@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.backlog_quota_map))
.exceptionally(ex -> {
log.error("[{}] Failed to get backlog quota map on namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
internalGetBacklogQuotaMap(asyncResponse);
}

@POST
Expand All @@ -1156,12 +1148,13 @@ public void getBacklogQuotaMap(
@ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota."
+ " Increase retention quota and retry request")})
public void setBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
BacklogQuota backlogQuota) {
validateNamespaceName(property, cluster, namespace);
internalSetBacklogQuota(backlogQuotaType, backlogQuota);
internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
}

@DELETE
Expand All @@ -1170,11 +1163,13 @@ public void setBacklogQuota(
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeBacklogQuota(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void removeBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType) {
validateNamespaceName(property, cluster, namespace);
internalRemoveBacklogQuota(backlogQuotaType);
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1147,15 +1147,7 @@ public void getBacklogQuotaMap(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.BACKLOG,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.backlog_quota_map))
.exceptionally(ex -> {
log.error("[{}] Failed to get backlog quota map on namespace {}", clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
internalGetBacklogQuotaMap(asyncResponse);
}

@POST
Expand All @@ -1167,11 +1159,13 @@ public void getBacklogQuotaMap(
@ApiResponse(code = 412,
message = "Specified backlog quota exceeds retention quota."
+ " Increase retention quota and retry request")})
public void setBacklogQuota(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void setBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType,
@ApiParam(value = "Backlog quota for all topics of the specified namespace") BacklogQuota backlogQuota) {
validateNamespaceName(tenant, namespace);
internalSetBacklogQuota(backlogQuotaType, backlogQuota);
internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
}

@DELETE
Expand All @@ -1180,10 +1174,12 @@ public void setBacklogQuota(@PathParam("tenant") String tenant, @PathParam("name
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeBacklogQuota(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void removeBacklogQuota(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType) {
validateNamespaceName(tenant, namespace);
internalRemoveBacklogQuota(backlogQuotaType);
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.policies.path.PolicyPath;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1278,6 +1279,10 @@ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncRespon
asyncResponse.resume(realCause);
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, realCause));
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
Expand Down