Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Convert TransportAction.execute uses to client calls #31487

Merged
merged 2 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -42,16 +42,16 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M

private final ScriptService scriptService;
private final NamedXContentRegistry xContentRegistry;
private final TransportMultiSearchAction multiSearchAction;
private final NodeClient client;

@Inject
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, TransportMultiSearchAction multiSearchAction) {
NamedXContentRegistry xContentRegistry, NodeClient client) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, MultiSearchTemplateRequest::new);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.multiSearchAction = multiSearchAction;
this.client = client;
}

@Override
Expand Down Expand Up @@ -81,7 +81,7 @@ protected void doExecute(MultiSearchTemplateRequest request, ActionListener<Mult
}
}

multiSearchAction.execute(multiSearchRequest, ActionListener.wrap(r -> {
client.multiSearch(multiSearchRequest, ActionListener.wrap(r -> {
for (int i = 0; i < r.getResponses().length; i++) {
MultiSearchResponse.Item item = r.getResponses()[i];
int originalSlot = originalSlots.get(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -50,20 +50,18 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
private static final String TEMPLATE_LANG = MustacheScriptEngine.NAME;

private final ScriptService scriptService;
private final TransportSearchAction searchAction;
private final NamedXContentRegistry xContentRegistry;
private final NodeClient client;

@Inject
public TransportSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters,
ScriptService scriptService,
TransportSearchAction searchAction,
NamedXContentRegistry xContentRegistry) {
ActionFilters actionFilters, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
NodeClient client) {
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
this.scriptService = scriptService;
this.searchAction = searchAction;
this.xContentRegistry = xContentRegistry;
this.client = client;
}

@Override
Expand All @@ -72,7 +70,7 @@ protected void doExecute(SearchTemplateRequest request, ActionListener<SearchTem
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
if (searchRequest != null) {
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ protected static <T extends CreateIndexResponse> void declareFields(Constructing

private String index;

protected CreateIndexResponse() {
}
public CreateIndexResponse() {}

protected CreateIndexResponse(boolean acknowledged, boolean shardsAcknowledged, String index) {
super(acknowledged, shardsAcknowledged);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down Expand Up @@ -58,16 +59,15 @@
public class TransportUpgradeAction extends TransportBroadcastByNodeAction<UpgradeRequest, UpgradeResponse, ShardUpgradeResult> {

private final IndicesService indicesService;

private final TransportUpgradeSettingsAction upgradeSettingsAction;
private final NodeClient client;

@Inject
public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) {
IndexNameExpressionResolver indexNameExpressionResolver, NodeClient client) {
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest::new, ThreadPool.Names.FORCE_MERGE);
this.indicesService = indicesService;
this.upgradeSettingsAction = upgradeSettingsAction;
this.client = client;
}

@Override
Expand Down Expand Up @@ -205,7 +205,7 @@ public void onFailure(Exception e) {

private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
upgradeSettingsAction.execute(upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
@Override
public void onResponse(UpgradeSettingsResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -88,38 +88,35 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final ClusterService clusterService;
private final IngestService ingestService;
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider;
private final IngestActionForwarder ingestForwarder;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;

@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
this(settings, threadPool, transportService, clusterService, ingestService,
shardBulkAction, createIndexAction,
actionFilters, indexNameExpressionResolver,
autoCreateIndex,
System::nanoTime);
this(settings, threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, System::nanoTime);
}

public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
Objects.requireNonNull(relativeTimeProvider);
this.clusterService = clusterService;
this.ingestService = ingestService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -224,7 +221,7 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexAction.execute(createIndexRequest, listener);
client.admin().indices().create(createIndexRequest, listener);
}

private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest request, String index, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -47,16 +47,16 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip

private final PipelineStore pipelineStore;
private final ClusterService clusterService;
private final TransportNodesInfoAction nodesInfoAction;
private final NodeClient client;

@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
TransportNodesInfoAction nodesInfoAction) {
NodeClient client) {
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.clusterService = clusterService;
this.nodesInfoAction = nodesInfoAction;
this.client = client;
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
}

Expand All @@ -75,7 +75,7 @@ protected void masterOperation(PutPipelineRequest request, ClusterState state, A
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.ingest(true);
nodesInfoAction.execute(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
@Override
public void onResponse(NodesInfoResponse nodeInfos) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -43,27 +43,27 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear

private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;
private final NodeClient client;

@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
ClusterService clusterService, ActionFilters actionFilters, NodeClient client) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
this.client = client;
}

TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
int availableProcessors, LongSupplier relativeTimeProvider) {
ClusterService clusterService, int availableProcessors,
LongSupplier relativeTimeProvider, NodeClient client) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.client = client;
}

@Override
Expand Down Expand Up @@ -141,7 +141,7 @@ void executeSearch(
* when we handle the response rather than going recursive, we fork to another thread, otherwise we recurse.
*/
final Thread thread = Thread.currentThread();
searchAction.execute(request.request, new ActionListener<SearchResponse>() {
client.search(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));
Expand Down
Loading