From 5ac0e23c2a4fcca9ad881084e2fdf6fe2cc5e4cc Mon Sep 17 00:00:00 2001 From: mattison chao Date: Thu, 28 Apr 2022 08:17:37 +0800 Subject: [PATCH 1/9] change setPeerClusterNames method --- .../broker/resources/ClusterResources.java | 5 + .../broker/admin/impl/ClustersBase.java | 112 +++++++++--------- 2 files changed, 59 insertions(+), 58 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index d3acd7bbe10b3..f169d9b4f8556 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -80,6 +80,11 @@ public void updateCluster(String clusterName, Function set(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction); } + public CompletableFuture updateClusterAsync(String clusterName, + Function modifyFunction) { + return setAsync(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction); + } + public void deleteCluster(String clusterName) throws MetadataStoreException { delete(joinPath(BASE_CLUSTERS_PATH, clusterName)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 6b984930ed7a2..b0a80b7ee104b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -48,6 +48,8 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.Namespaces; @@ -238,68 +240,62 @@ public void updateCluster( @ApiResponse(code = 412, message = "Peer cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.") }) - public void setPeerClusterNames( - @ApiParam( - value = "The cluster name", - required = true - ) - @PathParam("cluster") String cluster, - @ApiParam( - value = "The list of peer cluster names", - required = true, - examples = @Example( - value = @ExampleProperty( - mediaType = MediaType.APPLICATION_JSON, - value = - "[\n" - + " 'cluster-a',\n" - + " 'cluster-b'\n" - + "]" - ) - ) - ) - LinkedHashSet peerClusterNames - ) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); + public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster, + @ApiParam( + value = "The list of peer cluster names", + required = true, + examples = @Example( + value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON, + value = "[\n" + + " 'cluster-a',\n" + + " 'cluster-b'\n" + + "]"))) + LinkedHashSet peerClusterNames) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, peerClusterNames)) + .thenAccept(__ -> { + log.info("[{}] Successfully added peer-cluster {} for {}", + clientAppId(), peerClusterNames, cluster); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames, ex); + if (realCause instanceof NotFoundException) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist")); + return null; + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + + } + private CompletableFuture innerSetPeerClusterNamesAsync(String cluster, + LinkedHashSet peerClusterNames) { // validate if peer-cluster exist - if (peerClusterNames != null && !peerClusterNames.isEmpty()) { - for (String peerCluster : peerClusterNames) { - try { - if (cluster.equalsIgnoreCase(peerCluster)) { - throw new RestException(Status.PRECONDITION_FAILED, - cluster + " itself can't be part of peer-list"); - } - clusterResources().getCluster(peerCluster) - .orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED, - "Peer cluster " + peerCluster + " does not exist")); - } catch (RestException e) { - log.warn("[{}] Peer cluster doesn't exist from {}, {}", clientAppId(), peerClusterNames, - e.getMessage()); - throw e; - } catch (Exception e) { - log.warn("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames, - e.getMessage()); - throw new RestException(e); + CompletableFuture future; + if (CollectionUtils.isNotEmpty(peerClusterNames)) { + future = FutureUtil.waitForAll(peerClusterNames.stream().map(peerCluster -> { + if (cluster.equalsIgnoreCase(peerCluster)) { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + cluster + " itself can't be part of peer-list")); } - } - } - - try { - clusterResources().updateCluster(cluster, old -> - old.clone() - .peerClusterNames(peerClusterNames) - .build() - ); - log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), peerClusterNames, cluster); - } catch (NotFoundException e) { - log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster); - throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); - } catch (Exception e) { - log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e); - throw new RestException(e); + return clusterResources().getClusterAsync(peerCluster) + .thenAccept(peerClusterOpt -> { + if (!peerClusterOpt.isPresent()) { + throw new RestException(Status.PRECONDITION_FAILED, + "Peer cluster " + peerCluster + " does not exist"); + } + }); + }).collect(Collectors.toList())); + } else { + future = CompletableFuture.completedFuture(null); } + return future.thenCompose(__ -> clusterResources().updateClusterAsync(cluster, + old -> old.clone().peerClusterNames(peerClusterNames).build())); } @GET From 3f5a09361d2e90fa13d277d92efffd212b517975 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Thu, 28 Apr 2022 08:22:06 +0800 Subject: [PATCH 2/9] Refactor getPeerCluster --- .../broker/admin/impl/ClustersBase.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index b0a80b7ee104b..2efd234a1d4be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -311,22 +311,20 @@ private CompletableFuture innerSetPeerClusterNamesAsync(String cluster, @ApiResponse(code = 404, message = "Cluster doesn't exist."), @ApiResponse(code = 500, message = "Internal server error.") }) - public Set getPeerCluster( - @ApiParam( - value = "The cluster name", - required = true - ) - @PathParam("cluster") String cluster - ) { - validateSuperUserAccess(); - try { - ClusterData clusterData = clusterResources().getCluster(cluster) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); - return clusterData.getPeerClusterNames(); - } catch (Exception e) { - log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); - throw new RestException(e); - } + public void getPeerCluster(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster) { + validateSuperUserAccessAsync() + .thenCompose(__ -> clusterResources().getClusterAsync(cluster)) + .thenAccept(clusterOpt -> { + ClusterData clusterData = + clusterOpt.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); + asyncResponse.resume(clusterData); + }).exceptionally(ex -> { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE From ed3791417ad2937c204258cf0e603a6ba7cee9d4 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Thu, 28 Apr 2022 08:53:48 +0800 Subject: [PATCH 3/9] Refactor deleteCluster --- .../broker/resources/ClusterResources.java | 37 ++++++++ .../broker/resources/NamespaceResources.java | 9 ++ .../broker/admin/impl/ClustersBase.java | 88 +++++++++---------- 3 files changed, 88 insertions(+), 46 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index f169d9b4f8556..607f02ae1317f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -25,9 +25,12 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import lombok.Getter; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.FailureDomainImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Notification; @@ -89,6 +92,25 @@ public void deleteCluster(String clusterName) throws MetadataStoreException { delete(joinPath(BASE_CLUSTERS_PATH, clusterName)); } + public CompletableFuture deleteClusterAsync(String clusterName) { + return deleteAsync(joinPath(BASE_CLUSTERS_PATH, clusterName)); + } + + public CompletableFuture isClusterUsedAsync(String clusterName) { + return getCache().getChildren(BASE_POLICIES_PATH) + .thenCompose(tenants -> { + List>> futures = tenants.stream() + .map(tenant -> getCache().getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterName))) + .collect(Collectors.toList()); + return FutureUtil.waitForAll(futures) + .thenApply(__ -> { + // We found a tenant that has at least a namespace in this cluster + return futures.stream().map(CompletableFuture::join) + .anyMatch(CollectionUtils::isNotEmpty); + }); + }); + } + public boolean isClusterUsed(String clusterName) throws MetadataStoreException { for (String tenant : getCache().getChildren(BASE_POLICIES_PATH).join()) { if (!getCache().getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterName)).join().isEmpty()) { @@ -138,6 +160,21 @@ public void deleteFailureDomain(String clusterName, String domainName) throws Me delete(path); } + public CompletableFuture deleteFailureDomainsAsync(String clusterName) { + String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN); + return existsAsync(failureDomainPath) + .thenCompose(exists -> { + if (!exists) { + return CompletableFuture.completedFuture(null); + } + return getChildrenAsync(failureDomainPath) + .thenCompose(children -> FutureUtil.waitForAll(children.stream() + .map(domain -> deleteAsync(joinPath(failureDomainPath, domain))) + .collect(Collectors.toList()))) + .thenCompose(__ -> deleteAsync(failureDomainPath)); + }); + } + public void deleteFailureDomains(String clusterName) throws MetadataStoreException { String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN); if (!exists(failureDomainPath)) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 54122bbcf4500..4fa03f0a78dc9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -176,6 +176,11 @@ public IsolationPolicyResources(MetadataStore store, int operationTimeoutSec) { }, operationTimeoutSec); } + public CompletableFuture> getIsolationDataPoliciesAsync(String cluster) { + return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)) + .thenApply(isolationData -> isolationData.map(NamespaceIsolationPolicies::new)); + } + public Optional getIsolationDataPolicies(String cluster) throws MetadataStoreException { Optional> data = @@ -193,6 +198,10 @@ public void deleteIsolationData(String cluster) throws MetadataStoreException { delete(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)); } + public CompletableFuture deleteIsolationDataAsync(String cluster) { + return deleteAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)); + } + public void createIsolationData(String cluster, Map id) throws MetadataStoreException { create(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES), id); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 2efd234a1d4be..2c6a0dee67525 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -340,54 +340,50 @@ public void getPeerCluster(@Suspended AsyncResponse asyncResponse, @ApiResponse(code = 412, message = "Cluster is not empty."), @ApiResponse(code = 500, message = "Internal server error.") }) - public void deleteCluster( - @ApiParam( - value = "The cluster name", - required = true - ) - @PathParam("cluster") String cluster - ) { - validateSuperUserAccess(); - validatePoliciesReadOnlyAccess(); + public void deleteCluster(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "The cluster name", required = true) + @PathParam("cluster") String cluster) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> internalDeleteClusterAsync(cluster)) + .thenAccept(__ -> { + log.info("[{}] Deleted cluster {}", clientAppId(), cluster); + asyncResponse.resume(Response.ok().build()); + }).exceptionally(ex -> { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof NotFoundException) { + log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster); + asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist")); + return null; + } + log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + private CompletableFuture internalDeleteClusterAsync(String cluster) { // Check that the cluster is not used by any tenant (eg: no namespaces provisioned there) - boolean isClusterUsed = false; - try { - isClusterUsed = pulsar().getPulsarResources().getClusterResources().isClusterUsed(cluster); - - // check the namespaceIsolationPolicies associated with the cluster - Optional nsIsolationPolicies = - namespaceIsolationPolicies().getIsolationDataPolicies(cluster); - - // Need to delete the isolation policies if present - if (nsIsolationPolicies.isPresent()) { - if (nsIsolationPolicies.get().getPolicies().isEmpty()) { - namespaceIsolationPolicies().deleteIsolationData(cluster); - } else { - isClusterUsed = true; - } - } - } catch (Exception e) { - log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e); - throw new RestException(e); - } - - if (isClusterUsed) { - log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), cluster); - throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); - } - - try { - clusterResources().getFailureDomainResources().deleteFailureDomains(cluster); - clusterResources().deleteCluster(cluster); - log.info("[{}] Deleted cluster {}", clientAppId(), cluster); - } catch (NotFoundException e) { - log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster); - throw new RestException(Status.NOT_FOUND, "Cluster does not exist"); - } catch (Exception e) { - log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, e); - throw new RestException(e); - } + return pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster) + .thenCompose(isClusterUsed -> { + if (isClusterUsed) { + throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); + } + // check the namespaceIsolationPolicies associated with the cluster + return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster); + }).thenCompose(nsIsolationPolicies -> { + if (!nsIsolationPolicies.isPresent()) { + return CompletableFuture.completedFuture(null); + } + if (!nsIsolationPolicies.get().getPolicies().isEmpty()) { + throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); + } + // Need to delete the isolation policies if present + return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster) + .thenCompose(__ -> clusterResources() + .getFailureDomainResources().deleteFailureDomainsAsync(cluster)) + .thenCompose(__ -> clusterResources().deleteClusterAsync(cluster)); + }); } @GET From a6a7883e5e7c235aea0741bfa25bdd36c2a33043 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Thu, 5 May 2022 04:44:53 +0800 Subject: [PATCH 4/9] Rebase to master --- .../org/apache/pulsar/broker/resources/ClusterResources.java | 5 ----- .../apache/pulsar/broker/resources/NamespaceResources.java | 5 ----- .../org/apache/pulsar/broker/admin/impl/ClustersBase.java | 5 +---- 3 files changed, 1 insertion(+), 14 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 607f02ae1317f..7dbf2839b3981 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -83,11 +83,6 @@ public void updateCluster(String clusterName, Function set(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction); } - public CompletableFuture updateClusterAsync(String clusterName, - Function modifyFunction) { - return setAsync(joinPath(BASE_CLUSTERS_PATH, clusterName), modifyFunction); - } - public void deleteCluster(String clusterName) throws MetadataStoreException { delete(joinPath(BASE_CLUSTERS_PATH, clusterName)); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 4fa03f0a78dc9..7ad22d4f3f7cb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -176,11 +176,6 @@ public IsolationPolicyResources(MetadataStore store, int operationTimeoutSec) { }, operationTimeoutSec); } - public CompletableFuture> getIsolationDataPoliciesAsync(String cluster) { - return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)) - .thenApply(isolationData -> isolationData.map(NamespaceIsolationPolicies::new)); - } - public Optional getIsolationDataPolicies(String cluster) throws MetadataStoreException { Optional> data = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 2c6a0dee67525..eed5f22fa4938 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -372,10 +372,7 @@ private CompletableFuture internalDeleteClusterAsync(String cluster) { // check the namespaceIsolationPolicies associated with the cluster return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster); }).thenCompose(nsIsolationPolicies -> { - if (!nsIsolationPolicies.isPresent()) { - return CompletableFuture.completedFuture(null); - } - if (!nsIsolationPolicies.get().getPolicies().isEmpty()) { + if (!nsIsolationPolicies.getPolicies().isEmpty()) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); } // Need to delete the isolation policies if present From 78f1aca59d18613f1a14191bff9ea41a7576b90f Mon Sep 17 00:00:00 2001 From: mattison chao Date: Thu, 5 May 2022 05:05:35 +0800 Subject: [PATCH 5/9] Fix test --- .../org/apache/pulsar/broker/admin/AdminTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index d39ad3fbb978b..4677ccb521e16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -223,7 +223,7 @@ public void clusters() throws Exception { // Check deleting non-existing cluster try { - clusters.deleteCluster("usc"); + asynRequests(ctx -> clusters.deleteCluster(ctx, "usc")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode()); @@ -262,7 +262,7 @@ public void clusters() throws Exception { clusters.getNamespaceIsolationPolicies("use"); try { - clusters.deleteCluster("use"); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), 412); @@ -271,7 +271,7 @@ public void clusters() throws Exception { clusters.deleteNamespaceIsolationPolicy("use", "policy1"); assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty()); - clusters.deleteCluster("use"); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet()); try { @@ -359,7 +359,7 @@ public void clusters() throws Exception { }); try { - clusters.deleteCluster("use"); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -373,7 +373,7 @@ public void clusters() throws Exception { isolationPolicyCache.invalidateAll(); store.invalidateAll(); try { - clusters.deleteCluster("use"); + asynRequests(ctx -> clusters.deleteCluster(ctx, "use")); fail("should have failed"); } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode()); @@ -406,8 +406,8 @@ public void clusters() throws Exception { } catch (RestException e) { assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode()); } - verify(clusters, times(13)).validateSuperUserAccessAsync(); - verify(clusters, times(11)).validateSuperUserAccess(); + verify(clusters, times(18)).validateSuperUserAccessAsync(); + verify(clusters, times(6)).validateSuperUserAccess(); } Object asynRequests(Consumer function) throws Exception { From 8d9549084658187e3b011aae0b258bf6d212ac80 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Thu, 5 May 2022 15:17:51 +0800 Subject: [PATCH 6/9] Fix checkstyle --- .../java/org/apache/pulsar/broker/admin/impl/ClustersBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index eed5f22fa4938..79132540462ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -47,7 +47,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; -import org.apache.pulsar.broker.admin.AdminResource; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources; From 0ec7600fe9f4d0b98308098396e53a90adb1b4aa Mon Sep 17 00:00:00 2001 From: mattison chao Date: Fri, 6 May 2022 04:59:47 +0800 Subject: [PATCH 7/9] Fix wrong behavior --- .../broker/resources/NamespaceResources.java | 5 ++--- .../pulsar/broker/admin/impl/ClustersBase.java | 16 ++++++++++------ .../broker/namespace/NamespaceService.java | 3 ++- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 7ad22d4f3f7cb..ce797a80c7c3a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -183,10 +183,9 @@ public Optional getIsolationDataPolicies(String clus return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty(); } - public CompletableFuture getIsolationDataPoliciesAsync(String cluster) { + public CompletableFuture> getIsolationDataPoliciesAsync(String cluster) { return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES)) - .thenApply(data -> data.map(NamespaceIsolationPolicies::new) - .orElseGet(NamespaceIsolationPolicies::new)); + .thenApply(data -> data.map(NamespaceIsolationPolicies::new)); } public void deleteIsolationData(String cluster) throws MetadataStoreException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 79132540462ed..39274585894b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -370,14 +370,18 @@ private CompletableFuture internalDeleteClusterAsync(String cluster) { } // check the namespaceIsolationPolicies associated with the cluster return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster); - }).thenCompose(nsIsolationPolicies -> { - if (!nsIsolationPolicies.getPolicies().isEmpty()) { - throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); + }).thenCompose(nsIsolationPoliciesOpt -> { + if (nsIsolationPoliciesOpt.isPresent()) { + if (!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) { + throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); + } + return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster); } + return CompletableFuture.completedFuture(null); + }).thenCompose(unused -> { // Need to delete the isolation policies if present - return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster) - .thenCompose(__ -> clusterResources() - .getFailureDomainResources().deleteFailureDomainsAsync(cluster)) + return clusterResources() + .getFailureDomainResources().deleteFailureDomainsAsync(cluster) .thenCompose(__ -> clusterResources().deleteClusterAsync(cluster)); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index fdfd9bf1880c3..558d408f1d37f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -771,7 +771,8 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj, private CompletableFuture getLocalNamespaceIsolationPoliciesAsync() { String localCluster = pulsar.getConfiguration().getClusterName(); return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies() - .getIsolationDataPoliciesAsync(localCluster); + .getIsolationDataPoliciesAsync(localCluster) + .thenApply(nsIsolationPolicies -> nsIsolationPolicies.orElseGet(NamespaceIsolationPolicies::new)); } public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception { From ba51a8a0bcac3c5b683e478a2c3e480970949754 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Sun, 8 May 2022 09:16:43 +0800 Subject: [PATCH 8/9] Fix wrong behavior --- .../java/org/apache/pulsar/broker/admin/impl/ClustersBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 39274585894b0..f9fdc5608291d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -318,7 +318,7 @@ public void getPeerCluster(@Suspended AsyncResponse asyncResponse, .thenAccept(clusterOpt -> { ClusterData clusterData = clusterOpt.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); - asyncResponse.resume(clusterData); + asyncResponse.resume(clusterData.getPeerClusterNames()); }).exceptionally(ex -> { log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, ex); resumeAsyncResponseExceptionally(asyncResponse, ex); From ee6263543ae120e5816d49eb7d15188992e3d2d3 Mon Sep 17 00:00:00 2001 From: mattison chao Date: Tue, 10 May 2022 09:35:50 +0800 Subject: [PATCH 9/9] Apply comments --- .../pulsar/broker/admin/impl/ClustersBase.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index f9fdc5608291d..6ffd2ee0058ad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -258,7 +258,7 @@ public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse, .thenAccept(__ -> { log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), peerClusterNames, cluster); - asyncResponse.resume(Response.ok().build()); + asyncResponse.resume(Response.noContent().build()); }).exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames, ex); @@ -347,7 +347,7 @@ public void deleteCluster(@Suspended AsyncResponse asyncResponse, .thenCompose(__ -> internalDeleteClusterAsync(cluster)) .thenAccept(__ -> { log.info("[{}] Deleted cluster {}", clientAppId(), cluster); - asyncResponse.resume(Response.ok().build()); + asyncResponse.resume(Response.noContent().build()); }).exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); if (realCause instanceof NotFoundException) { @@ -375,15 +375,13 @@ private CompletableFuture internalDeleteClusterAsync(String cluster) { if (!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) { throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty"); } + // Need to delete the isolation policies if present return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster); } return CompletableFuture.completedFuture(null); - }).thenCompose(unused -> { - // Need to delete the isolation policies if present - return clusterResources() - .getFailureDomainResources().deleteFailureDomainsAsync(cluster) - .thenCompose(__ -> clusterResources().deleteClusterAsync(cluster)); - }); + }).thenCompose(unused -> clusterResources() + .getFailureDomainResources().deleteFailureDomainsAsync(cluster) + .thenCompose(__ -> clusterResources().deleteClusterAsync(cluster))); } @GET