Skip to content

Commit

Permalink
[CCR] Add time since last auto follow fetch to auto follow stats (#36542
Browse files Browse the repository at this point in the history
)

For each remote cluster the auto follow coordinator, starts an auto
follower that checks the remote cluster state and determines whether an
index needs to be auto followed. The time since last auto follow is
reported per remote cluster and gives insight whether the auto follow
process is alive.

Relates to #33007
Originates from #35895
  • Loading branch information
martijnvg committed Dec 17, 2018
1 parent e187710 commit 789cc8c
Show file tree
Hide file tree
Showing 11 changed files with 379 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public final class AutoFollowStats {
static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors");
static final ParseField LEADER_INDEX = new ParseField("leader_index");
static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception");
static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters");
static final ParseField CLUSTER_NAME = new ParseField("cluster_name");
static final ParseField TIME_SINCE_LAST_CHECK_MILLIS = new ParseField("time_since_last_check_millis");
static final ParseField LAST_SEEN_METADATA_VERSION = new ParseField("last_seen_metadata_version");

@SuppressWarnings("unchecked")
static final ConstructingObjectParser<AutoFollowStats, Void> STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats",
Expand All @@ -48,6 +52,10 @@ public final class AutoFollowStats {
(Long) args[2],
new TreeMap<>(
((List<Map.Entry<String, ElasticsearchException>>) args[3])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
new TreeMap<>(
((List<Map.Entry<String, AutoFollowedCluster>>) args[4])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
));
Expand All @@ -57,33 +65,47 @@ public final class AutoFollowStats {
"auto_follow_stats_errors",
args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1]));

private static final ConstructingObjectParser<Map.Entry<String, AutoFollowedCluster>, Void> AUTO_FOLLOWED_CLUSTERS_PARSER =
new ConstructingObjectParser<>(
"auto_followed_clusters",
args -> new AbstractMap.SimpleEntry<>((String) args[0], new AutoFollowedCluster((Long) args[1], (Long) args[2])));

static {
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject(
ConstructingObjectParser.constructorArg(),
(p, c) -> ElasticsearchException.fromXContent(p),
AUTO_FOLLOW_EXCEPTION);

AUTO_FOLLOWED_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME);
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_CHECK_MILLIS);
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_SEEN_METADATA_VERSION);

STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED);
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS);
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED);
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER,
RECENT_AUTO_FOLLOW_ERRORS);
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOWED_CLUSTERS_PARSER,
AUTO_FOLLOWED_CLUSTERS);
}

private final long numberOfFailedFollowIndices;
private final long numberOfFailedRemoteClusterStateRequests;
private final long numberOfSuccessfulFollowIndices;
private final NavigableMap<String, ElasticsearchException> recentAutoFollowErrors;
private final NavigableMap<String, AutoFollowedCluster> autoFollowedClusters;

AutoFollowStats(long numberOfFailedFollowIndices,
long numberOfFailedRemoteClusterStateRequests,
long numberOfSuccessfulFollowIndices,
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors) {
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors,
NavigableMap<String, AutoFollowedCluster> autoFollowedClusters) {
this.numberOfFailedFollowIndices = numberOfFailedFollowIndices;
this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests;
this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices;
this.recentAutoFollowErrors = recentAutoFollowErrors;
this.autoFollowedClusters = autoFollowedClusters;
}

public long getNumberOfFailedFollowIndices() {
Expand All @@ -102,4 +124,27 @@ public NavigableMap<String, ElasticsearchException> getRecentAutoFollowErrors()
return recentAutoFollowErrors;
}

public NavigableMap<String, AutoFollowedCluster> getAutoFollowedClusters() {
return autoFollowedClusters;
}

public static class AutoFollowedCluster {

private final long timeSinceLastCheckMillis;
private final long lastSeenMetadataVersion;

public AutoFollowedCluster(long timeSinceLastCheckMillis, long lastSeenMetadataVersion) {
this.timeSinceLastCheckMillis = timeSinceLastCheckMillis;
this.lastSeenMetadataVersion = lastSeenMetadataVersion;
}

public long getTimeSinceLastCheckMillis() {
return timeSinceLastCheckMillis;
}

public long getLastSeenMetadataVersion() {
return lastSeenMetadataVersion;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.client.ccr.AutoFollowStats.AutoFollowedCluster;
import org.elasticsearch.client.ccr.IndicesFollowStats.ShardFollowStats;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand Down Expand Up @@ -185,6 +186,19 @@ private static void toXContent(CcrStatsResponse response, XContentBuilder builde
builder.endObject();
}
builder.endArray();
builder.startArray(AutoFollowStats.AUTO_FOLLOWED_CLUSTERS.getPreferredName());
for (Map.Entry<String, AutoFollowedCluster> entry : autoFollowStats.getAutoFollowedClusters().entrySet()) {
builder.startObject();
{
builder.field(AutoFollowStats.CLUSTER_NAME.getPreferredName(), entry.getKey());
builder.field(AutoFollowStats.TIME_SINCE_LAST_CHECK_MILLIS.getPreferredName(),
entry.getValue().getTimeSinceLastCheckMillis());
builder.field(AutoFollowStats.LAST_SEEN_METADATA_VERSION.getPreferredName(),
entry.getValue().getLastSeenMetadataVersion());
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();

Expand Down Expand Up @@ -315,11 +329,16 @@ private static AutoFollowStats randomAutoFollowStats() {
for (int i = 0; i < count; i++) {
readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
}
final NavigableMap<String, AutoFollowedCluster> autoFollowClusters = new TreeMap<>();
for (int i = 0; i < count; i++) {
autoFollowClusters.put("" + i, new AutoFollowedCluster(randomLong(), randomNonNegativeLong()));
}
return new AutoFollowStats(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
readExceptions
readExceptions,
autoFollowClusters
);
}

Expand Down
4 changes: 3 additions & 1 deletion docs/reference/ccr/apis/get-ccr-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ The API returns the following results:
"number_of_failed_follow_indices" : 0,
"number_of_failed_remote_cluster_state_requests" : 0,
"number_of_successful_follow_indices" : 1,
"recent_auto_follow_errors" : []
"recent_auto_follow_errors" : [],
"auto_followed_clusters" : []
},
"follow_stats" : {
"indices" : [
Expand Down Expand Up @@ -151,6 +152,7 @@ The API returns the following results:
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
// TESTRESPONSE[s/"auto_followed_clusters" : \[\]/"auto_followed_clusters" : $body.auto_follow_stats.auto_followed_clusters/]
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedCluster;

/**
* A component that runs only on the elected master node and follows leader indices automatically
* if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
Expand All @@ -65,6 +68,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final Client client;
private final ClusterService clusterService;
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeMillisTimeProvider;

private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();

Expand All @@ -77,10 +81,13 @@ public class AutoFollowCoordinator implements ClusterStateListener {
public AutoFollowCoordinator(
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker) {
CcrLicenseChecker ccrLicenseChecker,
LongSupplier relativeMillisTimeProvider) {

this.client = client;
this.clusterService = clusterService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
this.relativeMillisTimeProvider = relativeMillisTimeProvider;
clusterService.addListener(this);
this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
@Override
Expand All @@ -91,11 +98,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
}

public synchronized AutoFollowStats getStats() {
final Map<String, AutoFollower> autoFollowers = this.autoFollowers;
final TreeMap<String, AutoFollowedCluster> timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>();
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
long lastAutoFollowTimeInMillis = entry.getValue().lastAutoFollowTimeInMillis;
long lastSeenMetadataVersion = entry.getValue().metadataVersion;
if (lastAutoFollowTimeInMillis != -1) {
long timeSinceLastCheckInMillis = relativeMillisTimeProvider.getAsLong() - lastAutoFollowTimeInMillis;
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(),
new AutoFollowedCluster(timeSinceLastCheckInMillis, lastSeenMetadataVersion));
} else {
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), new AutoFollowedCluster(-1L, lastSeenMetadataVersion));
}
}

return new AutoFollowStats(
numberOfFailedIndicesAutoFollowed,
numberOfFailedRemoteClusterStateRequests,
numberOfSuccessfulIndicesAutoFollowed,
new TreeMap<>(recentAutoFollowErrors)
new TreeMap<>(recentAutoFollowErrors),
timesSinceLastAutoFollowPerRemoteCluster
);
}

Expand Down Expand Up @@ -144,7 +166,8 @@ void updateAutoFollowers(ClusterState followerClusterState) {

Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
for (String remoteCluster : newRemoteClusters) {
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
AutoFollower autoFollower =
new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider) {

@Override
void getRemoteClusterState(final String remoteCluster,
Expand Down Expand Up @@ -237,20 +260,25 @@ abstract static class AutoFollower {
private final String remoteCluster;
private final Consumer<List<AutoFollowResult>> statsUpdater;
private final Supplier<ClusterState> followerClusterStateSupplier;
private final LongSupplier relativeTimeProvider;

private volatile long lastAutoFollowTimeInMillis = -1;
private volatile long metadataVersion = 0;
private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults;

AutoFollower(final String remoteCluster,
final Consumer<List<AutoFollowResult>> statsUpdater,
final Supplier<ClusterState> followerClusterStateSupplier) {
final Supplier<ClusterState> followerClusterStateSupplier,
LongSupplier relativeTimeProvider) {
this.remoteCluster = remoteCluster;
this.statsUpdater = statsUpdater;
this.followerClusterStateSupplier = followerClusterStateSupplier;
this.relativeTimeProvider = relativeTimeProvider;
}

void start() {
lastAutoFollowTimeInMillis = relativeTimeProvider.getAsLong();
final ClusterState clusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) {
Expand Down
Loading

0 comments on commit 789cc8c

Please sign in to comment.