diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java index 8b1cf8b42e3d8..ec99827fdcdab 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpoint.java @@ -279,18 +279,16 @@ public static long getBehind(DataFrameTransformCheckpoint oldCheckpoint, DataFra throw new IllegalArgumentException("old checkpoint is newer than new checkpoint"); } - // all old indices must be contained in the new ones but not vice versa - if (newCheckpoint.indicesCheckpoints.keySet().containsAll(oldCheckpoint.indicesCheckpoints.keySet()) == false) { - return -1L; - } - // get the sum of of shard checkpoints // note: we require shard checkpoints to strictly increase and never decrease long oldCheckPointSum = 0; long newCheckPointSum = 0; - for (long[] v : oldCheckpoint.indicesCheckpoints.values()) { - oldCheckPointSum += Arrays.stream(v).sum(); + for (Entry entry : oldCheckpoint.indicesCheckpoints.entrySet()) { + // ignore entries that aren't part of newCheckpoint, e.g. deleted indices + if (newCheckpoint.indicesCheckpoints.containsKey(entry.getKey())) { + oldCheckPointSum += Arrays.stream(entry.getValue()).sum(); + } } for (long[] v : newCheckpoint.indicesCheckpoints.values()) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java index ad4f068870ba6..67cc4b91584c2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformCheckpointTests.java @@ -162,9 +162,10 @@ public void testGetBehind() { checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey()); assertEquals((indices - 1) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew)); - // remove 1st index from new, now old has 1 index more, behind can not be calculated + // remove 1st index from new, now old has 1 index more, which should be ignored checkpointsByIndexNew.remove(checkpointsByIndexNew.firstKey()); - assertEquals(-1L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew)); + + assertEquals((indices - 2) * shards * 10L, DataFrameTransformCheckpoint.getBehind(checkpointOld, checkpointTransientNew)); } private static Map randomCheckpointsByIndex() { diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java index d028d3248c8f6..831655163ab25 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java @@ -14,9 +14,8 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; -import org.elasticsearch.common.Strings; -import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats; @@ -94,7 +93,8 @@ public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpo // 1st get index to see the indexes the user has access to GetIndexRequest getIndexRequest = new GetIndexRequest() .indices(transformConfig.getSource().getIndex()) - .features(new GetIndexRequest.Feature[0]); + .features(new GetIndexRequest.Feature[0]) + .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); ClientHelper.executeWithHeadersAsync(transformConfig.getHeaders(), ClientHelper.DATA_FRAME_ORIGIN, client, GetIndexAction.INSTANCE, getIndexRequest, ActionListener.wrap(getIndexResponse -> { @@ -105,7 +105,8 @@ public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpo IndicesStatsAction.INSTANCE, new IndicesStatsRequest() .indices(transformConfig.getSource().getIndex()) - .clear(), + .clear() + .indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN), ActionListener.wrap( response -> { if (response.getFailedShards() != 0) { @@ -113,21 +114,18 @@ public void getCheckpoint(DataFrameTransformConfig transformConfig, long checkpo new CheckpointException("Source has [" + response.getFailedShards() + "] failed shards")); return; } - try { - Map checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices); - listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(), - timestamp, - checkpoint, - checkpointsByIndex, - timeUpperBound)); - } catch (CheckpointException checkpointException) { - listener.onFailure(checkpointException); - } + + Map checkpointsByIndex = extractIndexCheckPoints(response.getShards(), userIndices); + listener.onResponse(new DataFrameTransformCheckpoint(transformConfig.getId(), + timestamp, + checkpoint, + checkpointsByIndex, + timeUpperBound)); }, - listener::onFailure + e-> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)) )); }, - listener::onFailure + e -> listener.onFailure(new CheckpointException("Failed to create checkpoint", e)) )); } @@ -223,38 +221,44 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); if (checkpoints.containsKey(shard.getShardRouting().getId())) { // there is already a checkpoint entry for this index/shard combination, check if they match - if (checkpoints.get(shard.getShardRouting().getId()) != shard.getSeqNoStats().getGlobalCheckpoint()) { + if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) { throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id [" + shard.getShardRouting().getId() + "]"); } } else { // 1st time we see this shard for this index, add the entry for the shard - checkpoints.put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint()); + checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint); } } else { // 1st time we see this index, create an entry for the index and add the shard checkpoint checkpointsByIndex.put(indexName, new TreeMap<>()); - checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), shard.getSeqNoStats().getGlobalCheckpoint()); + checkpointsByIndex.get(indexName).put(shard.getShardRouting().getId(), globalCheckpoint); } } } + // checkpoint extraction is done in 2 steps: + // 1. GetIndexRequest to retrieve the indices the user has access to + // 2. IndicesStatsRequest to retrieve stats about indices + // between 1 and 2 indices could get deleted or created + if (logger.isDebugEnabled()) { + Set userIndicesClone = new HashSet<>(userIndices); + + userIndicesClone.removeAll(checkpointsByIndex.keySet()); + if (userIndicesClone.isEmpty() == false) { + logger.debug("Original set of user indices contained more indexes [{}]", userIndicesClone); + } + } + // create the final structure Map checkpointsByIndexReduced = new TreeMap<>(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 8542222e4bd59..3b9f47ba39795 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -764,7 +764,13 @@ public boolean sourceHasChanged() { } }, e -> { changed.set(false); - logger.error("failure in update check", e); + logger.warn( + "Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", + e); + + auditor.warning(transformId, + "Failed to detect changes for data frame transform, skipping update till next check. Exception: " + + e.getMessage()); }), latch)); try { @@ -773,7 +779,11 @@ public boolean sourceHasChanged() { return changed.get(); } } catch (InterruptedException e) { - logger.error("Failed to check for update", e); + logger.warn("Failed to detect changes for data frame transform [" + transformId + "], skipping update till next check", e); + + auditor.warning(transformId, + "Failed to detect changes for data frame transform, skipping update till next check. Exception: " + + e.getMessage()); } return false; diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java index 9cc2769e7d149..c8d9be89611d4 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointServiceTests.java @@ -52,7 +52,24 @@ public void testExtractIndexCheckpoints() { Map expectedCheckpoints = new HashMap<>(); Set indices = randomUserIndices(); - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false); + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, false); + + Map checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices); + + assertEquals(expectedCheckpoints.size(), checkpoints.size()); + assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); + + // low-level compare + for (Entry entry : expectedCheckpoints.entrySet()) { + assertTrue(Arrays.equals(entry.getValue(), checkpoints.get(entry.getKey()))); + } + } + + public void testExtractIndexCheckpointsMissingSeqNoStats() { + Map expectedCheckpoints = new HashMap<>(); + Set indices = randomUserIndices(); + + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, false, false, true); Map checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices); @@ -69,7 +86,7 @@ public void testExtractIndexCheckpointsLostPrimaries() { Map expectedCheckpoints = new HashMap<>(); Set indices = randomUserIndices(); - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false); + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, true, false, false); Map checkpoints = DataFrameTransformsCheckpointService.extractIndexCheckPoints(shardStatsArray, indices); @@ -86,7 +103,7 @@ public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() { Map expectedCheckpoints = new HashMap<>(); Set indices = randomUserIndices(); - ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true); + ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false); // fail CheckpointException e = expectThrows(CheckpointException.class, @@ -120,10 +137,11 @@ private static Set randomUserIndices() { * @param userIndices set of indices that are visible * @param skipPrimaries whether some shards do not have a primary shard at random * @param inconsistentGlobalCheckpoints whether to introduce inconsistent global checkpoints + * @param missingSeqNoStats whether some indices miss SeqNoStats * @return array of ShardStats */ private static ShardStats[] createRandomShardStats(Map expectedCheckpoints, Set userIndices, - boolean skipPrimaries, boolean inconsistentGlobalCheckpoints) { + boolean skipPrimaries, boolean inconsistentGlobalCheckpoints, boolean missingSeqNoStats) { // always create the full list List indices = new ArrayList<>(); @@ -131,6 +149,8 @@ private static ShardStats[] createRandomShardStats(Map expectedC indices.add(new Index("index-2", UUIDs.randomBase64UUID(random()))); indices.add(new Index("index-3", UUIDs.randomBase64UUID(random()))); + String missingSeqNoStatsIndex = randomFrom(userIndices); + List shardStats = new ArrayList<>(); for (final Index index : indices) { int numShards = randomIntBetween(1, 5); @@ -160,8 +180,15 @@ private static ShardStats[] createRandomShardStats(Map expectedC long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L); long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint); - final SeqNoStats validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); - checkpoints.add(globalCheckpoint); + SeqNoStats validSeqNoStats = null; + + // add broken seqNoStats if requested + if (missingSeqNoStats && index.getName().equals(missingSeqNoStatsIndex)) { + checkpoints.add(0L); + } else { + validSeqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + checkpoints.add(globalCheckpoint); + } for (int replica = 0; replica < numShardCopies; replica++) { ShardId shardId = new ShardId(index, shardIndex);