Skip to content

Commit

Permalink
[CCR] Add auto follow pattern APIs to transport client. (elastic#33629)
Browse files Browse the repository at this point in the history
  • Loading branch information
martijnvg committed Sep 12, 2018
1 parent dbc296f commit 686205c
Show file tree
Hide file tree
Showing 16 changed files with 68 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private void handleClusterAlias(String clusterAlias, AutoFollowPattern autoFollo
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(),
autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(),
autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getRetryTimeout(),
autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getMaxRetryDelay(),
autoFollowPattern.getIdleShardRetryDelay());

// Execute if the create and follow api call succeeds:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;

import java.util.HashMap;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.ArrayList;
import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction.Request;

import java.io.IOException;

import static org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.INSTANCE;
import static org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction.INSTANCE;

public class RestDeleteAutoFollowPatternAction extends BaseRestHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.Request;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction.Request;

import java.io.IOException;

import static org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction.INSTANCE;
import static org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction.INSTANCE;

public class RestPutAutoFollowPatternAction extends BaseRestHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.Collection;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction.Request;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction.Request;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static class AutoFollowPattern implements Writeable, ToXContentObject {
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
public static final ParseField MAX_RETRY_DELAY = new ParseField("retry_timeout");
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");

@SuppressWarnings("unchecked")
Expand All @@ -187,8 +187,8 @@ public static class AutoFollowPattern implements Writeable, ToXContentObject {
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
Expand All @@ -201,20 +201,20 @@ public static class AutoFollowPattern implements Writeable, ToXContentObject {
private final Long maxOperationSizeInBytes;
private final Integer maxConcurrentWriteBatches;
private final Integer maxWriteBufferSize;
private final TimeValue retryTimeout;
private final TimeValue maxRetryDelay;
private final TimeValue idleShardRetryDelay;

public AutoFollowPattern(List<String> leaderIndexPatterns, String followIndexPattern, Integer maxBatchOperationCount,
Integer maxConcurrentReadBatches, Long maxOperationSizeInBytes, Integer maxConcurrentWriteBatches,
Integer maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay) {
Integer maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay) {
this.leaderIndexPatterns = leaderIndexPatterns;
this.followIndexPattern = followIndexPattern;
this.maxBatchOperationCount = maxBatchOperationCount;
this.maxConcurrentReadBatches = maxConcurrentReadBatches;
this.maxOperationSizeInBytes = maxOperationSizeInBytes;
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
this.maxWriteBufferSize = maxWriteBufferSize;
this.retryTimeout = retryTimeout;
this.maxRetryDelay = maxRetryDelay;
this.idleShardRetryDelay = idleShardRetryDelay;
}

Expand All @@ -226,7 +226,7 @@ public AutoFollowPattern(List<String> leaderIndexPatterns, String followIndexPat
maxOperationSizeInBytes = in.readOptionalLong();
maxConcurrentWriteBatches = in.readOptionalVInt();
maxWriteBufferSize = in.readOptionalVInt();
retryTimeout = in.readOptionalTimeValue();
maxRetryDelay = in.readOptionalTimeValue();
idleShardRetryDelay = in.readOptionalTimeValue();
}

Expand Down Expand Up @@ -266,8 +266,8 @@ public Integer getMaxWriteBufferSize() {
return maxWriteBufferSize;
}

public TimeValue getRetryTimeout() {
return retryTimeout;
public TimeValue getMaxRetryDelay() {
return maxRetryDelay;
}

public TimeValue getIdleShardRetryDelay() {
Expand All @@ -283,7 +283,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalLong(maxOperationSizeInBytes);
out.writeOptionalVInt(maxConcurrentWriteBatches);
out.writeOptionalVInt(maxWriteBufferSize);
out.writeOptionalTimeValue(retryTimeout);
out.writeOptionalTimeValue(maxRetryDelay);
out.writeOptionalTimeValue(idleShardRetryDelay);
}

Expand All @@ -308,8 +308,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (maxWriteBufferSize != null){
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
}
if (retryTimeout != null) {
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout);
if (maxRetryDelay != null) {
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay);
}
if (idleShardRetryDelay != null) {
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay);
Expand All @@ -334,7 +334,7 @@ public boolean equals(Object o) {
Objects.equals(maxOperationSizeInBytes, that.maxOperationSizeInBytes) &&
Objects.equals(maxConcurrentWriteBatches, that.maxConcurrentWriteBatches) &&
Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) &&
Objects.equals(retryTimeout, that.retryTimeout) &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay);
}

Expand All @@ -348,7 +348,7 @@ public int hashCode() {
maxOperationSizeInBytes,
maxConcurrentWriteBatches,
maxWriteBufferSize,
retryTimeout,
maxRetryDelay,
idleShardRetryDelay
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
package org.elasticsearch.xpack.core.ccr.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
package org.elasticsearch.xpack.core.ccr.action;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestBuilder;
Expand Down Expand Up @@ -65,11 +65,11 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
PARSER.declareField(Request::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()),
ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName()),
AutoFollowPattern.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
PARSER.declareField(Request::setIdleShardRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
AutoFollowPattern.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
}

public static Request fromXContent(XContentParser parser, String remoteClusterAlias) throws IOException {
Expand Down Expand Up @@ -230,25 +230,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(FOLLOW_INDEX_NAME_PATTERN_FIELD.getPreferredName(), followIndexNamePattern);
}
if (maxBatchOperationCount != null) {
builder.field(ShardFollowTask.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
builder.field(AutoFollowPattern.MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount);
}
if (maxOperationSizeInBytes != null) {
builder.field(ShardFollowTask.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
builder.field(AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes);
}
if (maxWriteBufferSize != null) {
builder.field(ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(AutoFollowPattern.MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
}
if (maxConcurrentReadBatches != null) {
builder.field(ShardFollowTask.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
builder.field(AutoFollowPattern.MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches);
}
if (maxConcurrentWriteBatches != null) {
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
builder.field(AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
}
if (maxRetryDelay != null) {
builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(AutoFollowPattern.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
}
if (idleShardRetryDelay != null) {
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
builder.field(AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
}
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;

import java.util.Objects;
Expand Down Expand Up @@ -70,4 +72,28 @@ public ActionFuture<AcknowledgedResponse> unfollow(final UnfollowIndexAction.Req
return listener;
}

public void putAutoFollowPattern(
final PutAutoFollowPatternAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
client.execute(PutAutoFollowPatternAction.INSTANCE, request, listener);
}

public ActionFuture<AcknowledgedResponse> putAutoFollowPattern(final PutAutoFollowPatternAction.Request request) {
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(PutAutoFollowPatternAction.INSTANCE, request, listener);
return listener;
}

public void deleteAutoFollowPattern(
final DeleteAutoFollowPatternAction.Request request,
final ActionListener<AcknowledgedResponse> listener) {
client.execute(DeleteAutoFollowPatternAction.INSTANCE, request, listener);
}

public ActionFuture<AcknowledgedResponse> deleteAutoFollowPattern(final DeleteAutoFollowPatternAction.Request request) {
final PlainActionFuture<AcknowledgedResponse> listener = PlainActionFuture.newFuture();
client.execute(DeleteAutoFollowPatternAction.INSTANCE, request, listener);
return listener;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
package org.elasticsearch.xpack.core.ccr.action;

import org.elasticsearch.test.AbstractStreamableTestCase;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;
package org.elasticsearch.xpack.core.ccr.action;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down

0 comments on commit 686205c

Please sign in to comment.