Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-149]make some police methods async in Namespaces #16881

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2116,16 +2116,6 @@ protected void validateRetentionPolicies(RetentionPolicies retention) {
+ "specific limit. To disable retention both limits must be set to 0.");
}

protected Integer internalGetMaxProducersPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_producers_per_topic;
}

protected Integer internalGetDeduplicationSnapshotInterval() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).deduplicationSnapshotIntervalSeconds;
}

protected void internalSetDeduplicationSnapshotInterval(Integer interval) {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.WRITE);
if (interval != null && interval < 0) {
Expand Down Expand Up @@ -2164,11 +2154,6 @@ protected CompletableFuture<Boolean> internalGetDeduplicationAsync() {
.thenApply(policies -> policies.deduplicationEnabled);
}

protected Integer internalGetMaxConsumersPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_consumers_per_topic;
}

protected void internalSetMaxConsumersPerTopic(Integer maxConsumersPerTopic) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
Expand Down Expand Up @@ -2382,17 +2367,6 @@ protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrateg
"schemaCompatibilityStrategy");
}

protected boolean internalGetSchemaValidationEnforced(boolean applied) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
boolean schemaValidationEnforced = getNamespacePolicies(namespaceName).schema_validation_enforced;
if (!schemaValidationEnforced && applied) {
return pulsar().getConfiguration().isSchemaValidationEnforced();
} else {
return schemaValidationEnforced;
}
}

protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.WRITE);
Expand All @@ -2405,15 +2379,6 @@ protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnfor
"schemaValidationEnforced");
}

protected boolean internalGetIsAllowAutoUpdateSchema() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
if (getNamespacePolicies(namespaceName).is_allow_auto_update_schema == null) {
return pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}
return getNamespacePolicies(namespaceName).is_allow_auto_update_schema;
}

protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.WRITE);
Expand All @@ -2426,15 +2391,6 @@ protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchem
"isAllowAutoUpdateSchema");
}

protected Set<SubscriptionType> internalGetSubscriptionTypesEnabled() {
validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE,
PolicyOperation.READ);
Set<SubscriptionType> subscriptionTypes = new HashSet<>();
getNamespacePolicies(namespaceName).subscription_types_enabled.forEach(subType ->
subscriptionTypes.add(SubscriptionType.valueOf(subType)));
return subscriptionTypes;
}

protected void internalSetSubscriptionTypesEnabled(Set<SubscriptionType> subscriptionTypesEnabled) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE,
PolicyOperation.WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,10 +1460,21 @@ public Boolean getEncryptionRequired(@PathParam("property") String property,
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public int getMaxProducersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void getMaxProducersPerTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetMaxProducersPerTopic();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.max_producers_per_topic))
.exceptionally(ex -> {
log.error("[{}] Failed to get maxProducersPerTopic config on a namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -1484,10 +1495,21 @@ public void setMaxProducersPerTopic(@PathParam("property") String property, @Pat
@ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Integer getMaxConsumersPerTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void getMaxConsumersPerTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetMaxConsumersPerTopic();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.max_consumers_per_topic))
.exceptionally(ex -> {
log.error("[{}] Failed to get maxConsumersPerTopic config on a namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.swagger.annotations.ApiResponses;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1591,10 +1592,20 @@ public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Integer getMaxProducersPerTopic(@PathParam("tenant") String tenant,
public void getMaxProducersPerTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxProducersPerTopic();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.max_producers_per_topic))
.exceptionally(ex -> {
log.error("[{}] Failed to get maxProducersPerTopic config on a namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down Expand Up @@ -1627,10 +1638,20 @@ public void removeMaxProducersPerTopic(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get deduplicationSnapshotInterval config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Integer getDeduplicationSnapshotInterval(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
public void getDeduplicationSnapshotInterval(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDeduplicationSnapshotInterval();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION_SNAPSHOT, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.deduplicationSnapshotIntervalSeconds))
.exceptionally(ex -> {
log.error("[{}] Failed to get deduplicationSnapshotInterval config on a namespace {}",
clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -1651,10 +1672,20 @@ public void setDeduplicationSnapshotInterval(@PathParam("tenant") String tenant
@ApiOperation(value = "Get maxConsumersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public Integer getMaxConsumersPerTopic(@PathParam("tenant") String tenant,
public void getMaxConsumersPerTopic(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxConsumersPerTopic();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.max_consumers_per_topic))
.exceptionally(ex -> {
log.error("[{}] Failed to get maxConsumersPerTopic config on a namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down Expand Up @@ -2180,11 +2211,27 @@ public void setSchemaCompatibilityStrategy(
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public boolean getIsAllowAutoUpdateSchema(
public void getIsAllowAutoUpdateSchema(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetIsAllowAutoUpdateSchema();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> {
if (policies.is_allow_auto_update_schema == null) {
asyncResponse.resume(pulsar().getConfig().isAllowAutoUpdateSchemaEnabled());
} else {
asyncResponse.resume(policies.is_allow_auto_update_schema);
}
})
.exceptionally(ex -> {
log.error("[{}] Failed to get the flag of whether allow auto update schema on a namespace {}",
clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -2208,11 +2255,25 @@ public void setIsAllowAutoUpdateSchema(
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public Set<SubscriptionType> getSubscriptionTypesEnabled(
public void getSubscriptionTypesEnabled(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetSubscriptionTypesEnabled();
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SUBSCRIPTION_AUTH_MODE, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> {
Set<SubscriptionType> subscriptionTypes = new HashSet<>();
policies.subscription_types_enabled.forEach(
subType -> subscriptionTypes.add(SubscriptionType.valueOf(subType)));
asyncResponse.resume(subscriptionTypes);
})
.exceptionally(ex -> {
log.error("[{}] Failed to get the set of whether allow subscription types on a namespace {}",
clientAppId(), namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down Expand Up @@ -2252,11 +2313,29 @@ public void removeSubscriptionTypesEnabled(@PathParam("tenant") String tenant,
+ " this setting, it will cause non-java clients failed to produce.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") })
public boolean getSchemaValidtionEnforced(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("applied") @DefaultValue("false") boolean applied) {
public void getSchemaValidtionEnforced(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("applied") @DefaultValue("false") boolean applied) {
validateNamespaceName(tenant, namespace);
return internalGetSchemaValidationEnforced(applied);
validateNamespacePolicyOperationAsync(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> {
boolean schemaValidationEnforced = policies.schema_validation_enforced;
if (!schemaValidationEnforced && applied) {
asyncResponse.resume(pulsar().getConfiguration().isSchemaValidationEnforced());
} else {
asyncResponse.resume(schemaValidationEnforced);
}
})
.exceptionally(ex -> {
log.error("[{}] Failed to get schema validation enforced flag for namespace {}", clientAppId(),
namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down