Skip to content

Commit

Permalink
[CCR] Change resume follow api to be a master node action (elastic#35249
Browse files Browse the repository at this point in the history
)

In order to start shard follow tasks, the resume follow api already
needs execute N requests to the elected master node.

The pause follow API is also a master node action, which would make
how both APIs execute more consistent.
  • Loading branch information
martijnvg authored and pgomulka committed Nov 13, 2018
1 parent 3ed09e3 commit f043ea9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -33,7 +36,6 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
Expand All @@ -50,7 +52,7 @@
import java.util.Set;
import java.util.stream.Collectors;

public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
public class TransportResumeFollowAction extends TransportMasterNodeAction<ResumeFollowAction.Request, AcknowledgedResponse> {

static final ByteSizeValue DEFAULT_MAX_READ_REQUEST_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB);
static final ByteSizeValue DEFAULT_MAX_WRITE_REQUEST_SIZE = new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES);
Expand All @@ -65,7 +67,6 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo

private final Client client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final PersistentTasksService persistentTasksService;
private final IndicesService indicesService;
private final CcrLicenseChecker ccrLicenseChecker;
Expand All @@ -77,28 +78,43 @@ public TransportResumeFollowAction(
final ActionFilters actionFilters,
final Client client,
final ClusterService clusterService,
final IndexNameExpressionResolver indexNameExpressionResolver,
final PersistentTasksService persistentTasksService,
final IndicesService indicesService,
final CcrLicenseChecker ccrLicenseChecker) {
super(ResumeFollowAction.NAME, transportService, actionFilters, in -> new ResumeFollowAction.Request(in));
super(ResumeFollowAction.NAME, true, transportService, clusterService, threadPool, actionFilters,
ResumeFollowAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.persistentTasksService = persistentTasksService;
this.indicesService = indicesService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}

@Override
protected void doExecute(final Task task,
final ResumeFollowAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

@Override
protected ClusterBlockException checkBlock(ResumeFollowAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected void masterOperation(final ResumeFollowAction.Request request,
ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
if (ccrLicenseChecker.isCcrAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}

final ClusterState state = clusterService.state();
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
if (followerIndexMetadata == null) {
listener.onFailure(new IndexNotFoundException(request.getFollowerIndex()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
package org.elasticsearch.xpack.core.ccr.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -41,7 +41,7 @@ public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

public static class Request extends ActionRequest implements ToXContentObject {
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {

static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");
Expand Down

0 comments on commit f043ea9

Please sign in to comment.