Skip to content

Commit

Permalink
[ML][Data Frame] pull state and states for indexer from index (#42856)
Browse files Browse the repository at this point in the history
* [ML][Data Frame] pull state and states for indexer from index

* Update DataFrameTransformTask.java
  • Loading branch information
benwtrent authored Jun 6, 2019
1 parent 024f1a5 commit 75664f6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
.setAuditor(auditor)
.setClient(client)
.setIndexerState(currentIndexerState(transformPTaskState))
// If the transform persistent task state is `null` that means this is a "first run".
// If we have state then the task has relocated from another node in which case this
// state is preferred
.setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition())
.setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress())
.setTransformsCheckpointService(dataFrameTransformsCheckpointService)
.setTransformsConfigManager(transformsConfigManager);

Expand All @@ -132,18 +126,22 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable DataFrameTr
// Schedule execution regardless
ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
stateAndStats -> {
indexerBuilder.setInitialStats(stateAndStats.getTransformStats());
if (transformPTaskState == null) { // prefer the persistent task state
indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition());
indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress());
}

final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint();
logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString());
indexerBuilder.setInitialStats(stateAndStats.getTransformStats())
.setInitialPosition(stateAndStats.getTransformState().getPosition())
.setProgress(stateAndStats.getTransformState().getProgress())
.setIndexerState(currentIndexerState(stateAndStats.getTransformState()));
logger.info("[{}] Loading existing state: [{}], position [{}]",
transformId,
stateAndStats.getTransformState(),
stateAndStats.getTransformState().getPosition());

final Long checkpoint = stateAndStats.getTransformState().getCheckpoint();
startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
},
error -> {
if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
}
startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,10 @@ public DataFrameTransformTask(long id, String type, String action, TaskId parent
String initialReason = null;
long initialGeneration = 0;
Map<String, Object> initialPosition = null;
logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null);
if (state != null) {
initialTaskState = state.getTaskState();
initialReason = state.getReason();
final IndexerState existingState = state.getIndexerState();
logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition());
if (existingState.equals(IndexerState.INDEXING)) {
// reset to started as no indexer is running
initialState = IndexerState.STARTED;
Expand Down Expand Up @@ -213,6 +211,10 @@ public synchronized void start(Long startingCheckpoint, ActionListener<Response>
getIndexer().getProgress());

logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
// we could not read the previous state information from said index.
persistStateToClusterState(state, ActionListener.wrap(
task -> {
auditor.info(transform.getId(),
Expand Down Expand Up @@ -301,6 +303,10 @@ synchronized void markAsFailed(String reason, ActionListener<Void> listener) {
taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason);
auditor.error(transform.getId(), reason);
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
// we could not read the previous state information from said index.
persistStateToClusterState(getState(), ActionListener.wrap(
r -> listener.onResponse(null),
listener::onFailure
Expand Down

0 comments on commit 75664f6

Please sign in to comment.