Skip to content

Commit

Permalink
[improve][broker]PIP-340 Optimization of Probe Implementation for Aut…
Browse files Browse the repository at this point in the history
…omatic Failover
  • Loading branch information
yyj8 committed Feb 22, 2024
1 parent 8a18043 commit d333e48
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 186 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;

import java.util.Optional;
import java.util.function.Function;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;

/**
 * Metadata-store accessor for the per-cluster health status entry.
 *
 * <p>The status of a cluster is stored as a string under {@code BASE_PATH + clusterName}.
 *
 * <p>NOTE(review): values are read and written through {@code BaseResources<String>}; other code
 * in the broker reads/writes the same node as raw UTF-8 bytes. Confirm that both encodings
 * produce identical bytes for a plain string.
 */
public class ClusterHealthStatusResources extends BaseResources<String> {
    /** Parent path of all health status entries. It already ends with a {@code '/'}. */
    public static final String BASE_PATH = "/health-status/";

    /**
     * @param store               metadata store that holds the health status entries
     * @param operationTimeoutSec timeout, in seconds, handed to {@code BaseResources}
     */
    public ClusterHealthStatusResources(MetadataStore store, int operationTimeoutSec) {
        super(store, String.class, operationTimeoutSec);
    }

    /**
     * Applies {@code modifyFunction} to the stored health status of the given cluster.
     *
     * @param clusterName    name of the cluster whose status is updated
     * @param modifyFunction maps the current status to the new status
     * @throws MetadataStoreException if the metadata store operation fails
     */
    public void updateHealthStatus(String clusterName, Function<String, String> modifyFunction)
            throws MetadataStoreException {
        set(healthStatusPath(clusterName), modifyFunction);
    }

    /**
     * Reads the stored health status of the given cluster.
     *
     * @param clusterName name of the cluster whose status is read
     * @return the stored status, or an empty {@link Optional} if none is stored
     * @throws MetadataStoreException if the metadata store operation fails
     */
    public Optional<String> getHealthStatus(String clusterName) throws MetadataStoreException {
        return get(healthStatusPath(clusterName));
    }

    /**
     * Builds the metadata path of a cluster's health status entry.
     *
     * <p>Plain concatenation is used on purpose: {@link #BASE_PATH} already carries the trailing
     * separator, and the other readers/writers of this node build the path the same way, so all
     * of them address the same entry.
     */
    private static String healthStatusPath(String clusterName) {
        return BASE_PATH + clusterName;
    }

    /** Health states a cluster can be in. */
    public enum Status {
        available,
        unavailable
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class PulsarResources {
@Getter
private final ClusterResources clusterResources;
@Getter
private final ClusterHealthStatusResources clusterHealthStatusResources;
@Getter
private final ResourceGroupResources resourcegroupResources;
@Getter
private final NamespaceResources namespaceResources;
Expand Down Expand Up @@ -63,11 +65,14 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
operationTimeoutSec);
clusterHealthStatusResources = new ClusterHealthStatusResources(localMetadataStore,
operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
clusterResources = null;
clusterHealthStatusResources = null;
namespaceResources = null;
resourcegroupResources = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -96,6 +97,7 @@
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTopicTransportManager;
import org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager;
import org.apache.pulsar.broker.resources.ClusterHealthStatusResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.rest.Topics;
Expand Down Expand Up @@ -752,6 +754,14 @@ public void start() throws PulsarServerException {
localMetadataStore = createLocalMetadataStore(localMetadataSynchronizer);
localMetadataStore.registerSessionListener(this::handleMetadataSessionEvent);

// Seed the health status entry of this cluster as "available" when it does not exist yet.
String healthStatusPath = ClusterHealthStatusResources.BASE_PATH
        + config.getClusterName();
// Wait up to 30 seconds for the existence check. The unit must be milliseconds:
// 30000 microseconds would be a 30 ms budget, which easily times out against a
// remote metadata store and would abort broker start-up.
if (!localMetadataStore.exists(healthStatusPath).get(30000, TimeUnit.MILLISECONDS)) {
    // NOTE(review): Optional.of(-1L) is the expected version; presumably it means
    // "create only if the node is absent" - confirm against the MetadataStore contract.
    // The returned future is not awaited, so a failed put is not reported here.
    localMetadataStore.put(healthStatusPath,
            ClusterHealthStatusResources.Status.available.name().getBytes(StandardCharsets.UTF_8),
            Optional.of(-1L));
}

coordinationService = new CoordinationServiceImpl(localMetadataStore);

if (config.isConfigurationStoreSeparated()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,92 @@ public void deleteFailureDomain(
});
}

/**
 * REST endpoint that overwrites the stored health status of a cluster.
 *
 * <p>The new value is taken from the {@code "status"} key of the request body; when the key is
 * missing an empty string is stored. The suspended response is always completed: with
 * 204 on success, or with an error on any failure.
 *
 * @param asyncResponse suspended response that is resumed with the outcome
 * @param cluster       name of the cluster whose status is updated
 * @param status        request body carrying the new status under the {@code "status"} key
 */
@POST
@Path("/{cluster}/updateHealthStatus")
@ApiOperation(
        value = "Update cluster health status.",
        notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission."),
        @ApiResponse(code = 412, message = "Cluster doesn't exist."),
        @ApiResponse(code = 500, message = "Internal server error.")
})
public void updateHealthStatus(
        @Suspended AsyncResponse asyncResponse,
        @ApiParam(value = "The cluster name", required = true)
        @PathParam("cluster") String cluster,
        @ApiParam(value = "The status info", required = true) Map<String, String> status
) {
    validateSuperUserAccessAsync()
            .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
            .thenAccept(__ -> {
                try {
                    clusterHealthStatusResources().updateHealthStatus(cluster,
                            old -> status.getOrDefault("status", ""));
                    log.info("[{}] Successful update health status for cluster {}", clientAppId(), cluster);
                    asyncResponse.resume(Response.noContent().build());
                } catch (MetadataStoreException e) {
                    log.error("Update cluster {} health status error", cluster, e);
                    // The response must be resumed on this path too; otherwise the
                    // suspended request would stay open until the client times out.
                    resumeAsyncResponseExceptionally(asyncResponse, e);
                }
            })
            .exceptionally(ex -> {
                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                if (realCause instanceof NotFoundException) {
                    log.warn("[{}] Failed to update health status for cluster. clusters {} Does not exist",
                            clientAppId(), cluster);
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            "cluster " + cluster + " does not exist"));
                    return null;
                }
                // Two placeholders for two arguments; the trailing throwable is logged as the cause.
                log.error("[{}] Failed to update health status for cluster {}", clientAppId(), cluster, ex);
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}

/**
 * REST endpoint that returns the stored health status of a cluster.
 *
 * <p>The suspended response is always completed: with 200 and the status string when an entry
 * exists, with 404 when the cluster has no stored health status, or with an error on failure.
 *
 * @param asyncResponse suspended response that is resumed with the outcome
 * @param cluster       name of the cluster whose status is read
 */
@GET
@Path("/{cluster}/getHealthStatus")
@ApiOperation(
        value = "Get cluster health status.",
        notes = "This operation requires Pulsar superuser privileges."
)
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission."),
        @ApiResponse(code = 412, message = "Cluster doesn't exist."),
        @ApiResponse(code = 500, message = "Internal server error.")
})
public void getHealthStatus(
        @Suspended AsyncResponse asyncResponse,
        @ApiParam(value = "The cluster name", required = true)
        @PathParam("cluster") String cluster
) {
    validateSuperUserAccessAsync()
            .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED))
            .thenAccept(__ -> {
                try {
                    String status = clusterHealthStatusResources().getHealthStatus(cluster).orElse(null);
                    if (status == null) {
                        // No entry stored yet: answer explicitly instead of failing on an empty Optional.
                        log.warn("[{}] No health status stored for cluster {}", clientAppId(), cluster);
                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
                                "health status of cluster " + cluster + " does not exist"));
                        return;
                    }
                    log.info("[{}] Successful get health status for cluster {}", clientAppId(), cluster);
                    // build() is required: resuming with the bare ResponseBuilder would hand
                    // the builder object itself to the framework as the entity.
                    asyncResponse.resume(Response.ok(status).build());
                } catch (MetadataStoreException e) {
                    log.error("Get cluster {} health status error", cluster, e);
                    // Complete the suspended request on failure as well, so it cannot hang.
                    resumeAsyncResponseExceptionally(asyncResponse, e);
                }
            })
            .exceptionally(ex -> {
                Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                if (realCause instanceof NotFoundException) {
                    log.warn("[{}] Failed to get health status for cluster. clusters {} Does not exist",
                            clientAppId(), cluster);
                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
                            "cluster " + cluster + " does not exist"));
                    return null;
                }
                // Two placeholders for two arguments; the trailing throwable is logged as the cause.
                log.error("[{}] Failed to get health status for cluster {}", clientAppId(), cluster, ex);
                resumeAsyncResponseExceptionally(asyncResponse, ex);
                return null;
            });
}

private CompletableFuture<Void> validateBrokerExistsInOtherDomain(final String cluster,
final String inputDomainName,
final FailureDomainImpl inputDomain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
Expand All @@ -81,6 +77,7 @@
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.resources.ClusterHealthStatusResources;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand All @@ -91,58 +88,18 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.BaseResource;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.common.api.proto.CommandConsumerStats;
import org.apache.pulsar.common.api.proto.CommandEndTxn;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
import org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandNewTxn;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.*;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType;
import org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.common.api.proto.CommandWatchTopicList;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.intercept.InterceptException;
Expand Down Expand Up @@ -1114,6 +1071,30 @@ protected void handleConnect(CommandConnect connect) {
}
}


/**
 * Answers a client health-check probe with the stored health status of the local cluster.
 *
 * <p>Replies with a health-check response whose flag is {@code true} only when the stored
 * status equals {@code available} (case-insensitive). Any failure while reading the status,
 * including a missing entry, is answered with an {@code UnknownError} error command.
 *
 * <p>NOTE(review): this waits up to 30 seconds on the metadata store on the calling thread.
 * If this handler runs on a connection I/O thread, an asynchronous read would be preferable -
 * confirm.
 *
 * @param healthCheck the received health-check command (its content is not inspected)
 */
@Override
protected void handleHealthCheck(CommandHealthCheck healthCheck) {
    checkArgument(state == State.Start);

    String clusterName = this.service.pulsar().getConfig().getClusterName();
    try {
        byte[] status = this.service.pulsar().getLocalMetadataStore()
                .get(ClusterHealthStatusResources.BASE_PATH + clusterName)
                .get(30000, TimeUnit.MILLISECONDS)
                .orElseThrow(() -> new IllegalStateException(
                        "no health status entry for cluster " + clusterName))
                .getValue();
        // Decode the stored bytes. String.valueOf(byte[]) would return the array's identity
        // string ("[B@..."), which can never match a status name.
        // NOTE(review): if the value was written through a serializer that adds quoting,
        // the decoded text will not match either - confirm the on-store format.
        String statusText = new String(status, java.nio.charset.StandardCharsets.UTF_8);
        boolean available =
                ClusterHealthStatusResources.Status.available.name().equalsIgnoreCase(statusText);
        writeAndFlush(Commands.newHealthCheckResponse(available));
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the owner of this thread.
            Thread.currentThread().interrupt();
        }
        log.error("cluster health status check error.", e);
        writeAndFlush(Commands.newError(-1L, ServerError.UnknownError,
                "cluster health status check error."));
    }
}

@Override
protected void handleAuthResponse(CommandAuthResponse authResponse) {
checkArgument(authResponse.hasResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BookieResources;
import org.apache.pulsar.broker.resources.ClusterHealthStatusResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
import org.apache.pulsar.broker.resources.LoadBalanceResources;
Expand Down Expand Up @@ -1104,6 +1105,10 @@ protected ClusterResources clusterResources() {
return pulsar().getPulsarResources().getClusterResources();
}

/**
 * Returns the {@link ClusterHealthStatusResources} held by the broker's {@code PulsarResources}.
 *
 * <p>NOTE(review): {@code PulsarResources} assigns {@code null} to this resource in one of its
 * constructor branches, so the result may be {@code null} - confirm when that branch is taken.
 */
protected ClusterHealthStatusResources clusterHealthStatusResources() {
return pulsar().getPulsarResources().getClusterHealthStatusResources();
}

/**
 * Returns the {@link BookieResources} held by the broker's {@code PulsarResources}.
 */
protected BookieResources bookieResources() {
return pulsar().getPulsarResources().getBookieResources();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,4 +809,31 @@ void updateFailureDomain(String cluster, String domainName, FailureDomain domain
*/
CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName);

/**
 * Update the health status of a cluster asynchronously.
 *
 * @param cluster
 * Cluster name
 *
 * @param status
 * New health status. NOTE(review): the broker side declares the values
 * {@code available} and {@code unavailable}; presumably one of them is expected - confirm.
 *
 * @return
 * a future that completes when the update request has finished
 */
CompletableFuture<Void> updateHealthStatusAsync(String cluster, String status);

/**
 * Get the health status of a cluster asynchronously.
 *
 * @param cluster
 * Cluster name
 *
 * @return
 * a future that completes with the health status string of the cluster
 */
CompletableFuture<String> getHealthStatusAsync(String cluster);

}
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,21 @@ public CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, St
.thenApply(failureDomain -> failureDomain);
}

/**
 * Posts the new health status of {@code cluster} to the admin endpoint
 * {@code <cluster>/updateHealthStatus}.
 *
 * @param cluster name of the cluster to update
 * @param status  status value sent under the {@code "status"} key of the JSON body
 * @return the future returned by the POST request
 */
@Override
public CompletableFuture<Void> updateHealthStatusAsync(String cluster, String status) {
    WebTarget target = adminClusters.path(cluster).path("updateHealthStatus");
    // The request body is a single-entry map: {"status": <status>}.
    Map<String, String> payload = new HashMap<>();
    payload.put("status", status);
    return asyncPostRequest(target, Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
}

/**
 * Fetches the health status of {@code cluster} from the admin endpoint
 * {@code <cluster>/getHealthStatus}.
 *
 * @param cluster name of the cluster to query
 * @return a future that yields the status string returned by the GET request
 */
@Override
public CompletableFuture<String> getHealthStatusAsync(String cluster) {
    WebTarget target = adminClusters.path(cluster).path("getHealthStatus");
    return asyncGetRequest(target, new FutureCallback<String>() {})
            .thenApply(body -> body);
}

private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
sync(() -> setDomainAsync(cluster, domainName, domain));
Expand Down
Loading

0 comments on commit d333e48

Please sign in to comment.