Skip to content

Commit

Permalink
Core: Combine doExecute methods in TransportAction (#31517)
Browse files Browse the repository at this point in the history
TransportAction currently contains 2 doExecute methods, one which takes
a the task, and one that does not. The latter is what some subclasses
implement, while the first one just calls the latter, dropping the given
task. This commit combines these methods, in favor of just always
assuming a task is present.
  • Loading branch information
rjernst authored and colings86 committed Jun 25, 2018
1 parent 2ba50db commit 49387bf
Show file tree
Hide file tree
Showing 111 changed files with 240 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
Expand All @@ -42,7 +43,7 @@ public TransportNoopBulkAction(Settings settings, TransportService transportServ
}

@Override
protected void doExecute(BulkRequest request, ActionListener<BulkResponse> listener) {
protected void doExecute(Task task, BulkRequest request, ActionListener<BulkResponse> listener) {
final int itemCount = request.requests().size();
// simulate at least a realistic amount of data that gets serialized
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[itemCount];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.Collections;
Expand All @@ -44,7 +45,7 @@ public TransportNoopSearchAction(Settings settings, TransportService transportSe
}

@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
listener.onResponse(new SearchResponse(new InternalSearchResponse(
new SearchHits(
new SearchHit[0], 0L, 0.0f),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -118,7 +119,7 @@ public TransportAction(Settings settings, TransportService transportService, Act
}

@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
try {
listener.onResponse(new Response(GROK_PATTERNS));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand All @@ -54,7 +55,7 @@ public TransportMultiSearchTemplateAction(Settings settings, TransportService tr
}

@Override
protected void doExecute(MultiSearchTemplateRequest request, ActionListener<MultiSearchTemplateResponse> listener) {
protected void doExecute(Task task, MultiSearchTemplateRequest request, ActionListener<MultiSearchTemplateResponse> listener) {
List<Integer> originalSlots = new ArrayList<>();
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
multiSearchRequest.indicesOptions(request.indicesOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand All @@ -63,7 +64,7 @@ public TransportSearchTemplateAction(Settings settings, TransportService transpo
}

@Override
protected void doExecute(SearchTemplateRequest request, ActionListener<SearchTemplateResponse> listener) {
protected void doExecute(Task task, SearchTemplateRequest request, ActionListener<SearchTemplateResponse> listener) {
final SearchTemplateResponse response = new SearchTemplateResponse();
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -285,7 +286,7 @@ public TransportAction(Settings settings, TransportService transportService,
this.scriptService = scriptService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
switch (request.context) {
case PAINLESS_TEST:
PainlessTestScript.Factory factory = scriptService.compile(request.script, PainlessTestScript.CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -83,7 +84,7 @@ public TransportRankEvalAction(Settings settings, ActionFilters actionFilters, C
}

@Override
protected void doExecute(RankEvalRequest request, ActionListener<RankEvalResponse> listener) {
protected void doExecute(Task task, RankEvalRequest request, ActionListener<RankEvalResponse> listener) {
RankEvalSpec evaluationSpecification = request.getRankEvalSpec();
EvaluationMetric metric = evaluationSpecification.getMetric();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,4 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
}
);
}

@Override
protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkB
);
}

@Override
protected void doExecute(ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}

static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
if (remoteInfo == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener
);
}

@Override
protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}

/**
* Simple implementation of update-by-query using scrolling and bulk.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ public TransportGetTaskAction(Settings settings, ThreadPool threadPool, Transpor
this.xContentRegistry = xContentRegistry;
}

@Override
protected void doExecute(GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
throw new UnsupportedOperationException("Task is required");
}

@Override
protected void doExecute(Task thisTask, GetTaskRequest request, ActionListener<GetTaskResponse> listener) {
if (clusterService.localNode().getId().equals(request.getTaskId().getNodeId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.Supplier;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -45,7 +46,7 @@ public TransportRemoteInfoAction(Settings settings, TransportService transportSe
}

@Override
protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
protected void doExecute(Task task, RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
listener.onResponse(new RemoteInfoResponse(remoteClusterService.getRemoteConnectionInfos().collect(toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

/**
Expand All @@ -45,7 +46,7 @@ public TransportSyncedFlushAction(Settings settings, TransportService transportS
}

@Override
protected void doExecute(SyncedFlushRequest request, ActionListener<SyncedFlushResponse> listener) {
protected void doExecute(Task task, SyncedFlushRequest request, ActionListener<SyncedFlushResponse> listener) {
syncedFlushService.attemptSyncedFlush(request.indices(), request.indicesOptions(), listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.HashMap;
Expand All @@ -53,7 +54,7 @@ public TransportGetFieldMappingsAction(Settings settings, TransportService trans
}

@Override
protected void doExecute(GetFieldMappingsRequest request, final ActionListener<GetFieldMappingsResponse> listener) {
protected void doExecute(Task task, GetFieldMappingsRequest request, final ActionListener<GetFieldMappingsResponse> listener) {
ClusterState clusterState = clusterService.state();
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
final AtomicInteger indexCounter = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportSe
clusterService.addStateApplier(this.ingestForwarder);
}

@Override
protected final void doExecute(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
throw new UnsupportedOperationException("task parameter is required for this operation");
}

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
if (bulkRequest.hasIndexRequestsWithPipelines()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.search.rescore.RescoreContext;
import org.elasticsearch.search.rescore.Rescorer;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -67,9 +68,9 @@ public TransportExplainAction(Settings settings, ThreadPool threadPool, ClusterS
}

@Override
protected void doExecute(ExplainRequest request, ActionListener<ExplainResponse> listener) {
protected void doExecute(Task task, ExplainRequest request, ActionListener<ExplainResponse> listener) {
request.nowInMillis = System.currentTimeMillis();
super.doExecute(request, listener);
super.doExecute(task, request, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -63,8 +64,7 @@ public TransportFieldCapabilitiesAction(Settings settings, TransportService tran
}

@Override
protected void doExecute(FieldCapabilitiesRequest request,
final ActionListener<FieldCapabilitiesResponse> listener) {
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.HashMap;
Expand All @@ -53,7 +54,7 @@ public TransportMultiGetAction(Settings settings, TransportService transportServ
}

@Override
protected void doExecute(final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
protected void doExecute(Task task, final MultiGetRequest request, final ActionListener<MultiGetResponse> listener) {
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -48,7 +49,7 @@ public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool,
}

@Override
protected void doExecute(SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
protected void doExecute(Task task, SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
final Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();

final SimulatePipelineRequest.Parsed simulateRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

public class TransportMainAction extends HandledTransportAction<MainRequest, MainResponse> {
Expand All @@ -44,7 +45,7 @@ public TransportMainAction(Settings settings, TransportService transportService,
}

@Override
protected void doExecute(MainRequest request, ActionListener<MainResponse> listener) {
protected void doExecute(Task task, MainRequest request, ActionListener<MainResponse> listener) {
ClusterState clusterState = clusterService.state();
assert Node.NODE_NAME_SETTING.exists(settings);
final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
Expand All @@ -43,7 +44,7 @@ public TransportClearScrollAction(Settings settings, TransportService transportS
}

@Override
protected void doExecute(ClearScrollRequest request, final ActionListener<ClearScrollResponse> listener) {
protected void doExecute(Task task, ClearScrollRequest request, final ActionListener<ClearScrollResponse> listener) {
Runnable runnable = new ClearScrollController(request, listener, clusterService.state().nodes(), logger, searchTransportService);
runnable.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -70,7 +71,7 @@ public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, Tran
}

@Override
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
protected void doExecute(Task task, MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
final long relativeStartTime = relativeTimeProvider.getAsLong();

ClusterState clusterState = clusterService.state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,6 @@ static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShards
return new GroupShardsIterator<>(shards);
}

@Override
protected final void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}

private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators,
SearchTimeProvider timeProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ public TransportSearchScrollAction(Settings settings, TransportService transport
this.searchPhaseController = searchPhaseController;
}

@Override
protected final void doExecute(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
@Override
protected void doExecute(Task task, SearchScrollRequest request, ActionListener<SearchResponse> listener) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ public final void execute(Task task, Request request, ActionListener<Response> l
requestFilterChain.proceed(task, actionName, request, listener);
}

protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
doExecute(request, listener);
}

protected abstract void doExecute(Request request, ActionListener<Response> listener);
protected abstract void doExecute(Task task, Request request, ActionListener<Response> listener);

private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse>
implements ActionFilterChain<Request, Response> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
new AsyncBroadcastAction(task, request, listener).start();
}

@Override
protected final void doExecute(Request request, ActionListener<Response> listener) {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}

protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);

protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request);
Expand Down
Loading

0 comments on commit 49387bf

Please sign in to comment.