From e2f457378411c710a4dd8a30dc6ae85634ffa5d1 Mon Sep 17 00:00:00 2001 From: Stas Malyshev Date: Tue, 3 Sep 2024 14:04:41 -0600 Subject: [PATCH] Implement remote cluster stats polling --- .../elasticsearch/action/ActionModule.java | 2 + .../cluster/stats/ClusterStatsRequest.java | 20 ++ .../cluster/stats/ClusterStatsResponse.java | 51 +++- .../stats/RemoteClusterStatsResponse.java | 154 +++++++++++ .../stats/TransportClusterStatsAction.java | 247 ++++-------------- .../TransportClusterStatsBaseAction.java | 238 +++++++++++++++++ .../TransportRemoteClusterStatsAction.java | 124 +++++++++ .../support/nodes/BaseNodesRequest.java | 2 +- .../transport/RemoteConnectionInfo.java | 1 - 9 files changed, 623 insertions(+), 216 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsBaseAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 37a33eab4e4e8..163e7d6ac4865 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -72,6 +72,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction; +import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction; import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction; import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction; import 
org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction; @@ -641,6 +642,7 @@ public void reg actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class); actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class); actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class); + actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class); actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java index 5d040af4bb4ba..f2cbbd6a4efb5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -9,16 +9,23 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import java.io.IOException; import java.util.Map; /** * A request to get cluster level stats. + * This request can be used both to request stats from single cluster or from remote cluster. */ public class ClusterStatsRequest extends BaseNodesRequest { + /** + * Should the remote cluster stats be included in the response. 
+ */ private final boolean doRemotes; /** @@ -34,6 +41,12 @@ public ClusterStatsRequest(boolean doRemotes, String... nodesIds) { this.doRemotes = doRemotes; } + public ClusterStatsRequest(StreamInput in) throws IOException { + super(in.readStringArray()); + // We will never ask the remote to collect remote stats + doRemotes = false; + } + @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new CancellableTask(id, type, action, "", parentTaskId, headers); @@ -49,4 +62,11 @@ public boolean doRemotes() { public ClusterStatsRequest subRequest() { return new ClusterStatsRequest(false, nodesIds()); } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeStringArrayNullable(nodesIds()); + // We will never ask remote to collect remote stats + } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 991d509a36367..7136795be721c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG; @@ -160,24 +161,46 @@ public static class RemoteClusterStats implements ToXContentFragment { private final String clusterUUID; private final String mode; private final boolean skipUnavailable; - private final boolean transportCompress; - private final List versions; + private final String transportCompress; + private final Set versions; private final String status; + private final long nodesCount; + private final long shardsCount; + private final long indicesCount; + private final long indicesBytes; + 
private final long heapBytes; + private final long memBytes; public RemoteClusterStats( - String clusterUUID, + RemoteClusterStatsResponse remoteResponse, String mode, boolean skipUnavailable, - boolean transportCompress, - List versions, - String status + String transportCompress ) { - this.clusterUUID = clusterUUID; this.mode = mode; this.skipUnavailable = skipUnavailable; - this.transportCompress = transportCompress; - this.versions = versions; - this.status = status; + this.transportCompress = transportCompress.toLowerCase(Locale.ROOT); + if (remoteResponse != null) { + this.clusterUUID = remoteResponse.getClusterUUID(); + this.versions = remoteResponse.getVersions(); + this.status = remoteResponse.getStatus().name().toLowerCase(Locale.ROOT); + this.nodesCount = remoteResponse.getNodesCount(); + this.shardsCount = remoteResponse.getShardsCount(); + this.indicesCount = remoteResponse.getIndicesCount(); + this.indicesBytes = remoteResponse.getIndicesBytes(); + this.heapBytes = remoteResponse.getHeapBytes(); + this.memBytes = remoteResponse.getMemBytes(); + } else { + this.status = "unavailable"; + this.clusterUUID = "unavailable"; + this.versions = Set.of(); + this.nodesCount = 0; + this.shardsCount = 0; + this.indicesCount = 0; + this.indicesBytes = 0; + this.heapBytes = 0; + this.memBytes = 0; + } } @Override @@ -187,8 +210,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("mode", mode); builder.field("skip_unavailable", skipUnavailable); builder.field("transport.compress", transportCompress); - builder.field("version", versions); builder.field("status", status); + builder.field("version", versions); + builder.field("nodes_count", nodesCount); + builder.field("shards_count", shardsCount); + builder.field("indices_count", indicesCount); + builder.field("indices_total_size_bytes", indicesBytes); + builder.field("max_heap_bytes", heapBytes); + builder.field("mem_total_bytes", memBytes); builder.endObject(); return 
builder; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java new file mode 100644 index 0000000000000..a3b0cd06338f0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java @@ -0,0 +1,154 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +/** + * Trimmed down cluster stats response for reporting to a remote cluster. 
+ */ +public class RemoteClusterStatsResponse extends BaseNodesResponse { + final String clusterUUID; + final ClusterHealthStatus status; + private final Set versions; + private final long nodesCount; + private final long shardsCount; + private final long indicesCount; + private final long indicesBytes; + private final long heapBytes; + private final long memBytes; + private String remoteName; + + public Set getVersions() { + return versions; + } + + public long getNodesCount() { + return nodesCount; + } + + public long getShardsCount() { + return shardsCount; + } + + public long getIndicesCount() { + return indicesCount; + } + + public long getIndicesBytes() { + return indicesBytes; + } + + public long getHeapBytes() { + return heapBytes; + } + + public long getMemBytes() { + return memBytes; + } + + public String getRemoteName() { + return remoteName; + } + + public void setRemoteName(String remoteName) { + this.remoteName = remoteName; + } + + public RemoteClusterStatsResponse( + ClusterName clusterName, + String clusterUUID, + ClusterHealthStatus status, + Set versions, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) { + super(clusterName, List.of(), List.of()); + this.clusterUUID = clusterUUID; + this.status = status; + this.versions = versions; + this.nodesCount = nodesCount; + this.shardsCount = shardsCount; + this.indicesCount = indicesCount; + this.indicesBytes = indicesBytes; + this.heapBytes = heapBytes; + this.memBytes = memBytes; + } + + public String getClusterUUID() { + return this.clusterUUID; + } + + public ClusterHealthStatus getStatus() { + return this.status; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(clusterUUID); + status.writeTo(out); + out.writeStringCollection(versions); + out.writeLong(nodesCount); + out.writeLong(shardsCount); + out.writeLong(indicesCount); + out.writeLong(indicesBytes); + 
out.writeLong(heapBytes); + out.writeLong(memBytes); + } + + public RemoteClusterStatsResponse(StreamInput in) throws IOException { + super(in); + this.clusterUUID = in.readString(); + this.status = ClusterHealthStatus.readFrom(in); + this.versions = in.readCollectionAsSet(StreamInput::readString); + this.nodesCount = in.readLong(); + this.shardsCount = in.readLong(); + this.indicesCount = in.readLong(); + this.indicesBytes = in.readLong(); + this.heapBytes = in.readLong(); + this.memBytes = in.readLong(); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return List.of(); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException {} + + /** + * Default empty response, can be used in case the cluster did not respond. + */ + public static final RemoteClusterStatsResponse EMPTY = new RemoteClusterStatsResponse( + ClusterName.DEFAULT, + "", + ClusterHealthStatus.RED, + Set.of(), + 0, + 0, + 0, + 0, + 0, + 0 + ); +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 35a6f66c34f45..3eff0b2f765c3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -8,57 +8,38 @@ package org.elasticsearch.action.admin.cluster.stats; -import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.TransportVersions; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; -import 
org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; -import org.elasticsearch.action.admin.indices.stats.CommonStats; -import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.ClusterSnapshotStats; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CancellableSingleObjectCache; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.core.UpdateForV9; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.CommitStats; -import org.elasticsearch.index.seqno.RetentionLeaseStats; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.node.NodeService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; 
import org.elasticsearch.transport.RemoteClusterConnection; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.RemoteConnectionInfo; -import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.Transports; -import org.elasticsearch.usage.SearchUsageHolder; import org.elasticsearch.usage.UsageService; -import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -66,34 +47,17 @@ import java.util.concurrent.ExecutionException; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; -public class TransportClusterStatsAction extends TransportNodesAction< - ClusterStatsRequest, - ClusterStatsResponse, - TransportClusterStatsAction.ClusterStatsNodeRequest, - ClusterStatsNodeResponse> { +public class TransportClusterStatsAction extends TransportClusterStatsBaseAction { public static final ActionType TYPE = new ActionType<>("cluster:monitor/stats"); - private static final CommonStatsFlags SHARD_STATS_FLAGS = new CommonStatsFlags( - CommonStatsFlags.Flag.Docs, - CommonStatsFlags.Flag.Store, - CommonStatsFlags.Flag.FieldData, - CommonStatsFlags.Flag.QueryCache, - CommonStatsFlags.Flag.Completion, - CommonStatsFlags.Flag.Segments, - CommonStatsFlags.Flag.DenseVector, - CommonStatsFlags.Flag.SparseVector - ); - - private final NodeService nodeService; - private final IndicesService indicesService; - private final RepositoriesService repositoriesService; - private final SearchUsageHolder searchUsageHolder; - private final CCSUsageTelemetry ccsUsageHolder; private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; private final RemoteClusterService remoteClusterService; + private static final Logger logger = LogManager.getLogger(TransportClusterStatsAction.class); + private 
final Settings settings; @Inject public TransportClusterStatsAction( @@ -104,24 +68,24 @@ public TransportClusterStatsAction( IndicesService indicesService, RepositoriesService repositoriesService, UsageService usageService, - ActionFilters actionFilters + ActionFilters actionFilters, + Settings settings ) { super( TYPE.name(), + threadPool, clusterService, transportService, - actionFilters, - ClusterStatsNodeRequest::new, - threadPool.executor(ThreadPool.Names.MANAGEMENT) + nodeService, + indicesService, + repositoriesService, + usageService, + actionFilters ); - this.nodeService = nodeService; - this.indicesService = indicesService; - this.repositoriesService = repositoriesService; - this.searchUsageHolder = usageService.getSearchUsageHolder(); - this.ccsUsageHolder = usageService.getCcsUsageHolder(); this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); this.remoteClusterService = transportService.getRemoteClusterService(); + this.settings = settings; } @Override @@ -179,129 +143,6 @@ protected void newResponseAsync( ); } - @Override - protected ClusterStatsResponse newResponse( - ClusterStatsRequest request, - List responses, - List failures - ) { - assert false; - throw new UnsupportedOperationException("use newResponseAsync instead"); - } - - @Override - protected ClusterStatsNodeRequest newNodeRequest(ClusterStatsRequest request) { - return new ClusterStatsNodeRequest(); - } - - @Override - protected ClusterStatsNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { - return new ClusterStatsNodeResponse(in); - } - - @Override - protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) { - assert task instanceof CancellableTask; - final CancellableTask cancellableTask = (CancellableTask) task; - NodeInfo nodeInfo = nodeService.info(true, true, 
false, true, false, true, false, false, true, false, false, false); - NodeStats nodeStats = nodeService.stats( - CommonStatsFlags.NONE, - false, - true, - true, - true, - false, - true, - false, - false, - false, - false, - false, - true, - false, - false, - false, - false - ); - List shardsStats = new ArrayList<>(); - for (IndexService indexService : indicesService) { - for (IndexShard indexShard : indexService) { - cancellableTask.ensureNotCancelled(); - if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) { - // only report on fully started shards - CommitStats commitStats; - SeqNoStats seqNoStats; - RetentionLeaseStats retentionLeaseStats; - try { - commitStats = indexShard.commitStats(); - seqNoStats = indexShard.seqNoStats(); - retentionLeaseStats = indexShard.getRetentionLeaseStats(); - } catch (final AlreadyClosedException e) { - // shard is closed - no stats is fine - commitStats = null; - seqNoStats = null; - retentionLeaseStats = null; - } - shardsStats.add( - new ShardStats( - indexShard.routingEntry(), - indexShard.shardPath(), - CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), - commitStats, - seqNoStats, - retentionLeaseStats, - indexShard.isSearchIdle(), - indexShard.searchIdleTime() - ) - ); - } - } - } - - final ClusterState clusterState = clusterService.state(); - final ClusterHealthStatus clusterStatus = clusterState.nodes().isLocalNodeElectedMaster() - ? 
new ClusterStateHealth(clusterState).getStatus() - : null; - - final SearchUsageStats searchUsageStats = searchUsageHolder.getSearchUsageStats(); - - final RepositoryUsageStats repositoryUsageStats = repositoriesService.getUsageStats(); - final CCSTelemetrySnapshot ccsUsage = ccsUsageHolder.getCCSTelemetrySnapshot(); - - return new ClusterStatsNodeResponse( - nodeInfo.getNode(), - clusterStatus, - nodeInfo, - nodeStats, - shardsStats.toArray(new ShardStats[shardsStats.size()]), - searchUsageStats, - repositoryUsageStats, - ccsUsage - ); - } - - @UpdateForV9 // this can be replaced with TransportRequest.Empty in v9 - public static class ClusterStatsNodeRequest extends TransportRequest { - - ClusterStatsNodeRequest() {} - - public ClusterStatsNodeRequest(StreamInput in) throws IOException { - super(in); - skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in); - } - - @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new CancellableTask(id, type, action, "", parentTaskId, headers); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out); - } - } - private static class MetadataStatsCache extends CancellableSingleObjectCache { private final BiFunction function; @@ -336,34 +177,34 @@ private Map getRemoteClusterSta return null; } Map remoteClustersStats = new HashMap<>(); + Map remoteData = getStatsFromRemotes(request); for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) { RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo(); + RemoteClusterStatsResponse response = remoteData.get(clusterAlias); + var compression = 
RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); var remoteClusterStats = new ClusterStatsResponse.RemoteClusterStats( - "UUID", // TODO cluster_uuid + response, remoteConnectionInfo.getModeInfo().modeName(), remoteConnection.isSkipUnavailable(), - false, // TODO transport.compress - List.of(), // TODO version - "green" // TODO status + compression.toString() ); remoteClustersStats.put(clusterAlias, remoteClusterStats); } return remoteClustersStats; } - private Collection getStatsFromRemotes(ClusterStatsRequest request) { + private Map getStatsFromRemotes(ClusterStatsRequest request) { + // TODO: make correct pool + final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT); if (request.doRemotes() == false) { - return null; + return Map.of(); } var remotes = remoteClusterService.getRegisteredRemoteClusterNames(); - var remotesListener = new PlainActionFuture< Collection>(); - GroupedActionListener groupListener = new GroupedActionListener( - remotes.size(), - remotesListener - ); + var remotesListener = new PlainActionFuture>(); + GroupedActionListener groupListener = new GroupedActionListener<>(remotes.size(), remotesListener); for (String clusterAlias : remotes) { ClusterStatsRequest remoteRequest = request.subRequest(); @@ -372,26 +213,26 @@ private Collection getStatsFromRemotes(ClusterStatsRequest remoteClientResponseExecutor, RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE ); - remoteClusterService.getConnection(clusterAlias).sendRequest( - 1, - , - remoteRequest, - null + // TODO: this should collect all successful requests, not fail once one of them fails + remoteClusterClient.execute( + TransportRemoteClusterStatsAction.REMOTE_TYPE, + remoteRequest, + groupListener.delegateFailure((l, r) -> { + r.setRemoteName(clusterAlias); + l.onResponse(r); + }) ); - remoteClusterClient.execute(TransportClusterStatsAction.TYPE, 
remoteRequest, groupListener); + } - Collection remoteStats = null; try { - remoteStats = remotesListener.get(); - } catch (InterruptedException e) { - return null; - } catch (ExecutionException e) { - return null; + Collection remoteStats = remotesListener.get(); + // Convert the list to map + return remoteStats.stream().collect(Collectors.toMap(RemoteClusterStatsResponse::getRemoteName, r -> r)); + } catch (InterruptedException | ExecutionException e) { + logger.log(Level.ERROR, "Failed to get remote cluster stats: ", ExceptionsHelper.unwrapCause(e)); + return Map.of(); } - - return remoteStats; - } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsBaseAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsBaseAction.java new file mode 100644 index 0000000000000..92523d1c60abc --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsBaseAction.java @@ -0,0 +1,238 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterStateHealth; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.UpdateForV9; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.seqno.RetentionLeaseStats; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.node.NodeService; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import 
org.elasticsearch.usage.SearchUsageHolder; +import org.elasticsearch.usage.UsageService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Base class for cluster stats actions. Implements everything except the final response generation. + */ +public abstract class TransportClusterStatsBaseAction> extends TransportNodesAction< + ClusterStatsRequest, + FinalResponse, + TransportClusterStatsBaseAction.ClusterStatsNodeRequest, + ClusterStatsNodeResponse> { + + private static final CommonStatsFlags SHARD_STATS_FLAGS = new CommonStatsFlags( + CommonStatsFlags.Flag.Docs, + CommonStatsFlags.Flag.Store, + CommonStatsFlags.Flag.FieldData, + CommonStatsFlags.Flag.QueryCache, + CommonStatsFlags.Flag.Completion, + CommonStatsFlags.Flag.Segments, + CommonStatsFlags.Flag.DenseVector, + CommonStatsFlags.Flag.SparseVector + ); + + private final NodeService nodeService; + private final IndicesService indicesService; + private final RepositoriesService repositoriesService; + private final SearchUsageHolder searchUsageHolder; + private final CCSUsageTelemetry ccsUsageHolder; + + @Inject + public TransportClusterStatsBaseAction( + String typeName, + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + NodeService nodeService, + IndicesService indicesService, + RepositoriesService repositoriesService, + UsageService usageService, + ActionFilters actionFilters + ) { + super( + typeName, + clusterService, + transportService, + actionFilters, + ClusterStatsNodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.nodeService = nodeService; + this.indicesService = indicesService; + this.repositoriesService = repositoriesService; + this.searchUsageHolder = usageService.getSearchUsageHolder(); + this.ccsUsageHolder = usageService.getCcsUsageHolder(); + } + + @Override + protected abstract void newResponseAsync( + Task task, + ClusterStatsRequest request, + List 
responses, + List failures, + ActionListener listener + ); + + @Override + protected FinalResponse newResponse( + ClusterStatsRequest request, + List responses, + List failures + ) { + assert false; + throw new UnsupportedOperationException("use newResponseAsync instead"); + } + + @Override + protected ClusterStatsNodeRequest newNodeRequest(ClusterStatsRequest request) { + return new ClusterStatsNodeRequest(); + } + + @Override + protected ClusterStatsNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new ClusterStatsNodeResponse(in); + } + + @Override + protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeRequest, Task task) { + assert task instanceof CancellableTask; + final CancellableTask cancellableTask = (CancellableTask) task; + NodeInfo nodeInfo = nodeService.info(true, true, false, true, false, true, false, false, true, false, false, false); + NodeStats nodeStats = nodeService.stats( + CommonStatsFlags.NONE, + false, + true, + true, + true, + false, + true, + false, + false, + false, + false, + false, + true, + false, + false, + false, + false + ); + List shardsStats = new ArrayList<>(); + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + cancellableTask.ensureNotCancelled(); + if (indexShard.routingEntry() != null && indexShard.routingEntry().active()) { + // only report on fully started shards + CommitStats commitStats; + SeqNoStats seqNoStats; + RetentionLeaseStats retentionLeaseStats; + try { + commitStats = indexShard.commitStats(); + seqNoStats = indexShard.seqNoStats(); + retentionLeaseStats = indexShard.getRetentionLeaseStats(); + } catch (final AlreadyClosedException e) { + // shard is closed - no stats is fine + commitStats = null; + seqNoStats = null; + retentionLeaseStats = null; + } + shardsStats.add( + new ShardStats( + indexShard.routingEntry(), + indexShard.shardPath(), + 
CommonStats.getShardLevelStats(indicesService.getIndicesQueryCache(), indexShard, SHARD_STATS_FLAGS), + commitStats, + seqNoStats, + retentionLeaseStats, + indexShard.isSearchIdle(), + indexShard.searchIdleTime() + ) + ); + } + } + } + + final ClusterState clusterState = clusterService.state(); + final ClusterHealthStatus clusterStatus = clusterState.nodes().isLocalNodeElectedMaster() + ? new ClusterStateHealth(clusterState).getStatus() + : null; + + final SearchUsageStats searchUsageStats = searchUsageHolder.getSearchUsageStats(); + + final RepositoryUsageStats repositoryUsageStats = repositoriesService.getUsageStats(); + final CCSTelemetrySnapshot ccsUsage = ccsUsageHolder.getCCSTelemetrySnapshot(); + + return new ClusterStatsNodeResponse( + nodeInfo.getNode(), + clusterStatus, + nodeInfo, + nodeStats, + shardsStats.toArray(new ShardStats[shardsStats.size()]), + searchUsageStats, + repositoryUsageStats, + ccsUsage + ); + } + + @UpdateForV9 // this can be replaced with TransportRequest.Empty in v9 + public static class ClusterStatsNodeRequest extends TransportRequest { + + ClusterStatsNodeRequest() {} + + public ClusterStatsNodeRequest(StreamInput in) throws IOException { + super(in); + skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java new file mode 100644 index 0000000000000..58281fba39e7b --- /dev/null +++ 
b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.stats;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.RemoteClusterActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.node.NodeService;
+import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.usage.UsageService;
+
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Transport action registered under {@code cluster:monitor/remote_stats} that condenses the per-node
+ * cluster stats responses into a single {@link RemoteClusterStatsResponse}.
+ */
+public class TransportRemoteClusterStatsAction extends TransportClusterStatsBaseAction<RemoteClusterStatsResponse> {
+
+    public static final ActionType<RemoteClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/remote_stats");
+    public static final RemoteClusterActionType<RemoteClusterStatsResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
+        TYPE.name(),
+        RemoteClusterStatsResponse::new
+    );
+
+    /**
+     * Hands the injected services to the base action and registers a transport request handler under the
+     * action name. The handler executes this action and relays its response, or its failure, through the
+     * transport channel.
+     */
+    @Inject
+    public TransportRemoteClusterStatsAction(
+        ThreadPool threadPool,
+        ClusterService clusterService,
+        TransportService transportService,
+        NodeService nodeService,
+        IndicesService indicesService,
+        RepositoriesService repositoriesService,
+        UsageService usageService,
+        ActionFilters actionFilters
+    ) {
+        super(
+            TYPE.name(),
+            threadPool,
+            clusterService,
+            transportService,
+            nodeService,
+            indicesService,
+            repositoriesService,
+            usageService,
+            actionFilters
+        );
+        transportService.registerRequestHandler(
+            TYPE.name(),
+            // TODO: which executor here?
+            threadPool.executor(ThreadPool.Names.MANAGEMENT),
+            ClusterStatsRequest::new,
+            (request, channel, task) -> execute(task, request, new ActionListener<>() {
+                @Override
+                public void onResponse(RemoteClusterStatsResponse response) {
+                    channel.sendResponse(response);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    channel.sendResponse(e);
+                }
+            })
+        );
+    }
+
+    /**
+     * Folds the node responses into one {@link RemoteClusterStatsResponse} and completes {@code listener}
+     * with it. {@code task}, {@code request} and {@code failures} are not consulted here, so node failures
+     * are not reflected in the result.
+     */
+    @Override
+    protected void newResponseAsync(
+        final Task task,
+        final ClusterStatsRequest request,
+        final List<ClusterStatsNodeResponse> responses,
+        final List<FailedNodeException> failures,
+        final ActionListener<RemoteClusterStatsResponse> listener
+    ) {
+        final ClusterState state = clusterService.state();
+        final Metadata metadata = state.metadata();
+        ClusterHealthStatus status = null;
+        long totalShards = 0;
+        long indicesBytes = 0;
+        var indexSet = new HashSet<String>();
+
+        for (ClusterStatsNodeResponse r : responses) {
+            totalShards += r.shardsStats().length;
+            for (var shard : r.shardsStats()) {
+                indexSet.add(shard.getShardRouting().getIndexName());
+                // shards without store stats add nothing to the byte total
+                if (shard.getStats().getStore() != null) {
+                    indicesBytes += shard.getStats().getStore().totalDataSetSizeInBytes();
+                }
+            }
+            // keep the first status any node reported; nodes that report none leave it untouched
+            if (status == null && r.clusterStatus() != null) {
+                status = r.clusterStatus();
+            }
+        }
+
+        ClusterStatsNodes nodesStats = new ClusterStatsNodes(responses);
+        RemoteClusterStatsResponse response = new RemoteClusterStatsResponse(
+            clusterService.getClusterName(),
+            metadata.clusterUUID(),
+            status,
+            nodesStats.getVersions(),
+            nodesStats.getCounts().getTotal(),
+            totalShards,
+            indexSet.size(),
+            indicesBytes,
+            nodesStats.getJvm().getHeapMax().getBytes(),
+            nodesStats.getOs().getMem().getTotal().getBytes()
+        );
+        listener.onResponse(response);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java index d8628db4047e6..7b20676f831d1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesRequest.java @@ -68,7 +68,7 @@ public ActionRequestValidationException validate() { } @Override - public final void writeTo(StreamOutput out) throws IOException { + public void writeTo(StreamOutput out) throws IOException { // `BaseNodesRequest` is rather heavyweight, especially all those `DiscoveryNodes` objects in larger clusters, and there is no need // to send it out over the wire. Use a dedicated transport request just for the bits you need. TransportAction.localOnly(); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java index 27f894ff1c3aa..8e0b17b50fbaf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java @@ -30,7 +30,6 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable final ModeInfo modeInfo; final TimeValue initialConnectionTimeout; final String clusterAlias; - final boolean skipUnavailable; final boolean hasClusterCredentials;