Skip to content

Commit

Permalink
Removed ClientHelper dependency from PersistentTasksService.
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Jan 31, 2018
1 parent cc16f9d commit 07e727c
Showing 1 changed file with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
*/
package org.elasticsearch.persistent;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
Expand All @@ -30,16 +35,16 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.util.function.BiConsumer;
import java.util.function.Predicate;

import static org.elasticsearch.ClientHelper.PERSISTENT_TASK_ORIGIN;
import static org.elasticsearch.ClientHelper.executeAsyncWithOrigin;
import java.util.function.Supplier;

/**
* This service is used by persistent actions to propagate changes in the action state and notify about completion
Expand Down Expand Up @@ -194,4 +199,40 @@ default void onTimeout(TimeValue timeout) {
onFailure(new IllegalStateException("timed out after " + timeout));
}
}

private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";
private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";

/**
* Executes a consumer after setting the origin and wrapping the listener so that the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse> void executeAsyncWithOrigin(
ThreadContext threadContext, String origin, Request request, ActionListener<Response> listener,
BiConsumer<Request, ActionListener<Response>> consumer) {
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
consumer.accept(request, new ContextPreservingActionListener<>(supplier, listener));
}
}
/**
* Executes an asynchronous action using the provided client. The origin is set in the context and the listener
* is wrapped to ensure the proper context is restored
*/
public static <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void executeAsyncWithOrigin(
Client client, String origin, Action<Request, Response, RequestBuilder> action, Request request,
ActionListener<Response> listener) {
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) {
client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener));
}
}

public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadContext, String origin) {
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
return storedContext;
}

}

0 comments on commit 07e727c

Please sign in to comment.