Skip to content

Commit

Permalink
Refactor TransportClusterStatsAction - should not use field for the f…
Browse files Browse the repository at this point in the history
…uture

Still TODO: getting rid of actionGet
  • Loading branch information
smalyshev committed Sep 13, 2024
1 parent 47a7c4c commit 5354543
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
]
},
"params":{
"flat_settings":{
"include_remotes":{
"type":"boolean",
"description":"Return settings in flat format (default: false)"
"description":"Include remote cluster data into the response (default: false)"
},
"timeout":{
"type":"time",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
public class RemoteClusterStatsRequest extends ActionRequest {
private final String[] nodesIds;

/**
* Get stats from nodes based on the nodes ids specified. If none are passed, stats
* based on all nodes will be returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterSnapshotStats;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -60,7 +61,6 @@
import org.elasticsearch.transport.Transports;
import org.elasticsearch.usage.SearchUsageHolder;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.action.support.nodes.TransportNodesAction;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,17 +92,17 @@ public class TransportClusterStatsAction extends TransportNodesAction<
CommonStatsFlags.Flag.DenseVector,
CommonStatsFlags.Flag.SparseVector
);

private final MetadataStatsCache<MappingStats> mappingStatsCache;
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
private final RemoteClusterService remoteClusterService;
private static final Logger logger = LogManager.getLogger(TransportClusterStatsAction.class);

private final Settings settings;
private final NodeService nodeService;
private final IndicesService indicesService;
private final RepositoriesService repositoriesService;
private final SearchUsageHolder searchUsageHolder;
private final CCSUsageTelemetry ccsUsageHolder;
private final MetadataStatsCache<MappingStats> mappingStatsCache;
private final MetadataStatsCache<AnalysisStats> analysisStatsCache;
private final RemoteClusterService remoteClusterService;

@Inject
public TransportClusterStatsAction(
Expand All @@ -124,23 +124,24 @@ public TransportClusterStatsAction(
ClusterStatsNodeRequest::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
this.remoteClusterService = transportService.getRemoteClusterService();
this.settings = settings;
this.nodeService = nodeService;
this.indicesService = indicesService;
this.repositoriesService = repositoriesService;
this.searchUsageHolder = usageService.getSearchUsageHolder();
this.ccsUsageHolder = usageService.getCcsUsageHolder();
this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of);
this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of);
this.remoteClusterService = transportService.getRemoteClusterService();
this.settings = settings;
}

private ActionFuture<Map<String, RemoteClusterStatsResponse>> remoteFuture;

@Override
protected void doExecute(Task task, ClusterStatsRequest request, ActionListener<ClusterStatsResponse> listener) {
remoteFuture = getStatsFromRemotes(request);
super.doExecute(task, request, listener);
if (doRemotes(request)) {
super.doExecute(task, request, new ActionListenerWithRemotes(listener, request));
} else {
super.doExecute(task, request, listener);
}
}

@Override
Expand Down Expand Up @@ -169,8 +170,10 @@ protected void newResponseAsync(
clusterService.threadPool().absoluteTimeInMillis()
);

// This will wait until remotes are done if it didn't happen yet
var remoteClusterStats = getRemoteClusterStats(request);
final Map<String, ClusterStatsResponse.RemoteClusterStats> remoteClusterStats =
(listener instanceof ActionListenerWithRemotes listenerWithRemotes)
? listenerWithRemotes.getRemoteClusterStats(request)
: Map.of();

final ListenableFuture<MappingStats> mappingStatsStep = new ListenableFuture<>();
final ListenableFuture<AnalysisStats> analysisStatsStep = new ListenableFuture<>();
Expand Down Expand Up @@ -354,92 +357,117 @@ protected boolean isFresh(Long currentKey, Long newKey) {
}
}

private static boolean doRemotes(ClusterStatsRequest request) {
return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() && request.doRemotes();
}
@UpdateForV9 // this can be replaced with TransportRequest.Empty in v9
public static class ClusterStatsNodeRequest extends TransportRequest {

private Map<String, ClusterStatsResponse.RemoteClusterStats> getRemoteClusterStats(ClusterStatsRequest request) {
if (doRemotes(request) == false) {
return null;
ClusterStatsNodeRequest() {}

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in);
}
Map<String, ClusterStatsResponse.RemoteClusterStats> remoteClustersStats = new HashMap<>();
Map<String, RemoteClusterStatsResponse> remoteData = resolveRemoteClusterStats();

for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo();
RemoteClusterStatsResponse response = remoteData.get(clusterAlias);
var compression = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
var remoteClusterStats = new ClusterStatsResponse.RemoteClusterStats(
response,
remoteConnectionInfo.getModeInfo().modeName(),
remoteConnection.isSkipUnavailable(),
compression.toString()
);
remoteClustersStats.put(clusterAlias, remoteClusterStats);

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
return remoteClustersStats;
}

private Map<String, RemoteClusterStatsResponse> resolveRemoteClusterStats() {
try {
return remoteFuture.actionGet();
} catch (ElasticsearchException e) {
logger.warn("Failed to get remote cluster stats", e);
return Map.of();
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out);
}
}

private ActionFuture<Map<String, RemoteClusterStatsResponse>> getStatsFromRemotes(ClusterStatsRequest request) {
if (doRemotes(request) == false) {
// this will never be used since getRemoteClusterStats has the same check
return null;
private static boolean doRemotes(ClusterStatsRequest request) {
return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() && request.doRemotes();
}

private class ActionListenerWithRemotes implements ActionListener<ClusterStatsResponse> {
private final ActionListener<ClusterStatsResponse> listener;
private final ActionFuture<Map<String, RemoteClusterStatsResponse>> remoteFuture;

ActionListenerWithRemotes(ActionListener<ClusterStatsResponse> listener, ClusterStatsRequest request) {
this.listener = listener;
remoteFuture = getStatsFromRemotes(request);
}

// TODO: make correct pool
final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();

var remotesFuture = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();
var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesFuture);

for (String clusterAlias : remotes) {
var remoteRequest = new RemoteClusterStatsRequest(request.nodesIds());
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
clusterAlias,
remoteClientResponseExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
);
remoteClusterClient.execute(
TransportRemoteClusterStatsAction.REMOTE_TYPE,
remoteRequest,
groupListener.remoteListener(clusterAlias)
);
private Map<String, RemoteClusterStatsResponse> resolveRemoteClusterStats() {
try {
return remoteFuture.actionGet();
} catch (ElasticsearchException e) {
logger.warn("Failed to get remote cluster stats", e);
return Map.of();
}
}

Map<String, ClusterStatsResponse.RemoteClusterStats> getRemoteClusterStats(ClusterStatsRequest request) {
if (remoteFuture == null) {
return Map.of();
}
Map<String, ClusterStatsResponse.RemoteClusterStats> remoteClustersStats = new HashMap<>();
Map<String, RemoteClusterStatsResponse> remoteData = resolveRemoteClusterStats();

for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo();
RemoteClusterStatsResponse response = remoteData.get(clusterAlias);
var compression = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings);
var remoteClusterStats = new ClusterStatsResponse.RemoteClusterStats(
response,
remoteConnectionInfo.getModeInfo().modeName(),
remoteConnection.isSkipUnavailable(),
compression.toString()
);
remoteClustersStats.put(clusterAlias, remoteClusterStats);
}
return remoteClustersStats;
}

return remotesFuture;
}
private ActionFuture<Map<String, RemoteClusterStatsResponse>> getStatsFromRemotes(ClusterStatsRequest request) {
if (doRemotes(request) == false) {
// this will never be used since getRemoteClusterStats has the same check
return null;
}

@UpdateForV9 // this can be replaced with TransportRequest.Empty in v9
public static class ClusterStatsNodeRequest extends TransportRequest {
// TODO: make correct pool
final var remoteClientResponseExecutor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();
var remotesFuture = new PlainActionFuture<Map<String, RemoteClusterStatsResponse>>();

ClusterStatsNodeRequest() {}
if (remotes.isEmpty()) {
remotesFuture.onResponse(Map.of());
return remotesFuture;
}

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
skipLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, in);
var groupListener = new RemoteClusterActionListener<>(remotes.size(), remotesFuture);

for (String clusterAlias : remotes) {
var remoteRequest = new RemoteClusterStatsRequest(request.nodesIds());
var remoteClusterClient = remoteClusterService.getRemoteClusterClient(
clusterAlias,
remoteClientResponseExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
);
remoteClusterClient.execute(
TransportRemoteClusterStatsAction.REMOTE_TYPE,
remoteRequest,
groupListener.remoteListener(clusterAlias)
);

}

return remotesFuture;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
public void onResponse(ClusterStatsResponse response) {
listener.onResponse(response);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sendLegacyNodesRequestHeader(TransportVersions.DROP_UNUSED_NODES_REQUESTS, out);
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
* Handler action for incoming {@link RemoteClusterStatsRequest}.
* Will pass the work to {@link TransportClusterStatsAction} and return the response.
*/
public class TransportRemoteClusterStatsAction extends HandledTransportAction<
RemoteClusterStatsRequest,
RemoteClusterStatsResponse> {
public class TransportRemoteClusterStatsAction extends HandledTransportAction<RemoteClusterStatsRequest, RemoteClusterStatsResponse> {

public static final ActionType<RemoteClusterStatsResponse> TYPE = new ActionType<>("cluster:monitor/stats/remote");
public static final RemoteClusterActionType<RemoteClusterStatsResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String getName() {

@Override
public Set<String> supportedQueryParameters() {
return Set.of("include_remotes", "nodeId");
return Set.of("include_remotes");
}

@Override
Expand Down

0 comments on commit 5354543

Please sign in to comment.