Skip to content

Commit

Permalink
Persistent Tasks: check the current state in waitForPersistentTaskSta…
Browse files Browse the repository at this point in the history
…tus (#935)

Add a check for the current state waitForPersistentTaskStatus before waiting for the next one. This fixes sporadic failure in testPersistentActionStatusUpdate test.

Fixes #928
  • Loading branch information
imotov authored and martijnvg committed Feb 5, 2018
1 parent b105020 commit 96e9f6f
Showing 1 changed file with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,28 +130,32 @@ public void removeTask(long taskId, PersistentTaskOperationListener listener) {
}

/**
* Waits for the persistent task with giving id (taskId) to achieve the desired status.
* Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't
* waits of it.
*/
public void waitForPersistentTaskStatus(long taskId, Predicate<PersistentTask<?>> predicate, @Nullable TimeValue timeout,
WaitForPersistentTaskStatusListener listener) {
ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(taskId);
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));

}

@Override
public void onTimeout(TimeValue timeout) {
listener.onTimeout(timeout);
}
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) {
listener.onResponse(taskId);
} else {
stateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
listener.onResponse(taskId);
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
listener.onTimeout(timeout);
}
}, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)));
}
}

public interface WaitForPersistentTaskStatusListener extends PersistentTaskOperationListener {
Expand Down

0 comments on commit 96e9f6f

Please sign in to comment.