Skip to content

Commit

Permalink
HLRC: Reindex should support requests_per_seconds parameter (#33808)
Browse files Browse the repository at this point in the history
The high level Rest clients reindex method currently doesn't pass on the
"requests_per_second" that are optionally set in ReindexRequest through the Rest
layer. This change makes sure the value is added to the request parameters if
set and also includes it for the update-by-query and delete-by-query cases.
  • Loading branch information
Christoph Büscher authored and kcm committed Oct 30, 2018
1 parent 9230878 commit b163f3d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,8 @@ static Request reindex(ReindexRequest reindexRequest) throws IOException {
Params params = new Params(request)
.withRefresh(reindexRequest.isRefresh())
.withTimeout(reindexRequest.getTimeout())
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards());
.withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond());

if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime());
Expand All @@ -484,6 +485,7 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
.withRefresh(updateByQueryRequest.isRefresh())
.withTimeout(updateByQueryRequest.getTimeout())
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(updateByQueryRequest.indicesOptions());
if (updateByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
Expand All @@ -510,6 +512,7 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
.withRefresh(deleteByQueryRequest.isRefresh())
.withTimeout(deleteByQueryRequest.getTimeout())
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
.withRequestsPerSecond(deleteByQueryRequest.getRequestsPerSecond())
.withIndicesOptions(deleteByQueryRequest.indicesOptions());
if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
params.putParam("conflicts", "proceed");
Expand Down Expand Up @@ -714,6 +717,16 @@ Params withRefreshPolicy(RefreshPolicy refreshPolicy) {
return this;
}

Params withRequestsPerSecond(float requestsPerSecond) {
// the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
// but we don't want to add that to the URL parameters, instead we leave it out
if (Float.isFinite(requestsPerSecond)) {
return putParam("requests_per_second", Float.toString(requestsPerSecond));
} else {
return putParam("requests_per_second", "-1");
}
}

Params withRetryOnConflict(int retryOnConflict) {
if (retryOnConflict > 0) {
return putParam("retry_on_conflict", String.valueOf(retryOnConflict));
Expand Down Expand Up @@ -958,7 +971,7 @@ String build() {
private static String encodePart(String pathPart) {
try {
//encode each part (e.g. index, type and id) separately before merging them into the path
//we prepend "/" to the path part to make this pate absolute, otherwise there can be issues with
//we prepend "/" to the path part to make this path absolute, otherwise there can be issues with
//paths that start with `-` or contain `:`
URI uri = new URI(null, null, null, -1, "/" + pathPart, null, null);
//manually encode any slash that each part may contain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,13 @@ public void testReindex() throws IOException {
if (randomBoolean()) {
reindexRequest.setDestPipeline("my_pipeline");
}
if (randomBoolean()) {
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
reindexRequest.setRequestsPerSecond(requestsPerSecond);
} else {
expectedParams.put("requests_per_second", "-1");
}
if (randomBoolean()) {
reindexRequest.setDestRouting("=cat");
}
Expand Down Expand Up @@ -359,6 +366,13 @@ public void testUpdateByQuery() throws IOException {
updateByQueryRequest.setPipeline("my_pipeline");
expectedParams.put("pipeline", "my_pipeline");
}
if (randomBoolean()) {
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
updateByQueryRequest.setRequestsPerSecond(requestsPerSecond);
} else {
expectedParams.put("requests_per_second", "-1");
}
if (randomBoolean()) {
updateByQueryRequest.setRouting("=cat");
expectedParams.put("routing", "=cat");
Expand Down Expand Up @@ -430,6 +444,13 @@ public void testDeleteByQuery() throws IOException {
if (randomBoolean()) {
deleteByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval"));
}
if (randomBoolean()) {
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
deleteByQueryRequest.setRequestsPerSecond(requestsPerSecond);
} else {
expectedParams.put("requests_per_second", "-1");
}
setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
Expand Down

0 comments on commit b163f3d

Please sign in to comment.