Skip to content

Commit

Permalink
Improve failure handling
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 4, 2024
1 parent e2f4573 commit 2c739e9
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
Expand Down Expand Up @@ -642,7 +641,6 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class);
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class RemoteClusterStatsResponse extends BaseNodesResponse<ClusterStatsNo
private final long indicesBytes;
private final long heapBytes;
private final long memBytes;
private String remoteName;

public Set<String> getVersions() {
return versions;
Expand Down Expand Up @@ -61,14 +60,6 @@ public long getMemBytes() {
return memBytes;
}

public String getRemoteName() {
return remoteName;
}

public void setRemoteName(String remoteName) {
this.remoteName = remoteName;
}

public RemoteClusterStatsResponse(
ClusterName clusterName,
String clusterUUID,
Expand Down Expand Up @@ -135,20 +126,4 @@ protected List<ClusterStatsNodeResponse> readNodesFrom(StreamInput in) throws IO

@Override
protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException {}

/**
* Default empty response, can be used in case the cluster did not respond.
*/
public static final RemoteClusterStatsResponse EMPTY = new RemoteClusterStatsResponse(
ClusterName.DEFAULT,
"",
ClusterHealthStatus.RED,
Set.of(),
0,
0,
0,
0,
0,
0
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.search.RemoteClusterActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterSnapshotStats;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -40,15 +40,16 @@
import org.elasticsearch.transport.Transports;
import org.elasticsearch.usage.UsageService;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

/**
* Transport action implementing _cluster/stats API.
*/
public class TransportClusterStatsAction extends TransportClusterStatsBaseAction<ClusterStatsResponse> {

public static final ActionType<ClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/stats");
Expand Down Expand Up @@ -203,32 +204,27 @@ private Map<String, RemoteClusterStatsResponse> getStatsFromRemotes(ClusterStats
}
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();

var remotesListener = new PlainActionFuture<Collection<RemoteClusterStatsResponse>>();
GroupedActionListener<RemoteClusterStatsResponse> groupListener = new GroupedActionListener<>(remotes.size(), remotesListener);
var remotesListener = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();
var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesListener);

for (String clusterAlias : remotes) {
ClusterStatsRequest remoteRequest = request.subRequest();
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
clusterAlias,
remoteClientResponseExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
);
// TODO: this should collect all successful requests, not fail once one of them fails
remoteClusterClient.execute(
TransportRemoteClusterStatsAction.REMOTE_TYPE,
remoteRequest,
groupListener.delegateFailure((l, r) -> {
r.setRemoteName(clusterAlias);
l.onResponse(r);
})
groupListener.remoteListener(clusterAlias)
);

}

try {
Collection<RemoteClusterStatsResponse> remoteStats = remotesListener.get();
// Convert the list to map
return remoteStats.stream().collect(Collectors.toMap(RemoteClusterStatsResponse::getRemoteName, r -> r));
// TODO: how do we report errors?
return remotesListener.get();
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.ERROR, "Failed to get remote cluster stats: ", ExceptionsHelper.unwrapCause(e));
return Map.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import java.util.HashSet;
import java.util.List;

/**
* Transport action for remote cluster stats. It returs a reduced answer since most of the stats from the remote
* cluster are not needed.
*/
public class TransportRemoteClusterStatsAction extends TransportClusterStatsBaseAction<RemoteClusterStatsResponse> {

public static final ActionType<RemoteClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/remote_stats");
public static final ActionType<RemoteClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/stats/remote");
public static final RemoteClusterActionType<RemoteClusterStatsResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
TYPE.name(),
RemoteClusterStatsResponse::new
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DelegatingActionListener;
import org.elasticsearch.common.util.concurrent.CountDown;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* Action listener for operations that are performed on a group of remote clusters.
* It will wait for all operations to complete and then delegate to the upstream listener.
* Does not fail if one of the operations fails.
* <br>
* Returns a map of the results per cluster name via {@link #remoteListener(String)} method.
* This is the listener that should be used to perform the individual operation on the remote cluster.
*
* @param <T> the type of the individual per-cluster result
*/
public class RemoteClusterActionListener<T> extends DelegatingActionListener<T, Map<String, T>> {
private final CountDown countDown;
private final Map<String, T> results;
private final AtomicReference<Exception> failure = new AtomicReference<>();

public RemoteClusterActionListener(int groupSize, ActionListener<Map<String, T>> delegate) {
super(delegate);
if (groupSize <= 0) {
assert false : "illegal group size [" + groupSize + "]";
throw new IllegalArgumentException("groupSize must be greater than 0 but was " + groupSize);
}
results = new ConcurrentHashMap<>(groupSize);
countDown = new CountDown(groupSize);
}

public ActionListener<T> remoteListener(String clusterAlias) {
return delegateFailure((l, r) -> {
results.put(clusterAlias, r);
l.onResponse(r);
});
}

@Override
public void onResponse(T element) {
if (countDown.countDown()) {
delegate.onResponse(results);
}
}

@Override
public void onFailure(Exception e) {
// TODO: how do we report the failures?
final var firstException = failure.compareAndExchange(null, e);
if (firstException != null && firstException != e) {
firstException.addSuppressed(e);
}
if (countDown.countDown()) {
delegate.onResponse(results);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ public void testToXContent() throws IOException {
MappingStats.of(metadata, () -> {}),
AnalysisStats.of(metadata, () -> {}),
VersionStats.of(metadata, singletonList(mockNodeResponse)),
ClusterSnapshotStats.EMPTY
ClusterSnapshotStats.EMPTY,
Map.of()
);

final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ public class Constants {
"cluster:monitor/settings",
"cluster:monitor/state",
"cluster:monitor/stats",
"cluster:monitor/stats/remote",
"cluster:monitor/task",
"cluster:monitor/task/get",
"cluster:monitor/tasks/lists",
Expand Down

0 comments on commit 2c739e9

Please sign in to comment.