From a680fdee214f143c7105739491801c7483d2096f Mon Sep 17 00:00:00 2001 From: Stas Malyshev Date: Wed, 4 Sep 2024 14:50:03 -0600 Subject: [PATCH] Improve failure handling --- .../elasticsearch/action/ActionModule.java | 2 - .../stats/RemoteClusterStatsResponse.java | 25 ------- .../stats/TransportClusterStatsAction.java | 24 +++---- .../TransportRemoteClusterStatsAction.java | 6 +- .../search/RemoteClusterActionListener.java | 68 +++++++++++++++++++ .../xpack/security/operator/Constants.java | 1 + 6 files changed, 84 insertions(+), 42 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/RemoteClusterActionListener.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 163e7d6ac4865..37a33eab4e4e8 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -72,7 +72,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction; -import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction; import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction; import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction; @@ -642,7 +641,6 @@ public void reg actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class); actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class); actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class); - actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class); 
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class); actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java index a3b0cd06338f0..65f735b02e76d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java @@ -31,7 +31,6 @@ public class RemoteClusterStatsResponse extends BaseNodesResponse getVersions() { return versions; @@ -61,14 +60,6 @@ public long getMemBytes() { return memBytes; } - public String getRemoteName() { - return remoteName; - } - - public void setRemoteName(String remoteName) { - this.remoteName = remoteName; - } - public RemoteClusterStatsResponse( ClusterName clusterName, String clusterUUID, @@ -135,20 +126,4 @@ protected List readNodesFrom(StreamInput in) throws IO @Override protected void writeNodesTo(StreamOutput out, List nodes) throws IOException {} - - /** - * Default empty response, can be used in case the cluster did not respond. 
- */ - public static final RemoteClusterStatsResponse EMPTY = new RemoteClusterStatsResponse( - ClusterName.DEFAULT, - "", - ClusterHealthStatus.RED, - Set.of(), - 0, - 0, - 0, - 0, - 0, - 0 - ); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 3eff0b2f765c3..b6b2cd5abb6ac 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -15,8 +15,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.search.RemoteClusterActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterSnapshotStats; import org.elasticsearch.cluster.ClusterState; @@ -40,15 +40,16 @@ import org.elasticsearch.transport.Transports; import org.elasticsearch.usage.UsageService; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; -import java.util.stream.Collectors; +/** + * Transport action implementing _cluster/stats API. 
+ */ public class TransportClusterStatsAction extends TransportClusterStatsBaseAction { public static final ActionType TYPE = new ActionType<>("cluster:monitor/stats"); @@ -203,32 +204,27 @@ private Map getStatsFromRemotes(ClusterStats } var remotes = remoteClusterService.getRegisteredRemoteClusterNames(); - var remotesListener = new PlainActionFuture>(); - GroupedActionListener groupListener = new GroupedActionListener<>(remotes.size(), remotesListener); + var remotesListener = new PlainActionFuture>(); + var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesListener); for (String clusterAlias : remotes) { ClusterStatsRequest remoteRequest = request.subRequest(); var remoteClusterClient = remoteClusterService.getRemoteClusterClient( clusterAlias, remoteClientResponseExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE + RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED ); - // TODO: this should collect all successful requests, not fail once one of them fails remoteClusterClient.execute( TransportRemoteClusterStatsAction.REMOTE_TYPE, remoteRequest, - groupListener.delegateFailure((l, r) -> { - r.setRemoteName(clusterAlias); - l.onResponse(r); - }) + groupListener.remoteListener(clusterAlias) ); } try { - Collection remoteStats = remotesListener.get(); - // Convert the list to map - return remoteStats.stream().collect(Collectors.toMap(RemoteClusterStatsResponse::getRemoteName, r -> r)); + // TODO: how do we report errors? 
+ return remotesListener.get(); } catch (InterruptedException | ExecutionException e) { logger.log(Level.ERROR, "Failed to get remote cluster stats: ", ExceptionsHelper.unwrapCause(e)); return Map.of(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java index 58281fba39e7b..00ec22b56d036 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java @@ -29,9 +29,13 @@ import java.util.HashSet; import java.util.List; +/** + * Transport action for remote cluster stats. It returns a reduced answer since most of the stats from the remote + * cluster are not needed. + */ public class TransportRemoteClusterStatsAction extends TransportClusterStatsBaseAction { - public static final ActionType TYPE = new ActionType<>("cluster:monitor/remote_stats"); + public static final ActionType TYPE = new ActionType<>("cluster:monitor/stats/remote"); public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( TYPE.name(), RemoteClusterStatsResponse::new diff --git a/server/src/main/java/org/elasticsearch/action/search/RemoteClusterActionListener.java b/server/src/main/java/org/elasticsearch/action/search/RemoteClusterActionListener.java new file mode 100644 index 0000000000000..e90f79b95f6e2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/RemoteClusterActionListener.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.action.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DelegatingActionListener; +import org.elasticsearch.common.util.concurrent.CountDown; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Action listener for operations that are performed on a group of remote clusters. + * It will wait for all operations to complete and then delegate to the upstream listener. + * Does not fail if one of the operations fails. + *
+ * Delivers a map of the results keyed by cluster alias to the delegate once all operations have completed. + * Use {@link #remoteListener(String)} to obtain the listener for the individual operation on each remote cluster. + * + * @param <T> the type of the individual per-cluster result + */ +public class RemoteClusterActionListener extends DelegatingActionListener> { + private final CountDown countDown; + private final Map results; + private final AtomicReference failure = new AtomicReference<>(); + + public RemoteClusterActionListener(int groupSize, ActionListener> delegate) { + super(delegate); + if (groupSize <= 0) { + assert false : "illegal group size [" + groupSize + "]"; + throw new IllegalArgumentException("groupSize must be greater than 0 but was " + groupSize); + } + results = new ConcurrentHashMap<>(groupSize); + countDown = new CountDown(groupSize); + } + + public ActionListener remoteListener(String clusterAlias) { + return delegateFailure((l, r) -> { + results.put(clusterAlias, r); + l.onResponse(r); + }); + } + + @Override + public void onResponse(T element) { + if (countDown.countDown()) { + delegate.onResponse(results); + } + } + + @Override + public void onFailure(Exception e) { + // TODO: how do we report the failures? 
+ final var firstException = failure.compareAndExchange(null, e); + if (firstException != null && firstException != e) { + firstException.addSuppressed(e); + } + if (countDown.countDown()) { + delegate.onResponse(results); + } + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index c5304d8313df2..14921b5ff51fd 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -364,6 +364,7 @@ public class Constants { "cluster:monitor/settings", "cluster:monitor/state", "cluster:monitor/stats", + "cluster:monitor/stats/remote", "cluster:monitor/task", "cluster:monitor/task/get", "cluster:monitor/tasks/lists",