Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCR] Add time since last auto follow fetch to auto follow stats #36542

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public final class AutoFollowStats {
static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors");
static final ParseField LEADER_INDEX = new ParseField("leader_index");
static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception");
static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters");
static final ParseField CLUSTER_NAME = new ParseField("cluster_name");
static final ParseField TIME_SINCE_LAST_CHECK_MILLIS = new ParseField("time_since_last_check_millis");
static final ParseField LAST_SEEN_METADATA_VERSION = new ParseField("last_seen_metadata_version");

@SuppressWarnings("unchecked")
static final ConstructingObjectParser<AutoFollowStats, Void> STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats",
Expand All @@ -48,6 +52,10 @@ public final class AutoFollowStats {
(Long) args[2],
new TreeMap<>(
((List<Map.Entry<String, ElasticsearchException>>) args[3])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
new TreeMap<>(
((List<Map.Entry<String, AutoFollowedCluster>>) args[4])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
));
Expand All @@ -57,33 +65,47 @@ public final class AutoFollowStats {
"auto_follow_stats_errors",
args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1]));

private static final ConstructingObjectParser<Map.Entry<String, AutoFollowedCluster>, Void> AUTO_FOLLOWED_CLUSTERS_PARSER =
new ConstructingObjectParser<>(
"auto_followed_clusters",
args -> new AbstractMap.SimpleEntry<>((String) args[0], new AutoFollowedCluster((Long) args[1], (Long) args[2])));

static {
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
AUTO_FOLLOW_EXCEPTION);

AUTO_FOLLOWED_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME);
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_CHECK_MILLIS);
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_SEEN_METADATA_VERSION);

STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED);
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS);
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED);
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER,
RECENT_AUTO_FOLLOW_ERRORS);
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOWED_CLUSTERS_PARSER,
AUTO_FOLLOWED_CLUSTERS);
}

private final long numberOfFailedFollowIndices;
private final long numberOfFailedRemoteClusterStateRequests;
private final long numberOfSuccessfulFollowIndices;
private final NavigableMap<String, ElasticsearchException> recentAutoFollowErrors;
private final NavigableMap<String, AutoFollowedCluster> autoFollowedClusters;

AutoFollowStats(long numberOfFailedFollowIndices,
long numberOfFailedRemoteClusterStateRequests,
long numberOfSuccessfulFollowIndices,
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors) {
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors,
NavigableMap<String, AutoFollowedCluster> autoFollowedClusters) {
this.numberOfFailedFollowIndices = numberOfFailedFollowIndices;
this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests;
this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices;
this.recentAutoFollowErrors = recentAutoFollowErrors;
this.autoFollowedClusters = autoFollowedClusters;
}

public long getNumberOfFailedFollowIndices() {
Expand All @@ -102,4 +124,27 @@ public NavigableMap<String, ElasticsearchException> getRecentAutoFollowErrors()
return recentAutoFollowErrors;
}

public NavigableMap<String, AutoFollowedCluster> getAutoFollowedClusters() {
return autoFollowedClusters;
}

public static class AutoFollowedCluster {

private final long timeSinceLastCheckMillis;
private final long lastSeenMetadataVersion;

public AutoFollowedCluster(long timeSinceLastCheckMillis, long lastSeenMetadataVersion) {
this.timeSinceLastCheckMillis = timeSinceLastCheckMillis;
this.lastSeenMetadataVersion = lastSeenMetadataVersion;
}

public long getTimeSinceLastCheckMillis() {
return timeSinceLastCheckMillis;
}

public long getLastSeenMetadataVersion() {
return lastSeenMetadataVersion;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.ccr.AutoFollowStats.AutoFollowedCluster;
import org.elasticsearch.client.ccr.IndicesFollowStats.ShardFollowStats;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -185,6 +186,19 @@ private static void toXContent(CcrStatsResponse response, XContentBuilder builde
builder.endObject();
}
builder.endArray();
builder.startArray(AutoFollowStats.AUTO_FOLLOWED_CLUSTERS.getPreferredName());
for (Map.Entry<String, AutoFollowedCluster> entry : autoFollowStats.getAutoFollowedClusters().entrySet()) {
builder.startObject();
{
builder.field(AutoFollowStats.CLUSTER_NAME.getPreferredName(), entry.getKey());
builder.field(AutoFollowStats.TIME_SINCE_LAST_CHECK_MILLIS.getPreferredName(),
entry.getValue().getTimeSinceLastCheckMillis());
builder.field(AutoFollowStats.LAST_SEEN_METADATA_VERSION.getPreferredName(),
entry.getValue().getLastSeenMetadataVersion());
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();

Expand Down Expand Up @@ -315,11 +329,16 @@ private static AutoFollowStats randomAutoFollowStats() {
for (int i = 0; i < count; i++) {
readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
}
final NavigableMap<String, AutoFollowedCluster> autoFollowClusters = new TreeMap<>();
for (int i = 0; i < count; i++) {
autoFollowClusters.put("" + i, new AutoFollowedCluster(randomLong(), randomNonNegativeLong()));
}
return new AutoFollowStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
readExceptions
readExceptions,
autoFollowClusters
);
}

Expand Down
4 changes: 3 additions & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The API returns the following results:
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"number_of_successful_follow_indices" : 1,
"recent_auto_follow_errors" : []
"recent_auto_follow_errors" : [],
"auto_followed_clusters" : []
},
"follow_stats" : {
"indices" : [
Expand Down Expand Up @@ -151,6 +152,7 @@ The API returns the following results:
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
// TESTRESPONSE[s/"auto_followed_clusters" : \[\]/"auto_followed_clusters" : $body.auto_follow_stats.auto_followed_clusters/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedCluster;

/**
* A component that runs only on the elected master node and follows leader indices automatically
* if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
Expand All @@ -67,6 +70,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final Client client;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeMillisTimeProvider;

private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();

Expand All @@ -79,10 +83,13 @@ public class AutoFollowCoordinator implements ClusterStateListener {
public AutoFollowCoordinator(
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) {
CcrLicenseChecker ccrLicenseChecker,
LongSupplier relativeMillisTimeProvider) {

this.client = client;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
this.relativeMillisTimeProvider = relativeMillisTimeProvider;
clusterService.addListener(this);
this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
@Override
Expand All @@ -93,11 +100,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
}

public synchronized AutoFollowStats getStats() {
final Map<String, AutoFollower> autoFollowers = this.autoFollowers;
final TreeMap<String, AutoFollowedCluster> timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>();
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
long lastAutoFollowTimeInMillis = entry.getValue().lastAutoFollowTimeInMillis;
long lastSeenMetadataVersion = entry.getValue().metadataVersion;
if (lastAutoFollowTimeInMillis != -1) {
long timeSinceLastCheckInMillis = relativeMillisTimeProvider.getAsLong() - lastAutoFollowTimeInMillis;
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(),
new AutoFollowedCluster(timeSinceLastCheckInMillis, lastSeenMetadataVersion));
} else {
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), new AutoFollowedCluster(-1L, lastSeenMetadataVersion));
}
}

return new AutoFollowStats(
numberOfFailedIndicesAutoFollowed,
numberOfFailedRemoteClusterStateRequests,
numberOfSuccessfulIndicesAutoFollowed,
new TreeMap<>(recentAutoFollowErrors)
new TreeMap<>(recentAutoFollowErrors),
timesSinceLastAutoFollowPerRemoteCluster
);
}

Expand Down Expand Up @@ -146,7 +168,8 @@ void updateAutoFollowers(ClusterState followerClusterState) {

Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
AutoFollower autoFollower =
new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider) {

@Override
void getRemoteClusterState(final String remoteCluster,
Expand Down Expand Up @@ -239,20 +262,25 @@ abstract static class AutoFollower {
private final String remoteCluster;
private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier;
private final LongSupplier relativeTimeProvider;

private volatile long lastAutoFollowTimeInMillis = -1;
private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final String remoteCluster,
final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) {
final Supplier<ClusterState> followerClusterStateSupplier,
LongSupplier relativeTimeProvider) {
this.remoteCluster = remoteCluster;
this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier;
this.relativeTimeProvider = relativeTimeProvider;
}

void start() {
lastAutoFollowTimeInMillis = relativeTimeProvider.getAsLong();
final ClusterState clusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
Expand Down
Loading