Skip to content

Commit

Permalink
revert style changes
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Dec 10, 2022
1 parent 3d69525 commit 33001f4
Showing 1 changed file with 43 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
CompletableFuture<Policies> preconditionCheck = precheckWhenDeleteNamespace(namespaceName, force);
return preconditionCheck
.thenCompose(policies -> {
if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)) {
if (policies == null || CollectionUtils.isEmpty(policies.replication_clusters)){
return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName);
}
return pulsar().getNamespaceService().getFullListOfTopics(namespaceName);
Expand Down Expand Up @@ -258,7 +258,7 @@ protected CompletableFuture<Void> internalDeleteNamespaceAsync(boolean force) {
}
return namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
return old;
}).thenCompose(ignore -> {
return internalDeleteTopicsAsync(allUserCreatedTopics);
}).thenCompose(ignore -> {
Expand Down Expand Up @@ -515,41 +515,6 @@ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bund
});
}

protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) {
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies ->
asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build())
)).exceptionally(ex -> {
log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse,
boolean permissionOnSubscriptionRequired) {
CompletableFuture<Void> isAuthorized;
if (permissionOnSubscriptionRequired) {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
} else {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION);
}
isAuthorized
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired);
return policies;
})).thenAccept(__ -> {
log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(),
namespaceName, permissionOnSubscriptionRequired);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {
AuthorizationService authService = pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
Expand Down Expand Up @@ -667,6 +632,7 @@ protected CompletableFuture<Set<String>> internalGetNamespaceReplicationClusters
.thenApply(policies -> policies.replication_clusters);
}

@SuppressWarnings("checkstyle:WhitespaceAfter")
protected CompletableFuture<Void> internalSetNamespaceReplicationClusters(List<String> clusterIds) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
Expand Down Expand Up @@ -755,7 +721,7 @@ protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
+ validateResult.getErrorInfo());
}
if (Objects.equals(autoTopicCreationOverride.getTopicType(),
TopicType.PARTITIONED.toString())) {
TopicType.PARTITIONED.toString())){
if (maxPartitions > 0
&& autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
Expand All @@ -776,7 +742,7 @@ protected CompletableFuture<Void> internalSetAutoSubscriptionCreationAsync(AutoS
// Force to read the data s.t. the watch to the cache content is setup.
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_SUBSCRIPTION_CREATION,
PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(unused -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoSubscriptionCreationOverride = autoSubscriptionCreationOverride;
return policies;
Expand Down Expand Up @@ -1240,7 +1206,6 @@ protected CompletableFuture<SubscribeRate> internalGetSubscribeRateAsync() {
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()));
}

protected CompletableFuture<Void> setBacklogQuotaAsync(BacklogQuotaType backlogQuotaType,
BacklogQuota quota) {
return namespaceResources().setPoliciesAsync(namespaceName, policies -> {
Expand Down Expand Up @@ -1603,7 +1568,7 @@ protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
}

try {
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp) ->
getLocalPolicies().setLocalPoliciesWithCreate(namespaceName, (lp)->
lp.map(policies -> new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
antiAffinityGroup))
Expand Down Expand Up @@ -1639,7 +1604,7 @@ protected void internalRemoveNamespaceAntiAffinityGroup() {
log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName);

try {
getLocalPolicies().setLocalPolicies(namespaceName, (policies) ->
getLocalPolicies().setLocalPolicies(namespaceName, (policies)->
new LocalPolicies(policies.bundles,
policies.bookieAffinityGroup,
null));
Expand Down Expand Up @@ -1941,7 +1906,7 @@ protected void internalSetMaxUnackedMessagesPerConsumer(Integer maxUnackedMessag
}
}

protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
protected void internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
Expand Down Expand Up @@ -2489,7 +2454,6 @@ protected void internalSetReplicatorDispatchRate(AsyncResponse asyncResponse, Di
return null;
});
}

/**
* Base method for getReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
Expand All @@ -2511,7 +2475,6 @@ protected void internalGetReplicatorDispatchRate(AsyncResponse asyncResponse) {
return null;
});
}

/**
* Base method for removeReplicatorDispatchRate v1 and v2.
* Notion: don't re-use this logic.
Expand Down Expand Up @@ -2598,4 +2561,39 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
return null;
});
}

protected void getPermissionOnSubscriptionRequired(AsyncResponse asyncResponse) {
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PERMISSION)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName).thenApply(policies ->
asyncResponse.resume(Response.ok(policies.auth_policies.isSubscriptionAuthRequired()).build())
)).exceptionally(ex -> {
log.error("[{}] Failed to get PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

protected void internalSetPermissionOnSubscriptionRequired(AsyncResponse asyncResponse,
boolean permissionOnSubscriptionRequired) {
CompletableFuture<Void> isAuthorized;
if (permissionOnSubscriptionRequired) {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.REVOKE_PERMISSION);
} else {
isAuthorized = validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GRANT_PERMISSION);
}
isAuthorized
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.auth_policies.setSubscriptionAuthRequired(permissionOnSubscriptionRequired);
return policies;
})).thenAccept(__ -> {
log.info("[{}] Updated PermissionOnSubscriptionRequired for namespace {} to {}", clientAppId(),
namespaceName, permissionOnSubscriptionRequired);
asyncResponse.resume(Response.ok().build());
}).exceptionally(ex -> {
log.error("[{}] Failed to update PermissionOnSubscriptionRequired", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}

0 comments on commit 33001f4

Please sign in to comment.