-
Notifications
You must be signed in to change notification settings - Fork 24.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Track fetch exceptions for shard follow tasks #33047
Changes from 5 commits
3468fc2
1736935
b944503
36ad0d9
91e6ac9
477c3bf
3efdf20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -154,7 +154,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() { | |
ShardFollowTask::new), | ||
|
||
// Task statuses | ||
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.NAME, | ||
new NamedWriteableRegistry.Entry(Task.Status.class, ShardFollowNodeTask.Status.STATUS_PARSER_NAME, | ||
ShardFollowNodeTask.Status::new) | ||
); | ||
} | ||
|
@@ -166,7 +166,7 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() { | |
ShardFollowTask::fromXContent), | ||
|
||
// Task statuses | ||
new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.NAME), | ||
new NamedXContentRegistry.Entry(ShardFollowNodeTask.Status.class, new ParseField(ShardFollowNodeTask.Status.STATUS_PARSER_NAME), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This line is too long, causing a checkstyle violation. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thanks. I pushed 477c3bf. |
||
ShardFollowNodeTask.Status::fromXContent) | ||
); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,20 +30,25 @@ | |
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.AbstractMap; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Comparator; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.NavigableMap; | ||
import java.util.Objects; | ||
import java.util.PriorityQueue; | ||
import java.util.Queue; | ||
import java.util.TreeMap; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.Consumer; | ||
import java.util.function.LongConsumer; | ||
import java.util.function.LongSupplier; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* The node task that fetches the write operations from a leader shard and | ||
|
@@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { | |
private long numberOfFailedBulkOperations = 0; | ||
private long numberOfOperationsIndexed = 0; | ||
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo)); | ||
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions; | ||
|
||
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers, | ||
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) { | ||
|
@@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { | |
this.relativeTimeProvider = relativeTimeProvider; | ||
this.retryTimeout = params.getRetryTimeout(); | ||
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); | ||
/* | ||
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of | ||
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry | ||
* when the fetch task associated with that from sequence number succeeds. | ||
*/ | ||
this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() { | ||
@Override | ||
protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) { | ||
return size() > params.getMaxConcurrentReadBatches(); | ||
} | ||
}; | ||
} | ||
|
||
void start( | ||
|
@@ -224,6 +241,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR | |
synchronized (ShardFollowNodeTask.this) { | ||
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); | ||
numberOfSuccessfulFetches++; | ||
fetchExceptions.remove(from); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 to the approach to keep track of exceptions by from seqno in a fixed size linked hashmap. |
||
operationsReceived += response.getOperations().length; | ||
totalTransferredBytes += Arrays.stream(response.getOperations()).mapToLong(Translog.Operation::estimateSize).sum(); | ||
} | ||
|
@@ -233,6 +251,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR | |
synchronized (ShardFollowNodeTask.this) { | ||
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); | ||
numberOfFailedFetches++; | ||
fetchExceptions.put(from, new ElasticsearchException(e)); | ||
} | ||
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter)); | ||
}); | ||
|
@@ -412,12 +431,13 @@ public synchronized Status getStatus() { | |
totalIndexTimeMillis, | ||
numberOfSuccessfulBulkOperations, | ||
numberOfFailedBulkOperations, | ||
numberOfOperationsIndexed); | ||
numberOfOperationsIndexed, | ||
new TreeMap<>(fetchExceptions)); | ||
} | ||
|
||
public static class Status implements Task.Status { | ||
|
||
public static final String NAME = "shard-follow-node-task-status"; | ||
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status"; | ||
|
||
static final ParseField SHARD_ID = new ParseField("shard_id"); | ||
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint"); | ||
|
@@ -438,8 +458,10 @@ public static class Status implements Task.Status { | |
static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField("number_of_successful_bulk_operations"); | ||
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField("number_of_failed_bulk_operations"); | ||
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField("number_of_operations_indexed"); | ||
static final ParseField FETCH_EXCEPTIONS = new ParseField("fetch_exceptions"); | ||
|
||
static final ConstructingObjectParser<Status, Void> PARSER = new ConstructingObjectParser<>(NAME, | ||
@SuppressWarnings("unchecked") | ||
static final ConstructingObjectParser<Status, Void> STATUS_PARSER = new ConstructingObjectParser<>(STATUS_PARSER_NAME, | ||
args -> new Status( | ||
(int) args[0], | ||
(long) args[1], | ||
|
@@ -459,28 +481,51 @@ public static class Status implements Task.Status { | |
(long) args[15], | ||
(long) args[16], | ||
(long) args[17], | ||
(long) args[18])); | ||
(long) args[18], | ||
new TreeMap<>( | ||
((List<Map.Entry<Long, ElasticsearchException>>) args[19]) | ||
.stream() | ||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))); | ||
|
||
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry"; | ||
|
||
static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER = | ||
new ConstructingObjectParser<>( | ||
FETCH_EXCEPTIONS_ENTRY_PARSER_NAME, | ||
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1])); | ||
|
||
static { | ||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); | ||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); | ||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); | ||
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); | ||
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); | ||
STATUS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_PARSER, FETCH_EXCEPTIONS); | ||
} | ||
|
||
static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no"); | ||
static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception"); | ||
|
||
static { | ||
PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_GLOBAL_CHECKPOINT_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), FOLLOWER_MAX_SEQ_NO_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_REQUESTED_SEQ_NO_FIELD); | ||
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); | ||
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); | ||
PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), OPERATIONS_RECEIVED_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_TRANSFERRED_BYTES); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_INDEX_TIME_MILLIS_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD); | ||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_OPERATIONS_INDEXED_FIELD); | ||
FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO); | ||
FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject( | ||
ConstructingObjectParser.constructorArg(), | ||
(p, c) -> ElasticsearchException.fromXContent(p), | ||
FETCH_EXCEPTIONS_ENTRY_EXCEPTION); | ||
} | ||
|
||
private final int shardId; | ||
|
@@ -597,6 +642,12 @@ public long numberOfOperationsIndexed() { | |
return numberOfOperationsIndexed; | ||
} | ||
|
||
private final NavigableMap<Long, ElasticsearchException> fetchExceptions; | ||
|
||
public NavigableMap<Long, ElasticsearchException> fetchExceptions() { | ||
return fetchExceptions; | ||
} | ||
|
||
Status( | ||
final int shardId, | ||
final long leaderGlobalCheckpoint, | ||
|
@@ -616,7 +667,8 @@ public long numberOfOperationsIndexed() { | |
final long totalIndexTimeMillis, | ||
final long numberOfSuccessfulBulkOperations, | ||
final long numberOfFailedBulkOperations, | ||
final long numberOfOperationsIndexed) { | ||
final long numberOfOperationsIndexed, | ||
final NavigableMap<Long, ElasticsearchException> fetchExceptions) { | ||
this.shardId = shardId; | ||
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; | ||
this.leaderMaxSeqNo = leaderMaxSeqNo; | ||
|
@@ -636,6 +688,7 @@ public long numberOfOperationsIndexed() { | |
this.numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations; | ||
this.numberOfFailedBulkOperations = numberOfFailedBulkOperations; | ||
this.numberOfOperationsIndexed = numberOfOperationsIndexed; | ||
this.fetchExceptions = fetchExceptions; | ||
} | ||
|
||
public Status(final StreamInput in) throws IOException { | ||
|
@@ -658,11 +711,12 @@ public Status(final StreamInput in) throws IOException { | |
this.numberOfSuccessfulBulkOperations = in.readVLong(); | ||
this.numberOfFailedBulkOperations = in.readVLong(); | ||
this.numberOfOperationsIndexed = in.readVLong(); | ||
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException)); | ||
} | ||
|
||
@Override | ||
public String getWriteableName() { | ||
return NAME; | ||
return STATUS_PARSER_NAME; | ||
} | ||
|
||
@Override | ||
|
@@ -686,6 +740,7 @@ public void writeTo(final StreamOutput out) throws IOException { | |
out.writeVLong(numberOfSuccessfulBulkOperations); | ||
out.writeVLong(numberOfFailedBulkOperations); | ||
out.writeVLong(numberOfOperationsIndexed); | ||
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException); | ||
} | ||
|
||
@Override | ||
|
@@ -720,13 +775,30 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa | |
builder.field(NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfSuccessfulBulkOperations); | ||
builder.field(NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD.getPreferredName(), numberOfFailedBulkOperations); | ||
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed); | ||
builder.startArray(FETCH_EXCEPTIONS.getPreferredName()); | ||
{ | ||
for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) { | ||
builder.startObject(); | ||
{ | ||
builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey()); | ||
builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName()); | ||
builder.startObject(); | ||
{ | ||
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue()); | ||
} | ||
builder.endObject(); | ||
} | ||
builder.endObject(); | ||
} | ||
} | ||
builder.endArray(); | ||
} | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
public static Status fromXContent(final XContentParser parser) { | ||
return PARSER.apply(parser, null); | ||
return STATUS_PARSER.apply(parser, null); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, this was just missing in this base class. It does exist in the
AbstractXContentTestCase
base class. Should we add this change back into master / 6.x separately after this PR is merged, or just wait for when the ccr branches are merged? I have no strong opinion here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed this offline. We will keep this in this branch for now, and it will come to 6.x/master when we integrate there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up opening #33114 since I have to wait for a build on #33113, and then subsequently a build here.