Skip to content

Commit

Permalink
Add follower index to CCR monitoring and status (elastic#33645)
Browse files Browse the repository at this point in the history
This commit adds the follower index to CCR shard follow task status, and
to monitoring.
  • Loading branch information
jasontedor committed Sep 12, 2018
1 parent e2458bd commit d5bde72
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testFollowIndex() throws Exception {
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(5));
assertBusy(() -> verifyCcrMonitoring(allowedIndex));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
// Make sure that there are no other ccr relates operations running:
assertBusy(() -> {
Expand Down Expand Up @@ -206,7 +206,7 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*");

Request request = new Request("GET", "/.monitoring-*/_search");
Expand All @@ -222,7 +222,10 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOExc
for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
assertThat(leaderIndex, endsWith(leaderIndex));
assertThat(leaderIndex, endsWith(expectedLeaderIndex));

final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
assertThat(followerIndex, equalTo(expectedFollowerIndex));

int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testFollowIndex() throws Exception {
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
}
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
assertBusy(() -> verifyCcrMonitoring(leaderIndexName));
assertBusy(() -> verifyCcrMonitoring(leaderIndexName, followIndexName));
}
}

Expand Down Expand Up @@ -107,7 +107,7 @@ public void testAutoFollowPatterns() throws Exception {
ensureYellow("logs-20190101");
verifyDocuments("logs-20190101", 5);
});
assertBusy(() -> verifyCcrMonitoring("logs-20190101"));
assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101"));
}

private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
Expand Down Expand Up @@ -159,7 +159,7 @@ private static void verifyDocuments(String index, int expectedNumDocs) throws IO
}
}

private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
private static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException {
ensureYellow(".monitoring-*");

Request request = new Request("GET", "/.monitoring-*/_search");
Expand All @@ -175,7 +175,10 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOExc
for (int i = 0; i < hits.size(); i++) {
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
assertThat(leaderIndex, endsWith(leaderIndex));
assertThat(leaderIndex, endsWith(expectedLeaderIndex));

final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
assertThat(followerIndex, equalTo(expectedFollowerIndex));

int foundNumberOfOperationsReceived =
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
}
return new ShardFollowNodeTaskStatus(
leaderIndex,
params.getFollowShardId().getIndexName(),
getFollowShardId().getId(),
leaderGlobalCheckpoint,
leaderMaxSeqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected void taskOperation(
final CcrStatsAction.StatsRequest request,
final ShardFollowNodeTask task,
final ActionListener<CcrStatsAction.StatsResponse> listener) {
listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus()));
listener.onResponse(new CcrStatsAction.StatsResponse(task.getStatus()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ protected ShardFollowNodeTaskStatus doParseInstance(XContentParser parser) throw
protected ShardFollowNodeTaskStatus createTestInstance() {
// if you change this constructor, reflect the changes in the hand-written assertions below
return new ShardFollowNodeTaskStatus(
randomAlphaOfLength(4),
randomAlphaOfLength(4),
randomInt(),
randomNonNegativeLong(),
Expand Down Expand Up @@ -61,6 +62,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) {
assertNotSame(expectedInstance, newInstance);
assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex()));
assertThat(newInstance.followerIndex(), equalTo(expectedInstance.followerIndex()));
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint()));
assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";

private static final ParseField LEADER_INDEX = new ParseField("leader_index");
private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index");
private static final ParseField SHARD_ID = new ParseField("shard_id");
private static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
private static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no");
Expand Down Expand Up @@ -62,16 +63,16 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
STATUS_PARSER_NAME,
args -> new ShardFollowNodeTaskStatus(
(String) args[0],
(int) args[1],
(long) args[2],
(String) args[1],
(int) args[2],
(long) args[3],
(long) args[4],
(long) args[5],
(long) args[6],
(int) args[7],
(long) args[7],
(int) args[8],
(int) args[9],
(long) args[10],
(int) args[10],
(long) args[11],
(long) args[12],
(long) args[13],
Expand All @@ -81,11 +82,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
(long) args[17],
(long) args[18],
(long) args[19],
(long) args[20],
new TreeMap<>(
((List<Map.Entry<Long, ElasticsearchException>>) args[20])
((List<Map.Entry<Long, ElasticsearchException>>) args[21])
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
(long) args[21]));
(long) args[22]));

public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";

Expand All @@ -96,6 +98,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {

static {
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX);
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
Expand Down Expand Up @@ -136,6 +139,12 @@ public String leaderIndex() {
return leaderIndex;
}

private final String followerIndex;

public String followerIndex() {
return followerIndex;
}

private final int shardId;

public int getShardId() {
Expand Down Expand Up @@ -264,6 +273,7 @@ public long timeSinceLastFetchMillis() {

public ShardFollowNodeTaskStatus(
final String leaderIndex,
final String followerIndex,
final int shardId,
final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo,
Expand All @@ -286,6 +296,7 @@ public ShardFollowNodeTaskStatus(
final NavigableMap<Long, ElasticsearchException> fetchExceptions,
final long timeSinceLastFetchMillis) {
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.shardId = shardId;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo;
Expand All @@ -311,6 +322,7 @@ public ShardFollowNodeTaskStatus(

public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
this.leaderIndex = in.readString();
this.followerIndex = in.readString();
this.shardId = in.readVInt();
this.leaderGlobalCheckpoint = in.readZLong();
this.leaderMaxSeqNo = in.readZLong();
Expand Down Expand Up @@ -342,6 +354,7 @@ public String getWriteableName() {
@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(leaderIndex);
out.writeString(followerIndex);
out.writeVInt(shardId);
out.writeZLong(leaderGlobalCheckpoint);
out.writeZLong(leaderMaxSeqNo);
Expand Down Expand Up @@ -377,6 +390,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa

public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX.getPreferredName(), followerIndex);
builder.field(SHARD_ID.getPreferredName(), shardId);
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
Expand Down Expand Up @@ -439,6 +453,7 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) return false;
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
return leaderIndex.equals(that.leaderIndex) &&
followerIndex.equals(that.followerIndex) &&
shardId == that.shardId &&
leaderGlobalCheckpoint == that.leaderGlobalCheckpoint &&
leaderMaxSeqNo == that.leaderMaxSeqNo &&
Expand Down Expand Up @@ -471,6 +486,7 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(
leaderIndex,
followerIndex,
shardId,
leaderGlobalCheckpoint,
leaderMaxSeqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;

Expand Down Expand Up @@ -72,8 +71,8 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
final Map<String, Map<Integer, StatsResponse>> taskResponsesByIndex = new TreeMap<>();
for (final StatsResponse statsResponse : statsResponse) {
taskResponsesByIndex.computeIfAbsent(
statsResponse.followerShardId().getIndexName(),
k -> new TreeMap<>()).put(statsResponse.followerShardId().getId(), statsResponse);
statsResponse.status().followerIndex(),
k -> new TreeMap<>()).put(statsResponse.status().getShardId(), statsResponse);
}
builder.startObject();
{
Expand Down Expand Up @@ -152,31 +151,22 @@ public void writeTo(StreamOutput out) throws IOException {

public static class StatsResponse implements Writeable {

private final ShardId followerShardId;

public ShardId followerShardId() {
return followerShardId;
}

private final ShardFollowNodeTaskStatus status;

public ShardFollowNodeTaskStatus status() {
return status;
}

public StatsResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) {
this.followerShardId = followerShardId;
public StatsResponse(final ShardFollowNodeTaskStatus status) {
this.status = status;
}

public StatsResponse(final StreamInput in) throws IOException {
this.followerShardId = ShardId.readShardId(in);
this.status = new ShardFollowNodeTaskStatus(in);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
followerShardId.writeTo(out);
status.writeTo(out);
}

Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugin/core/src/main/resources/monitoring-es.json
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,9 @@
"leader_index": {
"type": "keyword"
},
"follower_index": {
"type": "keyword"
},
"shard_id": {
"type": "integer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public void testToXContent() throws IOException {
final long timeSinceLastFetchMillis = randomNonNegativeLong();
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
"cluster_alias:leader_index",
"follower_index",
shardId,
leaderGlobalCheckpoint,
leaderMaxSeqNo,
Expand Down Expand Up @@ -139,6 +140,7 @@ public void testToXContent() throws IOException {
+ "},"
+ "\"ccr_stats\":{"
+ "\"leader_index\":\"cluster_alias:leader_index\","
+ "\"follower_index\":\"follower_index\","
+ "\"shard_id\":" + shardId + ","
+ "\"leader_global_checkpoint\":" + leaderGlobalCheckpoint + ","
+ "\"leader_max_seq_no\":" + leaderMaxSeqNo + ","
Expand Down

0 comments on commit d5bde72

Please sign in to comment.