Skip to content

Commit

Permalink
[ML-DataFrame] make checkpointing more robust (#44344) (#44415)
Browse files Browse the repository at this point in the history
make checkpointing more robust:

- do not let checkpointing fail if indexes got deleted
- treat missing seqNoStats as just created indices (checkpoint 0)
- loglevel: do not treat failed update checks as errors

fixes #43992
  • Loading branch information
Hendrik Muhs committed Jul 16, 2019
1 parent 92cc913 commit f05cc83
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,16 @@ public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFra
throw new IllegalArgumentException("old checkpoint is newer than new checkpoint");
}

// all old indices must be contained in the new ones but not vice versa
if (newCheckpoint.indicesCheckpoints.keySet().containsAll(oldCheckpoint.indicesCheckpoints.keySet()) == false) {
return -1L;
}

// get the sum of shard checkpoints
// note: we require shard checkpoints to strictly increase and never decrease
long oldCheckPointSum = 0;
long newCheckPointSum = 0;

for (long[] v : oldCheckpoint.indicesCheckpoints.values()) {
oldCheckPointSum += Arrays.stream(v).sum();
for (Entry<String, long[]> entry : oldCheckpoint.indicesCheckpoints.entrySet()) {
// ignore entries that aren't part of newCheckpoint, e.g. deleted indices
if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) {
oldCheckPointSum += Arrays.stream(entry.getValue()).sum();
}
}

for (long[] v : newCheckpoint.indicesCheckpoints.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ public void testGetBehind() {
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
assertEquals((indices - 1) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));

// remove 1st index from new, now old has 1 index more, behind can not be calculated
// remove 1st index from new, now old has 1 index more, which should be ignored
checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey());
assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));

assertEquals((indices - 2) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew));
}

private static Map<String, long[]> randomCheckpointsByIndex() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
Expand Down Expand Up @@ -94,7 +93,8 @@ public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpo
// 1st get index to see the indexes the user has access to
GetIndexRequest getIndexRequest = new GetIndexRequest()
.indices(transformConfig.getSource().getIndex())
.features(new GetIndexRequest.Feature[0]);
.features(new GetIndexRequest.Feature[0])
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);

ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE,
getIndexRequest, ActionListener.wrap(getIndexResponse -> {
Expand All @@ -105,29 +105,27 @@ public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpo
IndicesStatsAction.INSTANCE,
new IndicesStatsRequest()
.indices(transformConfig.getSource().getIndex())
.clear(),
.clear()
.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN),
ActionListener.wrap(
response -> {
if (response.getFailedShards() != 0) {
listener.onFailure(
new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards"));
return;
}
try {
Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
timestamp,
checkpoint,
checkpointsByIndex,
timeUpperBound));
} catch (CheckpointException checkpointException) {
listener.onFailure(checkpointException);
}

Map<String, long[]> checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices);
listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(),
timestamp,
checkpoint,
checkpointsByIndex,
timeUpperBound));
},
listener::onFailure
e-> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))
));
},
listener::onFailure
e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e))
));

}
Expand Down Expand Up @@ -223,38 +221,44 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri

for (ShardStats shard : shards) {
String indexName = shard.getShardRouting().getIndexName();

if (userIndices.contains(indexName)) {
SeqNoStats seqNoStats = shard.getSeqNoStats();
// SeqNoStats could be `null`. This indicates that an `AlreadyClosed` exception was thrown somewhere down the stack
// Indicates that the index COULD be closed, or at least that the shard is not fully recovered yet.
if (seqNoStats == null) {
logger.warn("failure gathering checkpoint information for index [{}] as seq_no_stats were null. Shard Stats [{}]",
indexName,
Strings.toString(shard));
throw new CheckpointException(
"Unable to gather checkpoint information for index [" + indexName + "]. seq_no_stats are missing.");
}
// SeqNoStats could be `null`, assume the global checkpoint to be 0 in this case
long globalCheckpoint = shard.getSeqNoStats() == null ? 0 : shard.getSeqNoStats().getGlobalCheckpoint();
if (checkpointsByIndex.containsKey(indexName)) {
// we have already seen this index, just check/add shards
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
if (checkpoints.containsKey(shard.getShardRouting().getId())) {
// there is already a checkpoint entry for this index/shard combination, check if they match
if (checkpoints.get(shard.getShardRouting().getId()) != shard.getSeqNoStats().getGlobalCheckpoint()) {
if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) {
throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id ["
+ shard.getShardRouting().getId() + "]");
}
} else {
// 1st time we see this shard for this index, add the entry for the shard
checkpoints.put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint);
}
} else {
// 1st time we see this index, create an entry for the index and add the shard checkpoint
checkpointsByIndex.put(indexName, new TreeMap<>());
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint());
checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint);
}
}
}

// checkpoint extraction is done in 2 steps:
// 1. GetIndexRequest to retrieve the indices the user has access to
// 2. IndicesStatsRequest to retrieve stats about indices
// between 1 and 2 indices could get deleted or created
if (logger.isDebugEnabled()) {
Set<String> userIndicesClone = new HashSet<>(userIndices);

userIndicesClone.removeAll(checkpointsByIndex.keySet());
if (userIndicesClone.isEmpty() == false) {
logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone);
}
}

// create the final structure
Map<String, long[]> checkpointsByIndexReduced = new TreeMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,13 @@ public boolean sourceHasChanged() {
}
}, e -> {
changed.set(false);
logger.error("failure in update check", e);
logger.warn(
"Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check",
e);

auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+ e.getMessage());
}), latch));

try {
Expand All @@ -773,7 +779,11 @@ public boolean sourceHasChanged() {
return changed.get();
}
} catch (InterruptedException e) {
logger.error("Failed to check for update", e);
logger.warn("Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", e);

auditor.warning(transformId,
"Failed to detect changes for data frame transform, skipping update till next check. Exception: "
+ e.getMessage());
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,24 @@ public void testExtractIndexCheckpoints() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();

ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false);
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false);

Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);

assertEquals(expectedCheckpoints.size(), checkpoints.size());
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());

// low-level compare
for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey())));
}
}

public void testExtractIndexCheckpointsMissingSeqNoStats() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();

ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true);

Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);

Expand All @@ -69,7 +86,7 @@ public void testExtractIndexCheckpointsLostPrimaries() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();

ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false);
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false);

Map<String, long[]> checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices);

Expand All @@ -86,7 +103,7 @@ public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() {
Map<String, long[]> expectedCheckpoints = new HashMap<>();
Set<String> indices = randomUserIndices();

ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true);
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false);

// fail
CheckpointException e = expectThrows(CheckpointException.class,
Expand Down Expand Up @@ -120,17 +137,20 @@ private static Set<String> randomUserIndices() {
* @param userIndices set of indices that are visible
* @param skipPrimaries whether some shards do not have a primary shard at random
* @param inconsistentGlobalCheckpoints whether to introduce inconsistent global checkpoints
* @param missingSeqNoStats whether some indices miss SeqNoStats
* @return array of ShardStats
*/
private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedCheckpoints, Set<String> userIndices,
boolean skipPrimaries, boolean inconsistentGlobalCheckpoints) {
boolean skipPrimaries, boolean inconsistentGlobalCheckpoints, boolean missingSeqNoStats) {

// always create the full list
List<Index> indices = new ArrayList<>();
indices.add(new Index("index-1", UUIDs.randomBase64UUID(random())));
indices.add(new Index("index-2", UUIDs.randomBase64UUID(random())));
indices.add(new Index("index-3", UUIDs.randomBase64UUID(random())));

String missingSeqNoStatsIndex = randomFrom(userIndices);

List<ShardStats> shardStats = new ArrayList<>();
for (final Index index : indices) {
int numShards = randomIntBetween(1, 5);
Expand Down Expand Up @@ -160,8 +180,15 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L);
long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint);

final SeqNoStats validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
SeqNoStats validSeqNoStats = null;

// add broken seqNoStats if requested
if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) {
checkpoints.add(0L);
} else {
validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
checkpoints.add(globalCheckpoint);
}

for (int replica = 0; replica < numShardCopies; replica++) {
ShardId shardId = new ShardId(index, shardIndex);
Expand Down

0 comments on commit f05cc83

Please sign in to comment.