Skip to content

Commit

Permalink
Implement remote cluster stats polling
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 4, 2024
1 parent 2df42c9 commit e2f4573
Show file tree
Hide file tree
Showing 9 changed files with 623 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.elasticsearch.action.admin.cluster.stats.TransportRemoteClusterStatsAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptContextAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetScriptLanguageAction;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptAction;
Expand Down Expand Up @@ -641,6 +642,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportGetDesiredBalanceAction.TYPE, TransportGetDesiredBalanceAction.class);
actions.register(TransportDeleteDesiredBalanceAction.TYPE, TransportDeleteDesiredBalanceAction.class);
actions.register(TransportClusterStatsAction.TYPE, TransportClusterStatsAction.class);
actions.register(TransportRemoteClusterStatsAction.TYPE, TransportRemoteClusterStatsAction.class);
actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
actions.register(TransportClusterHealthAction.TYPE, TransportClusterHealthAction.class);
actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@
package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

/**
* A request to get cluster level stats.
* This request can be used both to request stats from single cluster or from remote cluster.
*/
public class ClusterStatsRequest extends BaseNodesRequest<ClusterStatsRequest> {
/**
* Should the remote cluster stats be included in the response.
*/
private final boolean doRemotes;

/**
Expand All @@ -34,6 +41,12 @@ public ClusterStatsRequest(boolean doRemotes, String... nodesIds) {
this.doRemotes = doRemotes;
}

public ClusterStatsRequest(StreamInput in) throws IOException {
super(in.readStringArray());
// We will never ask the remote to collect remote stats
doRemotes = false;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
Expand All @@ -49,4 +62,11 @@ public boolean doRemotes() {
public ClusterStatsRequest subRequest() {
return new ClusterStatsRequest(false, nodesIds());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArrayNullable(nodesIds());
// We will never ask remote to collect remote stats
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG;

Expand Down Expand Up @@ -160,24 +161,46 @@ public static class RemoteClusterStats implements ToXContentFragment {
private final String clusterUUID;
private final String mode;
private final boolean skipUnavailable;
private final boolean transportCompress;
private final List<String> versions;
private final String transportCompress;
private final Set<String> versions;
private final String status;
private final long nodesCount;
private final long shardsCount;
private final long indicesCount;
private final long indicesBytes;
private final long heapBytes;
private final long memBytes;

public RemoteClusterStats(
String clusterUUID,
RemoteClusterStatsResponse remoteResponse,
String mode,
boolean skipUnavailable,
boolean transportCompress,
List<String> versions,
String status
String transportCompress
) {
this.clusterUUID = clusterUUID;
this.mode = mode;
this.skipUnavailable = skipUnavailable;
this.transportCompress = transportCompress;
this.versions = versions;
this.status = status;
this.transportCompress = transportCompress.toLowerCase(Locale.ROOT);
if (remoteResponse != null) {
this.clusterUUID = remoteResponse.getClusterUUID();
this.versions = remoteResponse.getVersions();
this.status = remoteResponse.getStatus().name().toLowerCase(Locale.ROOT);
this.nodesCount = remoteResponse.getNodesCount();
this.shardsCount = remoteResponse.getShardsCount();
this.indicesCount = remoteResponse.getIndicesCount();
this.indicesBytes = remoteResponse.getIndicesBytes();
this.heapBytes = remoteResponse.getHeapBytes();
this.memBytes = remoteResponse.getMemBytes();
} else {
this.status = "unavailable";
this.clusterUUID = "unavailable";
this.versions = Set.of();
this.nodesCount = 0;
this.shardsCount = 0;
this.indicesCount = 0;
this.indicesBytes = 0;
this.heapBytes = 0;
this.memBytes = 0;
}
}

@Override
Expand All @@ -187,8 +210,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("mode", mode);
builder.field("skip_unavailable", skipUnavailable);
builder.field("transport.compress", transportCompress);
builder.field("version", versions);
builder.field("status", status);
builder.field("version", versions);
builder.field("nodes_count", nodesCount);
builder.field("shards_count", shardsCount);
builder.field("indices_count", indicesCount);
builder.field("indices_total_size_bytes", indicesBytes);
builder.field("max_heap_bytes", heapBytes);
builder.field("mem_total_bytes", memBytes);
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.stats;

import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
* Trimmed down cluster stats response for reporting to a remote cluster.
*/
public class RemoteClusterStatsResponse extends BaseNodesResponse<ClusterStatsNodeResponse> {
final String clusterUUID;
final ClusterHealthStatus status;
private final Set<String> versions;
private final long nodesCount;
private final long shardsCount;
private final long indicesCount;
private final long indicesBytes;
private final long heapBytes;
private final long memBytes;
private String remoteName;

public Set<String> getVersions() {
return versions;
}

public long getNodesCount() {
return nodesCount;
}

public long getShardsCount() {
return shardsCount;
}

public long getIndicesCount() {
return indicesCount;
}

public long getIndicesBytes() {
return indicesBytes;
}

public long getHeapBytes() {
return heapBytes;
}

public long getMemBytes() {
return memBytes;
}

public String getRemoteName() {
return remoteName;
}

public void setRemoteName(String remoteName) {
this.remoteName = remoteName;
}

public RemoteClusterStatsResponse(
ClusterName clusterName,
String clusterUUID,
ClusterHealthStatus status,
Set<String> versions,
long nodesCount,
long shardsCount,
long indicesCount,
long indicesBytes,
long heapBytes,
long memBytes
) {
super(clusterName, List.of(), List.of());
this.clusterUUID = clusterUUID;
this.status = status;
this.versions = versions;
this.nodesCount = nodesCount;
this.shardsCount = shardsCount;
this.indicesCount = indicesCount;
this.indicesBytes = indicesBytes;
this.heapBytes = heapBytes;
this.memBytes = memBytes;
}

public String getClusterUUID() {
return this.clusterUUID;
}

public ClusterHealthStatus getStatus() {
return this.status;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(clusterUUID);
status.writeTo(out);
out.writeStringCollection(versions);
out.writeLong(nodesCount);
out.writeLong(shardsCount);
out.writeLong(indicesCount);
out.writeLong(indicesBytes);
out.writeLong(heapBytes);
out.writeLong(memBytes);
}

public RemoteClusterStatsResponse(StreamInput in) throws IOException {
super(in);
this.clusterUUID = in.readString();
this.status = ClusterHealthStatus.readFrom(in);
this.versions = in.readCollectionAsSet(StreamInput::readString);
this.nodesCount = in.readLong();
this.shardsCount = in.readLong();
this.indicesCount = in.readLong();
this.indicesBytes = in.readLong();
this.heapBytes = in.readLong();
this.memBytes = in.readLong();
}

@Override
protected List<ClusterStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return List.of();
}

@Override
protected void writeNodesTo(StreamOutput out, List<ClusterStatsNodeResponse> nodes) throws IOException {}

/**
* Default empty response, can be used in case the cluster did not respond.
*/
public static final RemoteClusterStatsResponse EMPTY = new RemoteClusterStatsResponse(
ClusterName.DEFAULT,
"",
ClusterHealthStatus.RED,
Set.of(),
0,
0,
0,
0,
0,
0
);
}
Loading

0 comments on commit e2f4573

Please sign in to comment.