Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Make some methods of ClusterBase pure async. #15358

Merged
merged 9 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
Expand Down Expand Up @@ -84,6 +87,25 @@ public void deleteCluster(String clusterName) throws MetadataStoreException {
delete(joinPath(BASE_CLUSTERS_PATH, clusterName));
}

public CompletableFuture<Void> deleteClusterAsync(String clusterName) {
return deleteAsync(joinPath(BASE_CLUSTERS_PATH, clusterName));
}

public CompletableFuture<Boolean> isClusterUsedAsync(String clusterName) {
return getCache().getChildren(BASE_POLICIES_PATH)
.thenCompose(tenants -> {
List<CompletableFuture<List<String>>> futures = tenants.stream()
.map(tenant -> getCache().getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterName)))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures)
.thenApply(__ -> {
// We found a tenant that has at least a namespace in this cluster
return futures.stream().map(CompletableFuture::join)
.anyMatch(CollectionUtils::isNotEmpty);
});
});
}

public boolean isClusterUsed(String clusterName) throws MetadataStoreException {
for (String tenant : getCache().getChildren(BASE_POLICIES_PATH).join()) {
if (!getCache().getChildren(joinPath(BASE_POLICIES_PATH, tenant, clusterName)).join().isEmpty()) {
Expand Down Expand Up @@ -133,6 +155,21 @@ public void deleteFailureDomain(String clusterName, String domainName) throws Me
delete(path);
}

public CompletableFuture<Void> deleteFailureDomainsAsync(String clusterName) {
String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN);
return existsAsync(failureDomainPath)
.thenCompose(exists -> {
if (!exists) {
return CompletableFuture.completedFuture(null);
}
return getChildrenAsync(failureDomainPath)
.thenCompose(children -> FutureUtil.waitForAll(children.stream()
.map(domain -> deleteAsync(joinPath(failureDomainPath, domain)))
.collect(Collectors.toList())))
.thenCompose(__ -> deleteAsync(failureDomainPath));
});
}

public void deleteFailureDomains(String clusterName) throws MetadataStoreException {
String failureDomainPath = joinPath(BASE_CLUSTERS_PATH, clusterName, FAILURE_DOMAIN);
if (!exists(failureDomainPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,19 @@ public Optional<NamespaceIsolationPolicies> getIsolationDataPolicies(String clus
return data.isPresent() ? Optional.of(new NamespaceIsolationPolicies(data.get())) : Optional.empty();
}

public CompletableFuture<NamespaceIsolationPolicies> getIsolationDataPoliciesAsync(String cluster) {
public CompletableFuture<Optional<NamespaceIsolationPolicies>> getIsolationDataPoliciesAsync(String cluster) {
return getAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES))
.thenApply(data -> data.map(NamespaceIsolationPolicies::new)
.orElseGet(NamespaceIsolationPolicies::new));
.thenApply(data -> data.map(NamespaceIsolationPolicies::new));
}

public void deleteIsolationData(String cluster) throws MetadataStoreException {
delete(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES));
}

public CompletableFuture<Void> deleteIsolationDataAsync(String cluster) {
return deleteAsync(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES));
}

public void createIsolationData(String cluster, Map<String, NamespaceIsolationDataImpl> id)
throws MetadataStoreException {
create(joinPath(BASE_CLUSTERS_PATH, cluster, NAMESPACE_ISOLATION_POLICIES), id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.resources.ClusterResources.FailureDomainResources;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -238,68 +239,62 @@ public void updateCluster(
@ApiResponse(code = 412, message = "Peer cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setPeerClusterNames(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The list of peer cluster names",
required = true,
examples = @Example(
value = @ExampleProperty(
mediaType = MediaType.APPLICATION_JSON,
value =
"[\n"
+ " 'cluster-a',\n"
+ " 'cluster-b'\n"
+ "]"
)
)
)
LinkedHashSet<String> peerClusterNames
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
public void setPeerClusterNames(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster,
@ApiParam(
value = "The list of peer cluster names",
required = true,
examples = @Example(
value = @ExampleProperty(mediaType = MediaType.APPLICATION_JSON,
value = "[\n"
+ " 'cluster-a',\n"
+ " 'cluster-b'\n"
+ "]")))
LinkedHashSet<String> peerClusterNames) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, peerClusterNames))
.thenAccept(__ -> {
log.info("[{}] Successfully added peer-cluster {} for {}",
clientAppId(), peerClusterNames, cluster);
asyncResponse.resume(Response.ok().build());
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames, ex);
if (realCause instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return null;
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});

}

private CompletableFuture<Void> innerSetPeerClusterNamesAsync(String cluster,
LinkedHashSet<String> peerClusterNames) {
// validate if peer-cluster exist
if (peerClusterNames != null && !peerClusterNames.isEmpty()) {
for (String peerCluster : peerClusterNames) {
try {
if (cluster.equalsIgnoreCase(peerCluster)) {
throw new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list");
}
clusterResources().getCluster(peerCluster)
.orElseThrow(() -> new RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not exist"));
} catch (RestException e) {
log.warn("[{}] Peer cluster doesn't exist from {}, {}", clientAppId(), peerClusterNames,
e.getMessage());
throw e;
} catch (Exception e) {
log.warn("[{}] Failed to validate peer-cluster list {}, {}", clientAppId(), peerClusterNames,
e.getMessage());
throw new RestException(e);
CompletableFuture<Void> future;
if (CollectionUtils.isNotEmpty(peerClusterNames)) {
future = FutureUtil.waitForAll(peerClusterNames.stream().map(peerCluster -> {
if (cluster.equalsIgnoreCase(peerCluster)) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
cluster + " itself can't be part of peer-list"));
}
}
}

try {
clusterResources().updateCluster(cluster, old ->
old.clone()
.peerClusterNames(peerClusterNames)
.build()
);
log.info("[{}] Successfully added peer-cluster {} for {}", clientAppId(), peerClusterNames, cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to update cluster {}: Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to update cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
return clusterResources().getClusterAsync(peerCluster)
.thenAccept(peerClusterOpt -> {
if (!peerClusterOpt.isPresent()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Peer cluster " + peerCluster + " does not exist");
}
});
}).collect(Collectors.toList()));
} else {
future = CompletableFuture.completedFuture(null);
}
return future.thenCompose(__ -> clusterResources().updateClusterAsync(cluster,
old -> old.clone().peerClusterNames(peerClusterNames).build()));
}

@GET
Expand All @@ -315,22 +310,20 @@ public void setPeerClusterNames(
@ApiResponse(code = 404, message = "Cluster doesn't exist."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public Set<String> getPeerCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
try {
ClusterData clusterData = clusterResources().getCluster(cluster)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return clusterData.getPeerClusterNames();
} catch (Exception e) {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
public void getPeerCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
.thenCompose(__ -> clusterResources().getClusterAsync(cluster))
.thenAccept(clusterOpt -> {
ClusterData clusterData =
clusterOpt.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist"));
asyncResponse.resume(clusterData.getPeerClusterNames());
}).exceptionally(ex -> {
log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
Expand All @@ -346,54 +339,51 @@ public Set<String> getPeerCluster(
@ApiResponse(code = 412, message = "Cluster is not empty."),
@ApiResponse(code = 500, message = "Internal server error.")
})
public void deleteCluster(
@ApiParam(
value = "The cluster name",
required = true
)
@PathParam("cluster") String cluster
) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
public void deleteCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
validateSuperUserAccessAsync()
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> internalDeleteClusterAsync(cluster))
.thenAccept(__ -> {
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
asyncResponse.resume(Response.ok().build());
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
}).exceptionally(ex -> {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof NotFoundException) {
log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Cluster does not exist"));
return null;
}
log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

private CompletableFuture<Void> internalDeleteClusterAsync(String cluster) {
// Check that the cluster is not used by any tenant (eg: no namespaces provisioned there)
boolean isClusterUsed = false;
try {
isClusterUsed = pulsar().getPulsarResources().getClusterResources().isClusterUsed(cluster);

// check the namespaceIsolationPolicies associated with the cluster
Optional<NamespaceIsolationPolicies> nsIsolationPolicies =
namespaceIsolationPolicies().getIsolationDataPolicies(cluster);

// Need to delete the isolation policies if present
if (nsIsolationPolicies.isPresent()) {
if (nsIsolationPolicies.get().getPolicies().isEmpty()) {
namespaceIsolationPolicies().deleteIsolationData(cluster);
} else {
isClusterUsed = true;
}
}
} catch (Exception e) {
log.error("[{}] Failed to get cluster usage {}", clientAppId(), cluster, e);
throw new RestException(e);
}

if (isClusterUsed) {
log.warn("[{}] Failed to delete cluster {} - Cluster not empty", clientAppId(), cluster);
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}

try {
clusterResources().getFailureDomainResources().deleteFailureDomains(cluster);
clusterResources().deleteCluster(cluster);
log.info("[{}] Deleted cluster {}", clientAppId(), cluster);
} catch (NotFoundException e) {
log.warn("[{}] Failed to delete cluster {} - Does not exist", clientAppId(), cluster);
throw new RestException(Status.NOT_FOUND, "Cluster does not exist");
} catch (Exception e) {
log.error("[{}] Failed to delete cluster {}", clientAppId(), cluster, e);
throw new RestException(e);
}
return pulsar().getPulsarResources().getClusterResources().isClusterUsedAsync(cluster)
.thenCompose(isClusterUsed -> {
if (isClusterUsed) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}
// check the namespaceIsolationPolicies associated with the cluster
return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster);
}).thenCompose(nsIsolationPoliciesOpt -> {
if (nsIsolationPoliciesOpt.isPresent()) {
if (!nsIsolationPoliciesOpt.get().getPolicies().isEmpty()) {
throw new RestException(Status.PRECONDITION_FAILED, "Cluster not empty");
}
return namespaceIsolationPolicies().deleteIsolationDataAsync(cluster);
}
return CompletableFuture.completedFuture(null);
}).thenCompose(unused -> {
// Need to delete the isolation policies if present
mattisonchao marked this conversation as resolved.
Show resolved Hide resolved
return clusterResources()
.getFailureDomainResources().deleteFailureDomainsAsync(cluster)
.thenCompose(__ -> clusterResources().deleteClusterAsync(cluster));
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,8 @@ private NamespaceOwnershipStatus getNamespaceOwnershipStatus(OwnedBundle nsObj,
private CompletableFuture<NamespaceIsolationPolicies> getLocalNamespaceIsolationPoliciesAsync() {
String localCluster = pulsar.getConfiguration().getClusterName();
return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
.getIsolationDataPoliciesAsync(localCluster);
.getIsolationDataPoliciesAsync(localCluster)
.thenApply(nsIsolationPolicies -> nsIsolationPolicies.orElseGet(NamespaceIsolationPolicies::new));
}

public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
Expand Down
Loading