diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 443d499dfefd1..97d4f9a818bbd 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -111,12 +111,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) .setAuditor(auditor) .setClient(client) - .setIndexerState(currentIndexerState(transformPTaskState)) - // If the transform persistent task state is `null` that means this is a "first run". - // If we have state then the task has relocated from another node in which case this - // state is preferred - .setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition()) - .setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress()) .setTransformsCheckpointService(dataFrameTransformsCheckpointService) .setTransformsConfigManager(transformsConfigManager); @@ -132,18 +126,22 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr // Schedule execution regardless ActionListener transformStatsActionListener = ActionListener.wrap( stateAndStats -> { - indexerBuilder.setInitialStats(stateAndStats.getTransformStats()); - if (transformPTaskState == null) { // prefer the persistent task state - indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition()); - indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress()); - } - - final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint(); + logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); + indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) + .setInitialPosition(stateAndStats.getTransformState().getPosition()) + .setProgress(stateAndStats.getTransformState().getProgress()) + .setIndexerState(currentIndexerState(stateAndStats.getTransformState())); + logger.info("[{}] Loading existing state: [{}], position [{}]", + transformId, + stateAndStats.getTransformState(), + stateAndStats.getTransformState().getPosition()); + + final Long checkpoint = stateAndStats.getTransformState().getCheckpoint(); startTask(buildTask, indexerBuilder, checkpoint, startTaskListener); }, error -> { if (error instanceof ResourceNotFoundException == false) { - logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); + logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); } startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index 20ef5be09e899..a5ce462926a96 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -87,12 +87,10 @@ public DataFrameTransformTask(long id, String type, String action, TaskId parent String initialReason = null; long initialGeneration = 0; Map initialPosition = null; - logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null); if (state != null) { initialTaskState = state.getTaskState(); initialReason = state.getReason(); final IndexerState existingState = state.getIndexerState(); - logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition()); if (existingState.equals(IndexerState.INDEXING)) { // reset to started as no indexer is running initialState = IndexerState.STARTED; @@ -211,6 +209,10 @@ public synchronized void start(Long startingCheckpoint, ActionListener getIndexer().getProgress()); logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate + // This keeps track of STARTED, FAILED, STOPPED + // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that + // we could not read the previous state information from said index. persistStateToClusterState(state, ActionListener.wrap( task -> { auditor.info(transform.getId(), @@ -295,6 +297,10 @@ synchronized void markAsFailed(String reason, ActionListener listener) { taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); auditor.error(transform.getId(), reason); + // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate + // This keeps track of STARTED, FAILED, STOPPED + // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that + // we could not read the previous state information from said index. persistStateToClusterState(getState(), ActionListener.wrap( r -> listener.onResponse(null), listener::onFailure