Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Combine messageRecieved methods in TransportRequestHandler #31519

Merged
merged 2 commits into from
Jun 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void testScheduledPing() throws Exception {
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ private static MockTransportService startTransport(
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel) -> {
(request, channel, task) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel) -> {
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
Expand All @@ -39,7 +40,7 @@ public TransportLivenessAction(ClusterService clusterService, TransportService t
}

@Override
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
public void messageReceived(LivenessRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(new LivenessResponse(clusterService.getClusterName(), clusterService.localNode()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -285,7 +286,7 @@ public void writeTo(StreamOutput out) throws IOException {

class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
@Override
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception {
if (request.ban) {
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
clusterService.localNode().getId(), request.reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,10 @@
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
Expand Down Expand Up @@ -314,150 +311,116 @@ public void writeTo(StreamOutput out) throws IOException {

public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
@Override
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME,
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME,
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
});
ThreadPool.Names.SAME, (request, channel, task) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);

transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
(request, channel, task) -> {
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
});

}
}
});
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);

transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
(request, channel, task) -> {
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
}

@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
});
}
}
});
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);

transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);

transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);

// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
}
(request, channel, task) -> {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
(Supplier<TransportResponse>) CanMatchResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@ protected HandledTransportAction(Settings settings, String actionName, boolean c

class TransportHandler implements TransportRequestHandler<Request> {

@Override
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}

@Override
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
// We already got the task created on the network layer - no need to create it again on the transport layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,5 @@ class ShardTransportHandler implements TransportRequestHandler<ShardRequest> {
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(shardOperation(request, task));
}

@Override
public final void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ protected void onCompletion() {

class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler<NodeRequest> {
@Override
public void messageReceived(final NodeRequest request, TransportChannel channel) throws Exception {
public void messageReceived(final NodeRequest request, TransportChannel channel, Task task) throws Exception {
List<ShardRouting> shards = request.getShards();
final int totalShards = shards.size();
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,6 @@ class NodeTransportHandler implements TransportRequestHandler<NodeRequest> {
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(nodeOperation(request, task));
}

@Override
public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(nodeOperation(request));
}

}

}
Loading