Skip to content

Commit

Permalink
[CCR] Move api parameters from url to request body. (#31949)
Browse files Browse the repository at this point in the history
Relates to #30102
  • Loading branch information
martijnvg committed Jul 11, 2018
1 parent b260ef5 commit 04b5681
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,13 @@ private static void refresh(String index) throws IOException {

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,13 @@ private static void refresh(String index) throws IOException {

private static void followIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_xpack/ccr/_create_and_follow");
request.addParameter("leader_index", leaderIndex);
request.addParameter("idle_shard_retry_delay", "10ms");
request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ protected Boolean newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
String followIndex = request.getFollowRequest().getFollowIndex();
String followIndex = request.getFollowRequest().getFollowerIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
}

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);

// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());
Expand All @@ -295,7 +295,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex);
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
imdBuilder.settings(settingsBuilder);

// Copy mappings from leader IMD to follow IMD
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
imdBuilder.putMapping(cursor.value);
Expand All @@ -309,21 +309,21 @@ public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState updatedState = builder.build();

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowIndex()));
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowIndex() + "] created");
"follow index [" + request.getFollowRequest().getFollowerIndex() + "] created");

logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());

return updatedState;
}
});
}

private void initiateFollowing(Request request, ActionListener<Response> listener) {
activeShardsObserver.waitForActiveShards(new String[]{request.followRequest.getFollowIndex()},
activeShardsObserver.waitForActiveShards(new String[]{request.followRequest.getFollowerIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(FollowIndexAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
Expand All @@ -338,7 +338,7 @@ private void initiateFollowing(Request request, ActionListener<Response> listene

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowIndex());
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowRequest().getFollowerIndex());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.SearchSlowLog;
Expand Down Expand Up @@ -76,10 +82,51 @@ public Response newResponse() {
return new Response();
}

public static class Request extends ActionRequest {
public static class Request extends ActionRequest implements ToXContentObject {

private static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
private static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
private static final ConstructingObjectParser<Request, String> PARSER = new ConstructingObjectParser<>(NAME, true,
(args, followerIndex) -> {
if (args[1] != null) {
followerIndex = (String) args[1];
}
return new Request((String) args[0], followerIndex, (Integer) args[2], (Integer) args[3], (Long) args[4],
(Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]);
});

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOWER_INDEX_FIELD);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_BATCH_OPERATION_COUNT);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_CONCURRENT_READ_BATCHES);
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.RETRY_TIMEOUT.getPreferredName()),
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
}

public static Request fromXContent(XContentParser parser, String followerIndex) throws IOException {
Request request = PARSER.parse(parser, followerIndex);
if (followerIndex != null) {
if (request.followerIndex == null) {
request.followerIndex = followerIndex;
} else {
if (request.followerIndex.equals(followerIndex) == false) {
throw new IllegalArgumentException("provided follower_index is not equal");
}
}
}
return request;
}

private String leaderIndex;
private String followIndex;
private String followerIndex;
private int maxBatchOperationCount;
private int maxConcurrentReadBatches;
private long maxOperationSizeInBytes;
Expand All @@ -88,9 +135,37 @@ public static class Request extends ActionRequest {
private TimeValue retryTimeout;
private TimeValue idleShardRetryDelay;

public Request(String leaderIndex, String followIndex, int maxBatchOperationCount, int maxConcurrentReadBatches,
long maxOperationSizeInBytes, int maxConcurrentWriteBatches, int maxWriteBufferSize,
public Request(String leaderIndex, String followerIndex, Integer maxBatchOperationCount, Integer maxConcurrentReadBatches,
Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches, Integer maxWriteBufferSize,
TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
if (leaderIndex == null) {
throw new IllegalArgumentException("leader_index is missing");
}
if (followerIndex == null) {
throw new IllegalArgumentException("follower_index is missing");
}
if (maxBatchOperationCount == null) {
maxBatchOperationCount = ShardFollowNodeTask.DEFAULT_MAX_BATCH_OPERATION_COUNT;
}
if (maxConcurrentReadBatches == null) {
maxConcurrentReadBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_READ_BATCHES;
}
if (maxOperationSizeInBytes == null) {
maxOperationSizeInBytes = ShardFollowNodeTask.DEFAULT_MAX_BATCH_SIZE_IN_BYTES;
}
if (maxConcurrentWriteBatches == null) {
maxConcurrentWriteBatches = ShardFollowNodeTask.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES;
}
if (maxWriteBufferSize == null) {
maxWriteBufferSize = ShardFollowNodeTask.DEFAULT_MAX_WRITE_BUFFER_SIZE;
}
if (retryTimeout == null) {
retryTimeout = ShardFollowNodeTask.DEFAULT_RETRY_TIMEOUT;
}
if (idleShardRetryDelay == null) {
idleShardRetryDelay = ShardFollowNodeTask.DEFAULT_IDLE_SHARD_RETRY_DELAY;
}

if (maxBatchOperationCount < 1) {
throw new IllegalArgumentException("maxBatchOperationCount must be larger than 0");
}
Expand All @@ -107,15 +182,15 @@ public Request(String leaderIndex, String followIndex, int maxBatchOperationCoun
throw new IllegalArgumentException("maxWriteBufferSize must be larger than 0");
}

this.leaderIndex = Objects.requireNonNull(leaderIndex);
this.followIndex = Objects.requireNonNull(followIndex);
this.leaderIndex = leaderIndex;
this.followerIndex = followerIndex;
this.maxBatchOperationCount = maxBatchOperationCount;
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = Objects.requireNonNull(retryTimeout);
this.idleShardRetryDelay = Objects.requireNonNull(idleShardRetryDelay);
this.retryTimeout = retryTimeout;
this.idleShardRetryDelay = idleShardRetryDelay;
}

Request() {
Expand All @@ -125,8 +200,8 @@ public String getLeaderIndex() {
return leaderIndex;
}

public String getFollowIndex() {
return followIndex;
public String getFollowerIndex() {
return followerIndex;
}

public int getMaxBatchOperationCount() {
Expand All @@ -142,7 +217,7 @@ public ActionRequestValidationException validate() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
leaderIndex = in.readString();
followIndex = in.readString();
followerIndex = in.readString();
maxBatchOperationCount = in.readVInt();
maxConcurrentReadBatches = in.readVInt();
maxOperationSizeInBytes = in.readVLong();
Expand All @@ -156,7 +231,7 @@ public void readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(leaderIndex);
out.writeString(followIndex);
out.writeString(followerIndex);
out.writeVInt(maxBatchOperationCount);
out.writeVInt(maxConcurrentReadBatches);
out.writeVLong(maxOperationSizeInBytes);
Expand All @@ -166,6 +241,24 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalTimeValue(idleShardRetryDelay);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex);
builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex);
builder.field(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
builder.field(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
}
builder.endObject();
return builder;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -179,12 +272,12 @@ public boolean equals(Object o) {
Objects.equals(retryTimeout, request.retryTimeout) &&
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) &&
Objects.equals(leaderIndex, request.leaderIndex) &&
Objects.equals(followIndex, request.followIndex);
Objects.equals(followerIndex, request.followerIndex);
}

@Override
public int hashCode() {
return Objects.hash(leaderIndex, followIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
return Objects.hash(leaderIndex, followerIndex, maxBatchOperationCount, maxConcurrentReadBatches, maxOperationSizeInBytes,
maxConcurrentWriteBatches, maxWriteBufferSize, retryTimeout, idleShardRetryDelay);
}
}
Expand Down Expand Up @@ -229,7 +322,7 @@ public TransportAction(Settings settings, ThreadPool threadPool, TransportServic
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followerIndex);

String[] indices = new String[]{request.leaderIndex};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
Expand Down Expand Up @@ -390,7 +483,7 @@ static void validate(Request request, IndexMetaData leaderIndex, IndexMetaData f
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist");
}
if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist");
throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist");
}
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) {
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled");
Expand Down
Loading

0 comments on commit 04b5681

Please sign in to comment.