diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java index 5aa64a5c1375e..b08b045d287c0 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/IndicesClient.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -269,6 +270,28 @@ public void flushAsync(FlushRequest flushRequest, ActionListener listener, emptySet(), headers); } + /** Initiate a synced flush manually using the synced flush API + *

+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html"> + * Synced flush API on elastic.co</a> + */ + public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Header... headers) throws IOException { + return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced, + SyncedFlushResponse::fromXContent, emptySet(), headers); + } + + /** + * Asynchronously initiate a synced flush manually using the synced flush API + *

+ * See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-synced-flush.html"> + * Synced flush API on elastic.co</a> + */ + public void flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, ActionListener<SyncedFlushResponse> listener, Header... headers) { + restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, RequestConverters::flushSynced, + SyncedFlushResponse::fromXContent, listener, emptySet(), headers); + } + + + /** * Retrieve the settings of one or more indices + *

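For reviewers: a minimal, self-contained sketch of how a caller would use the new methods, assembled from the calls exercised in `IndicesClientDocumentationIT` further down in this diff. The `RestHighLevelClient` setup and the index name `index1` are assumptions for illustration, not part of this change:

```java
import java.io.IOException;
import java.util.Map;

import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.SyncedFlushResponse;

public class SyncedFlushExample {

    // Assumptions: "client" is a RestHighLevelClient configured elsewhere,
    // and "index1" is an existing index.
    static void syncedFlush(RestHighLevelClient client) throws IOException {
        SyncedFlushRequest request = new SyncedFlushRequest("index1");
        // Controls how wildcard expressions and unavailable indices are resolved.
        request.indicesOptions(IndicesOptions.lenientExpandOpen());

        SyncedFlushResponse response = client.indices().flushSynced(request);
        System.out.printf("shards: total=%d successful=%d failed=%d%n",
            response.totalShards(), response.successfulShards(), response.failedShards());

        // Per-index drill-down, including the reason for each failed shard copy.
        for (Map.Entry<String, SyncedFlushResponse.IndexResult> entry
                : response.getIndexResults().entrySet()) {
            for (SyncedFlushResponse.ShardFailure failure : entry.getValue().failures()) {
                System.out.println(entry.getKey() + "[" + failure.getShardId() + "]: "
                    + failure.getFailureReason());
            }
        }
    }
}
```

The non-blocking variant added in the same file, `flushSyncedAsync(request, listener)`, takes the same request plus an `ActionListener<SyncedFlushResponse>`.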
diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 03f2b0d184b2c..290d61d81b76d 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -41,6 +41,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -214,6 +215,14 @@ static Request flush(FlushRequest flushRequest) { return request; } + static Request flushSynced(SyncedFlushRequest syncedFlushRequest) { + String[] indices = syncedFlushRequest.indices() == null ? Strings.EMPTY_ARRAY : syncedFlushRequest.indices(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_flush/synced")); + Params parameters = new Params(request); + parameters.withIndicesOptions(syncedFlushRequest.indicesOptions()); + return request; + } + static Request forceMerge(ForceMergeRequest forceMergeRequest) { String[] indices = forceMergeRequest.indices() == null ? Strings.EMPTY_ARRAY : forceMergeRequest.indices(); Request request = new Request(HttpPost.METHOD_NAME, endpoint(indices, "_forcemerge")); diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java new file mode 100644 index 0000000000000..53f3f3358ba2f --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/SyncedFlushResponse.java @@ -0,0 +1,344 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.client; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.ParsingException; +import org.elasticsearch.common.xcontent.ToXContentFragment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentLocation; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser.Token; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; + +public class SyncedFlushResponse extends ActionResponse implements ToXContentFragment { + + public static final String SHARDS_FIELD = "_shards"; + + private ShardCounts totalCounts; + private Map indexResults; + + SyncedFlushResponse(ShardCounts totalCounts, Map indexResults) { + this.totalCounts = new ShardCounts(totalCounts.total, totalCounts.successful, totalCounts.failed); + this.indexResults = Collections.unmodifiableMap(indexResults); + } + + /** + * @return The total number of shard copies that were processed across all indexes + */ + public int totalShards() { + return totalCounts.total; + } + + /** + * @return The number of successful shard copies that were processed across all indexes + */ + public int successfulShards() { + return totalCounts.successful; + } + + /** + * @return The number of failed shard copies that were processed across all indexes + */ + public int failedShards() { + return totalCounts.failed; + } + + /** + * @return A map of results for each index where the keys of the map are the index names + * and the values are the results encapsulated in {@link IndexResult}. 
+ */ + public Map getIndexResults() { + return indexResults; + } + + ShardCounts getShardCounts() { + return totalCounts; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(SHARDS_FIELD); + totalCounts.toXContent(builder, params); + builder.endObject(); + for (Map.Entry entry: indexResults.entrySet()) { + String indexName = entry.getKey(); + IndexResult indexResult = entry.getValue(); + builder.startObject(indexName); + indexResult.toXContent(builder, params); + builder.endObject(); + } + return builder; + } + + public static SyncedFlushResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + ShardCounts totalCounts = null; + Map indexResults = new HashMap<>(); + XContentLocation startLoc = parser.getTokenLocation(); + while (parser.nextToken().equals(Token.FIELD_NAME)) { + if (parser.currentName().equals(SHARDS_FIELD)) { + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + totalCounts = ShardCounts.fromXContent(parser); + } else { + String indexName = parser.currentName(); + IndexResult indexResult = IndexResult.fromXContent(parser); + indexResults.put(indexName, indexResult); + } + } + if (totalCounts != null) { + return new SyncedFlushResponse(totalCounts, indexResults); + } else { + throw new ParsingException( + startLoc, + "Unable to reconstruct object. Total counts for shards couldn't be parsed." + ); + } + } + + /** + * Encapsulates the number of total successful and failed shard copies + */ + public static final class ShardCounts implements ToXContentFragment { + + public static final String TOTAL_FIELD = "total"; + public static final String SUCCESSFUL_FIELD = "successful"; + public static final String FAILED_FIELD = "failed"; + + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "shardcounts", + a -> new ShardCounts((Integer) a[0], (Integer) a[1], (Integer) a[2]) + ); + static { + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD)); + } + + private int total; + private int successful; + private int failed; + + + ShardCounts(int total, int successful, int failed) { + this.total = total; + this.successful = successful; + this.failed = failed; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(TOTAL_FIELD, total); + builder.field(SUCCESSFUL_FIELD, successful); + builder.field(FAILED_FIELD, failed); + return builder; + } + + public static ShardCounts fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + public boolean equals(ShardCounts other) { + if (other != null) { + return + other.total == this.total && + other.successful == this.successful && + other.failed == this.failed; + } else { + return false; + } + } + + } + + /** + * Description for the flush/synced results for a particular index. + * This includes total, successful and failed copies along with failure description for each failed copy. 
+ */ + public static final class IndexResult implements ToXContentFragment { + + public static final String TOTAL_FIELD = "total"; + public static final String SUCCESSFUL_FIELD = "successful"; + public static final String FAILED_FIELD = "failed"; + public static final String FAILURES_FIELD = "failures"; + + @SuppressWarnings("unchecked") + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>( + "indexresult", + a -> new IndexResult((Integer) a[0], (Integer) a[1], (Integer) a[2], (List)a[3]) + ); + static { + PARSER.declareInt(constructorArg(), new ParseField(TOTAL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(SUCCESSFUL_FIELD)); + PARSER.declareInt(constructorArg(), new ParseField(FAILED_FIELD)); + PARSER.declareObjectArray(optionalConstructorArg(), ShardFailure.PARSER, new ParseField(FAILURES_FIELD)); + } + + private ShardCounts counts; + private List failures; + + IndexResult(int total, int successful, int failed, List failures) { + counts = new ShardCounts(total, successful, failed); + if (failures != null) { + this.failures = Collections.unmodifiableList(failures); + } else { + this.failures = Collections.unmodifiableList(new ArrayList<>()); + } + } + + /** + * @return The total number of shard copies that were processed for this index. + */ + public int totalShards() { + return counts.total; + } + + /** + * @return The number of successful shard copies that were processed for this index. + */ + public int successfulShards() { + return counts.successful; + } + + /** + * @return The number of failed shard copies that were processed for this index. + */ + public int failedShards() { + return counts.failed; + } + + /** + * @return A list of {@link ShardFailure} objects that describe each of the failed shard copies for this index. + */ + public List failures() { + return failures; + } + + ShardCounts getShardCounts() { + return counts; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + counts.toXContent(builder, params); + if (failures.size() > 0) { + builder.startArray(FAILURES_FIELD); + for (ShardFailure failure : failures) { + failure.toXContent(builder, params); + } + builder.endArray(); + } + return builder; + } + + public static IndexResult fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } + + /** + * Description of a failed shard copy for an index. 
+ */ + public static final class ShardFailure implements ToXContentFragment { + + public static String SHARD_ID_FIELD = "shard"; + public static String FAILURE_REASON_FIELD = "reason"; + public static String ROUTING_FIELD = "routing"; + + private int shardId; + private String failureReason; + private Map routing; + + @SuppressWarnings("unchecked") + static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "shardfailure", + a -> new ShardFailure((Integer)a[0], (String)a[1], (Map)a[2]) + ); + static { + PARSER.declareInt(constructorArg(), new ParseField(SHARD_ID_FIELD)); + PARSER.declareString(constructorArg(), new ParseField(FAILURE_REASON_FIELD)); + PARSER.declareObject( + optionalConstructorArg(), + (parser, c) -> parser.map(), + new ParseField(ROUTING_FIELD) + ); + } + + ShardFailure(int shardId, String failureReason, Map routing) { + this.shardId = shardId; + this.failureReason = failureReason; + if (routing != null) { + this.routing = Collections.unmodifiableMap(routing); + } else { + this.routing = Collections.unmodifiableMap(new HashMap<>()); + } + } + + /** + * @return Id of the shard whose copy failed + */ + public int getShardId() { + return shardId; + } + + /** + * @return Reason for failure of the shard copy + */ + public String getFailureReason() { + return failureReason; + } + + /** + * @return Additional information about the failure. + */ + public Map getRouting() { + return routing; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SHARD_ID_FIELD, shardId); + builder.field(FAILURE_REASON_FIELD, failureReason); + if (routing.size() > 0) { + builder.field(ROUTING_FIELD, routing); + } + builder.endObject(); + return builder; + } + + public static ShardFailure fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java index cf7aeb389f8a6..120ba65673b36 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java @@ -38,6 +38,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -565,6 +566,39 @@ public void testFlush() throws IOException { } } + public void testSyncedFlush() throws IOException { + { + String index = "index"; + Settings settings = Settings.builder() + .put("number_of_shards", 1) + .put("number_of_replicas", 0) + .build(); + createIndex(index, settings); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index); + SyncedFlushResponse flushResponse = + execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync); + assertThat(flushResponse.totalShards(), equalTo(1)); + assertThat(flushResponse.successfulShards(), equalTo(1)); + assertThat(flushResponse.failedShards(), equalTo(0)); + } 
+ { + String nonExistentIndex = "non_existent_index"; + assertFalse(indexExists(nonExistentIndex)); + SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(nonExistentIndex); + ElasticsearchException exception = expectThrows( + ElasticsearchException.class, + () -> + execute( + syncedFlushRequest, + highLevelClient().indices()::flushSynced, + highLevelClient().indices()::flushSyncedAsync + ) + ); + assertEquals(RestStatus.NOT_FOUND, exception.status()); + } + } + + public void testClearCache() throws IOException { { String index = "index"; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 6fb94b60f92c4..402ccf16f42b3 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; @@ -659,6 +660,29 @@ public void testFlush() { assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); } + public void testSyncedFlush() { + String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); + SyncedFlushRequest syncedFlushRequest; + if (randomBoolean()) { + syncedFlushRequest = new SyncedFlushRequest(indices); + } else { + syncedFlushRequest = new SyncedFlushRequest(); + syncedFlushRequest.indices(indices); + } + Map expectedParams = new HashMap<>(); + setRandomIndicesOptions(syncedFlushRequest::indicesOptions, syncedFlushRequest::indicesOptions, expectedParams); + Request request = RequestConverters.flushSynced(syncedFlushRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + if (indices != null && indices.length > 0) { + endpoint.add(String.join(",", indices)); + } + endpoint.add("_flush/synced"); + assertThat(request.getEndpoint(), equalTo(endpoint.toString())); + assertThat(request.getParameters(), equalTo(expectedParams)); + assertThat(request.getEntity(), nullValue()); + assertThat(request.getMethod(), equalTo(HttpPost.METHOD_NAME)); + } + public void testForceMerge() { String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5); ForceMergeRequest forceMergeRequest; diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java new file mode 100644 index 0000000000000..bc8fc90dd75e6 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/SyncedFlushResponseTests.java @@ -0,0 +1,269 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import com.carrotsearch.hppc.ObjectIntMap; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.flush.ShardsSyncedFlushResult; +import org.elasticsearch.indices.flush.SyncedFlushService; +import org.elasticsearch.test.ESTestCase; + +public class SyncedFlushResponseTests extends ESTestCase { + + public void testXContentSerialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + TestPlan plan = createTestPlan(); + + XContentBuilder serverResponsebuilder = XContentBuilder.builder(xContentType.xContent()); + assertNotNull(plan.result); + serverResponsebuilder.startObject(); + plan.result.toXContent(serverResponsebuilder, ToXContent.EMPTY_PARAMS); + serverResponsebuilder.endObject(); + XContentBuilder clientResponsebuilder = XContentBuilder.builder(xContentType.xContent()); + assertNotNull(plan.result); + clientResponsebuilder.startObject(); + plan.clientResult.toXContent(clientResponsebuilder, ToXContent.EMPTY_PARAMS); + clientResponsebuilder.endObject(); + Map serverContentMap = convertFailureListToSet( + serverResponsebuilder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(serverResponsebuilder).streamInput() + ).map() + ); + Map clientContentMap = convertFailureListToSet( + clientResponsebuilder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(clientResponsebuilder).streamInput() + ) + .map() + ); + assertEquals(serverContentMap, clientContentMap); + } + + public void testXContentDeserialization() throws IOException { + final XContentType xContentType = randomFrom(XContentType.values()); + TestPlan plan = createTestPlan(); + XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + builder.startObject(); + plan.result.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + XContentParser parser = builder + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput() + ); + SyncedFlushResponse originalResponse = plan.clientResult; + SyncedFlushResponse parsedResponse = 
SyncedFlushResponse.fromXContent(parser); + assertNotNull(parsedResponse); + assertShardCounts(originalResponse.getShardCounts(), parsedResponse.getShardCounts()); + for (Map.Entry entry: originalResponse.getIndexResults().entrySet()) { + String index = entry.getKey(); + SyncedFlushResponse.IndexResult responseResult = entry.getValue(); + SyncedFlushResponse.IndexResult parsedResult = parsedResponse.getIndexResults().get(index); + assertNotNull(responseResult); + assertNotNull(parsedResult); + assertShardCounts(responseResult.getShardCounts(), parsedResult.getShardCounts()); + assertEquals(responseResult.failures().size(), parsedResult.failures().size()); + for (SyncedFlushResponse.ShardFailure responseShardFailure: responseResult.failures()) { + assertTrue(containsFailure(parsedResult.failures(), responseShardFailure)); + } + } + } + + static class TestPlan { + SyncedFlushResponse.ShardCounts totalCounts; + Map countsPerIndex = new HashMap<>(); + ObjectIntMap expectedFailuresPerIndex = new ObjectIntHashMap<>(); + org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse result; + SyncedFlushResponse clientResult; + } + + TestPlan createTestPlan() throws IOException { + final TestPlan testPlan = new TestPlan(); + final Map> indicesResults = new HashMap<>(); + Map indexResults = new HashMap<>(); + final XContentType xContentType = randomFrom(XContentType.values()); + final int indexCount = randomIntBetween(1, 10); + int totalShards = 0; + int totalSuccessful = 0; + int totalFailed = 0; + for (int i = 0; i < indexCount; i++) { + final String index = "index_" + i; + int shards = randomIntBetween(1, 4); + int replicas = randomIntBetween(0, 2); + int successful = 0; + int failed = 0; + int failures = 0; + List shardsResults = new ArrayList<>(); + List shardFailures = new ArrayList<>(); + for (int shard = 0; shard < shards; shard++) { + final ShardId shardId = new ShardId(index, "_na_", shard); + if (randomInt(5) < 2) { + // total shard failure + failed += replicas + 1; + failures++; + shardsResults.add(new ShardsSyncedFlushResult(shardId, replicas + 1, "simulated total failure")); + shardFailures.add( + new SyncedFlushResponse.ShardFailure( + shardId.id(), + "simulated total failure", + new HashMap<>() + ) + ); + } else { + Map shardResponses = new HashMap<>(); + for (int copy = 0; copy < replicas + 1; copy++) { + final ShardRouting shardRouting = + TestShardRouting.newShardRouting( + index, shard, "node_" + shardId + "_" + copy, null, + copy == 0, ShardRoutingState.STARTED + ); + if (randomInt(5) < 2) { + // shard copy failure + failed++; + failures++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse("copy failure " + shardId)); + // Building the shardRouting map here. 
+ XContentBuilder builder = XContentBuilder.builder(xContentType.xContent()); + Map routing = + shardRouting.toXContent(builder, ToXContent.EMPTY_PARAMS) + .generator() + .contentType() + .xContent() + .createParser( + xContentRegistry(), LoggingDeprecationHandler.INSTANCE, + BytesReference.bytes(builder).streamInput() + ) + .map(); + shardFailures.add( + new SyncedFlushResponse.ShardFailure( + shardId.id(), + "copy failure " + shardId, + routing + ) + ); + } else { + successful++; + shardResponses.put(shardRouting, new SyncedFlushService.ShardSyncedFlushResponse()); + } + } + shardsResults.add(new ShardsSyncedFlushResult(shardId, "_sync_id_" + shard, replicas + 1, shardResponses)); + } + } + indicesResults.put(index, shardsResults); + indexResults.put( + index, + new SyncedFlushResponse.IndexResult( + shards * (replicas + 1), + successful, + failed, + shardFailures + ) + ); + testPlan.countsPerIndex.put(index, new SyncedFlushResponse.ShardCounts(shards * (replicas + 1), successful, failed)); + testPlan.expectedFailuresPerIndex.put(index, failures); + totalFailed += failed; + totalShards += shards * (replicas + 1); + totalSuccessful += successful; + } + testPlan.result = new org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse(indicesResults); + testPlan.totalCounts = new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed); + testPlan.clientResult = new SyncedFlushResponse( + new SyncedFlushResponse.ShardCounts(totalShards, totalSuccessful, totalFailed), + indexResults + ); + return testPlan; + } + + public boolean containsFailure(List failures, SyncedFlushResponse.ShardFailure origFailure) { + for (SyncedFlushResponse.ShardFailure failure: failures) { + if (failure.getShardId() == origFailure.getShardId() && + failure.getFailureReason().equals(origFailure.getFailureReason()) && + failure.getRouting().equals(origFailure.getRouting())) { + return true; + } + } + return false; + } + + + public void assertShardCounts(SyncedFlushResponse.ShardCounts first, SyncedFlushResponse.ShardCounts second) { + if (first == null) { + assertNull(second); + } else { + assertTrue(first.equals(second)); + } + } + + public Map convertFailureListToSet(Map input) { + Map retMap = new HashMap<>(); + for (Map.Entry entry: input.entrySet()) { + if (entry.getKey().equals(SyncedFlushResponse.SHARDS_FIELD)) { + retMap.put(entry.getKey(), entry.getValue()); + } else { + // This was an index entry. 
+ @SuppressWarnings("unchecked") + Map indexResult = (Map)entry.getValue(); + Map retResult = new HashMap<>(); + for (Map.Entry entry2: indexResult.entrySet()) { + if (entry2.getKey().equals(SyncedFlushResponse.IndexResult.FAILURES_FIELD)) { + @SuppressWarnings("unchecked") + List failures = (List)entry2.getValue(); + Set retSet = new HashSet<>(failures); + retResult.put(entry.getKey(), retSet); + } else { + retResult.put(entry2.getKey(), entry2.getValue()); + } + } + retMap.put(entry.getKey(), retResult); + } + } + return retMap; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java index d4d5af9e694ad..0ec3eeb986429 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.flush.SyncedFlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; @@ -55,8 +56,6 @@ import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; import org.elasticsearch.action.admin.indices.shrink.ResizeResponse; import org.elasticsearch.action.admin.indices.shrink.ResizeType; -import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest; -import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse; import org.elasticsearch.action.support.ActiveShardCount; @@ -64,6 +63,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.SyncedFlushResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -81,8 +81,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.equalTo; - /** * This class is used to generate the Java Indices API documentation. 
* You need to wrap your code between two tags like: @@ -784,6 +782,89 @@ public void onFailure(Exception e) { } } + public void testSyncedFlushIndex() throws Exception { + RestHighLevelClient client = highLevelClient(); + + { + createIndex("index1", Settings.EMPTY); + } + + { + // tag::flush-synced-request + SyncedFlushRequest request = new SyncedFlushRequest("index1"); // <1> + SyncedFlushRequest requestMultiple = new SyncedFlushRequest("index1", "index2"); // <2> + SyncedFlushRequest requestAll = new SyncedFlushRequest(); // <3> + // end::flush-synced-request + + // tag::flush-synced-request-indicesOptions + request.indicesOptions(IndicesOptions.lenientExpandOpen()); // <1> + // end::flush-synced-request-indicesOptions + + // tag::flush-synced-execute + SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request); + // end::flush-synced-execute + + // tag::flush-synced-response + int totalShards = flushSyncedResponse.totalShards(); // <1> + int successfulShards = flushSyncedResponse.successfulShards(); // <2> + int failedShards = flushSyncedResponse.failedShards(); // <3> + + for (Map.Entry responsePerIndexEntry: + flushSyncedResponse.getIndexResults().entrySet()) { + String indexName = responsePerIndexEntry.getKey(); // <4> + SyncedFlushResponse.IndexResult indexResult = responsePerIndexEntry.getValue(); + int totalShardsForIndex = indexResult.totalShards(); // <5> + int successfulShardsForIndex = indexResult.successfulShards(); // <6> + int failedShardsForIndex = indexResult.failedShards(); // <7> + if (failedShardsForIndex > 0) { + for (SyncedFlushResponse.ShardFailure failureEntry: indexResult.failures()) { + int shardId = failureEntry.getShardId(); // <8> + String failureReason = failureEntry.getFailureReason(); // <9> + Map routing = failureEntry.getRouting(); // <10> + } + } + } + // end::flush-synced-response + + // tag::flush-synced-execute-listener + ActionListener listener = new ActionListener() { + @Override + public void onResponse(SyncedFlushResponse refreshResponse) { + // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::flush-synced-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::flush-synced-execute-async + client.indices().flushSyncedAsync(request, listener); // <1> + // end::flush-synced-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + + { + // tag::flush-synced-notfound + try { + SyncedFlushRequest request = new SyncedFlushRequest("does_not_exist"); + client.indices().flushSynced(request); + } catch (ElasticsearchException exception) { + if (exception.status() == RestStatus.NOT_FOUND) { + // <1> + } + } + // end::flush-synced-notfound + } + } + public void testGetSettings() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/distribution/archives/build.gradle b/distribution/archives/build.gradle index ae4e6a431c977..c1097b68b898f 100644 --- a/distribution/archives/build.gradle +++ b/distribution/archives/build.gradle @@ -106,15 +106,23 @@ tasks.withType(AbstractArchiveTask) { baseName = "elasticsearch${ subdir.contains('oss') ? 
'-oss' : ''}" } +Closure commonZipConfig = { + dirMode 0755 + fileMode 0644 +} + task buildIntegTestZip(type: Zip) { + configure(commonZipConfig) with archiveFiles(transportModulesFiles, 'zip', false) } task buildZip(type: Zip) { + configure(commonZipConfig) with archiveFiles(modulesFiles(false), 'zip', false) } task buildOssZip(type: Zip) { + configure(commonZipConfig) with archiveFiles(modulesFiles(true), 'zip', true) } diff --git a/docs/java-rest/high-level/indices/flush_synced.asciidoc b/docs/java-rest/high-level/indices/flush_synced.asciidoc new file mode 100644 index 0000000000000..65afaa533a640 --- /dev/null +++ b/docs/java-rest/high-level/indices/flush_synced.asciidoc @@ -0,0 +1,91 @@ +[[java-rest-high-flush-synced]] +=== Flush Synced API + +[[java-rest-high-flush-synced-request]] +==== Flush Synced Request + +A `SyncedFlushRequest` can be applied to one or more indices, or even on `_all` the indices: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request] +-------------------------------------------------- +<1> Flush synced one index +<2> Flush synced multiple indices +<3> Flush synced all the indices + +==== Optional arguments + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-request-indicesOptions] +-------------------------------------------------- +<1> Setting `IndicesOptions` controls how unavailable indices are resolved and +how wildcard expressions are expanded + +[[java-rest-high-flush-synced-sync]] +==== Synchronous Execution + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute] +-------------------------------------------------- + +[[java-rest-high-flush-synced-async]] +==== Asynchronous Execution + +The asynchronous execution of a flush request requires both the `SyncedFlushRequest` +instance and an `ActionListener` instance to be passed to the asynchronous +method: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-async] +-------------------------------------------------- +<1> The `SyncedFlushRequest` to execute and the `ActionListener` to use when +the execution completes + +The asynchronous method does not block and returns immediately. Once it is +completed the `ActionListener` is called back using the `onResponse` method +if the execution successfully completed or using the `onFailure` method if +it failed. + +A typical listener for `SyncedFlushResponse` looks like: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-execute-listener] +-------------------------------------------------- +<1> Called when the execution is successfully completed. The response is +provided as an argument +<2> Called in case of failure. 
The raised exception is provided as an argument + +[[java-rest-high-flush-synced-response]] +==== Flush Synced Response + +The returned `SyncedFlushResponse` allows to retrieve information about the +executed operation as follows: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-response] +-------------------------------------------------- +<1> Total number of shards hit by the flush request +<2> Number of shards where the flush has succeeded +<3> Number of shards where the flush has failed +<4> Name of the index whose results we are about to calculate. +<5> Total number of shards for index mentioned in 4. +<6> Successful shards for index mentioned in 4. +<7> Failed shards for index mentioned in 4. +<8> One of the failed shard ids of the failed index mentioned in 4. +<9> Reason for failure of copies of the shard mentioned in 8. +<10> JSON represented by a Map. Contains shard related information like id, state, version etc. +for the failed shard copies. If the entire shard failed then this returns an empty map. + +By default, if the indices were not found, an `ElasticsearchException` will be thrown: + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/IndicesClientDocumentationIT.java[flush-synced-notfound] +-------------------------------------------------- +<1> Do something if the indices to be flushed were not found diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 62b07d710b34a..cedeb90bbc3d3 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -67,6 +67,7 @@ Index Management:: * <> * <> * <> +* <> * <> * <> * <> @@ -89,6 +90,7 @@ include::indices/shrink_index.asciidoc[] include::indices/split_index.asciidoc[] include::indices/refresh.asciidoc[] include::indices/flush.asciidoc[] +include::indices/flush_synced.asciidoc[] include::indices/clear_cache.asciidoc[] include::indices/force_merge.asciidoc[] include::indices/rollover.asciidoc[] diff --git a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc index 688cf20c5320f..019094cfa3fe2 100644 --- a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc @@ -348,6 +348,34 @@ GET /_search \... will sort the composite bucket in descending order when comparing values from the `date_histogram` source and in ascending order when comparing values from the `terms` source. +====== Missing bucket + +By default documents without a value for a given source are ignored. +It is possible to include them in the response by setting `missing_bucket` to +`true` (defaults to `false`): + +[source,js] +-------------------------------------------------- +GET /_search +{ + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "product_name": { "terms" : { "field": "product", "missing_bucket": true } } } + ] + } + } + } +} +-------------------------------------------------- +// CONSOLE + +In the example above the source `product_name` will emit an explicit `null` value +for documents without a value for the field `product`. 
+The `order` specified in the source dictates whether the `null` values should rank +first (ascending order, `asc`) or last (descending order, `desc`). + ==== Size The `size` parameter can be set to define how many composite buckets should be returned. diff --git a/docs/reference/mapping/types/text.asciidoc b/docs/reference/mapping/types/text.asciidoc index 069e50fc79ac7..988a2ada38d7e 100644 --- a/docs/reference/mapping/types/text.asciidoc +++ b/docs/reference/mapping/types/text.asciidoc @@ -89,7 +89,7 @@ The following parameters are accepted by `text` fields: What information should be stored in the index, for search and highlighting purposes. Defaults to `positions`. -<>:: +<>:: If enabled, term prefixes of between 2 and 5 characters are indexed into a separate field. This allows prefix searches to run more efficiently, at @@ -138,7 +138,7 @@ The following parameters are accepted by `text` fields: [[index-prefix-config]] ==== Index Prefix configuration -Text fields may also index term prefixes to speed up prefix searches. The `index_prefix` +Text fields may also index term prefixes to speed up prefix searches. The `index_prefixes` parameter is configured as below. Either or both of `min_chars` and `max_chars` may be excluded. Both values are treated as inclusive @@ -151,7 +151,7 @@ PUT my_index "properties": { "full_name": { "type": "text", - "index_prefix" : { + "index_prefixes" : { "min_chars" : 1, <1> "max_chars" : 10 <2> } diff --git a/docs/reference/migration/migrate_6_0/aggregations.asciidoc b/docs/reference/migration/migrate_6_0/aggregations.asciidoc index 8f2d4892522fb..e4ee8b8181558 100644 --- a/docs/reference/migration/migrate_6_0/aggregations.asciidoc +++ b/docs/reference/migration/migrate_6_0/aggregations.asciidoc @@ -63,4 +63,9 @@ If the `format` in the mappings is not compatible with the numeric input value, The execution hints `global_ordinals_hash` and `global_ordinals_low_cardinality` are deprecated and should be replaced by `global_ordinals` which now internally choose whether it should remap global ordinals to dense ordinals or directly use the -segment ordinals. \ No newline at end of file +segment ordinals. + +==== `missing` is deprecated in the `composite` aggregation + +The `missing` option of the `composite` aggregation is deprecated, `missing_bucket` +should be used instead. \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml index b8c89517ec119..7b29ddd2f0301 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml @@ -323,3 +323,32 @@ setup: - length: { aggregations.test.buckets: 2 } - length: { aggregations.test.after_key: 1 } - match: { aggregations.test.after_key.keyword: "foo" } + +--- +"Composite aggregation and array size": + - skip: + version: " - 6.3.99" + reason: starting in 6.4 the composite sources do not allocate arrays eagerly. 
+ + - do: + search: + index: test + body: + aggregations: + test: + composite: + size: 1000000000 + sources: [ + { + "keyword": { + "terms": { + "field": "keyword", + } + } + } + ] + + - match: {hits.total: 6} + - length: { aggregations.test.buckets: 2 } + - length: { aggregations.test.after_key: 1 } + - match: { aggregations.test.after_key.keyword: "foo" } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/190_index_prefix_search.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/190_index_prefix_search.yml index 04954b56fb317..963bed70750a5 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/190_index_prefix_search.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/190_index_prefix_search.yml @@ -1,8 +1,8 @@ --- "search with index prefixes": - skip: - version: " - 6.2.99" - reason: index_prefix is only available as of 6.3.0 + version: " - 6.99.99" + reason: index_prefixes is only available as of 6.3.0 - do: indices.create: index: test @@ -12,7 +12,7 @@ properties: text: type: text - index_prefix: + index_prefixes: min_chars: 1 max_chars: 10 diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml index 7e9d30830647b..e9742eb6cce69 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.status/10_basic.yml @@ -12,8 +12,8 @@ setup: --- "Get snapshot status": - skip: - version: " - 6.99.99" - reason: "backporting in progress: https://github.com/elastic/elasticsearch/pull/29602" + version: " - 6.3.99" + reason: "extra stats fields was introduced in 6.4.0" - do: indices.create: index: test_index diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index f6c190fee0922..0821b176e75e6 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -141,9 +141,10 @@ public void close() throws IOException { Path finalPath = stateLocation.resolve(fileName); try { Files.copy(finalStatePath, tmpPath); + IOUtils.fsync(tmpPath, false); // fsync the state file // we are on the same FileSystem / Partition here we can do an atomic move Files.move(tmpPath, finalPath, StandardCopyOption.ATOMIC_MOVE); - IOUtils.fsync(stateLocation, true); // we just fsync the dir here.. 
+ IOUtils.fsync(stateLocation, true); } finally { Files.deleteIfExists(tmpPath); } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java index 664c908a47417..0e5d0fb131e15 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/TextFieldMapper.java @@ -156,7 +156,7 @@ public TextFieldMapper build(BuilderContext context) { PrefixFieldMapper prefixMapper = null; if (prefixFieldType != null) { if (fieldType().isSearchable() == false) { - throw new IllegalArgumentException("Cannot set index_prefix on unindexed field [" + name() + "]"); + throw new IllegalArgumentException("Cannot set index_prefixes on unindexed field [" + name() + "]"); } if (fieldType.indexOptions() == IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) { prefixFieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS); @@ -203,7 +203,7 @@ public Mapper.Builder parse(String fieldName, Map node, ParserCo builder.fielddataFrequencyFilter(minFrequency, maxFrequency, minSegmentSize); DocumentMapperParser.checkNoRemainingFields(propName, frequencyFilter, parserContext.indexVersionCreated()); iterator.remove(); - } else if (propName.equals("index_prefix")) { + } else if (propName.equals("index_prefixes")) { Map indexPrefix = (Map) propNode; int minChars = XContentMapValues.nodeIntegerValue(indexPrefix.remove("min_chars"), Defaults.INDEX_PREFIX_MIN_CHARS); @@ -243,7 +243,7 @@ protected TokenStreamComponents wrapComponents(String fieldName, TokenStreamComp } } - private static final class PrefixFieldType extends StringFieldType { + static final class PrefixFieldType extends StringFieldType { final int minChars; final int maxChars; @@ -268,14 +268,14 @@ boolean accept(int length) { } void doXContent(XContentBuilder builder) throws IOException { - builder.startObject("index_prefix"); + builder.startObject("index_prefixes"); builder.field("min_chars", minChars); builder.field("max_chars", maxChars); builder.endObject(); } @Override - public MappedFieldType clone() { + public PrefixFieldType clone() { return new PrefixFieldType(name(), minChars, maxChars); } @@ -292,12 +292,16 @@ public String toString() { @Override public void checkCompatibility(MappedFieldType other, List conflicts, boolean strict) { super.checkCompatibility(other, conflicts, strict); - PrefixFieldType otherFieldType = (PrefixFieldType) other; - if (otherFieldType.minChars != this.minChars) { - conflicts.add("mapper [" + name() + "] has different min_chars values"); - } - if (otherFieldType.maxChars != this.maxChars) { - conflicts.add("mapper [" + name() + "] has different max_chars values"); + if (strict) { + PrefixFieldType otherFieldType = (PrefixFieldType) other; + if (otherFieldType.minChars != this.minChars) { + conflicts.add("mapper [" + name() + "] is used by multiple types. Set update_all_types to true to update " + + "[index_prefixes.min_chars] across all types."); + } + if (otherFieldType.maxChars != this.maxChars) { + conflicts.add("mapper [" + name() + "] is used by multiple types. 
Set update_all_types to true to update " + + "[index_prefixes.max_chars] across all types."); + } } } @@ -305,6 +309,22 @@ public void checkCompatibility(MappedFieldType other, List conflicts, bo public Query existsQuery(QueryShardContext context) { throw new UnsupportedOperationException(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + PrefixFieldType that = (PrefixFieldType) o; + return minChars == that.minChars && + maxChars == that.maxChars; + } + + @Override + public int hashCode() { + + return Objects.hash(super.hashCode(), minChars, maxChars); + } } private static final class PrefixFieldMapper extends FieldMapper { @@ -355,6 +375,9 @@ protected TextFieldType(TextFieldType ref) { this.fielddataMinFrequency = ref.fielddataMinFrequency; this.fielddataMaxFrequency = ref.fielddataMaxFrequency; this.fielddataMinSegmentSize = ref.fielddataMinSegmentSize; + if (ref.prefixFieldType != null) { + this.prefixFieldType = ref.prefixFieldType.clone(); + } } public TextFieldType clone() { @@ -368,6 +391,7 @@ public boolean equals(Object o) { } TextFieldType that = (TextFieldType) o; return fielddata == that.fielddata + && Objects.equals(prefixFieldType, that.prefixFieldType) && fielddataMinFrequency == that.fielddataMinFrequency && fielddataMaxFrequency == that.fielddataMaxFrequency && fielddataMinSegmentSize == that.fielddataMinSegmentSize; @@ -375,7 +399,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(super.hashCode(), fielddata, + return Objects.hash(super.hashCode(), fielddata, prefixFieldType, fielddataMinFrequency, fielddataMaxFrequency, fielddataMinSegmentSize); } @@ -401,6 +425,10 @@ public void checkCompatibility(MappedFieldType other, conflicts.add("mapper [" + name() + "] is used by multiple types. Set update_all_types to true to update " + "[fielddata_frequency_filter.min_segment_size] across all types."); } + if (Objects.equals(this.prefixFieldType, ((TextFieldType) other).prefixFieldType) == false) { + conflicts.add("mapper [" + name() + "] is used by multiple types. 
Set update_all_types to true to update " + + "[index_prefixes] across all types."); + } } } @@ -445,6 +473,10 @@ void setPrefixFieldType(PrefixFieldType prefixFieldType) { this.prefixFieldType = prefixFieldType; } + public PrefixFieldType getPrefixFieldType() { + return this.prefixFieldType; + } + @Override public String typeName() { return CONTENT_TYPE; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java index bf73b6e199eaf..d1747eb464504 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java @@ -24,49 +24,93 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; -import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.StringFieldType; -import org.elasticsearch.index.mapper.TextFieldMapper; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.LeafBucketCollector; import java.io.IOException; +import java.util.function.LongConsumer; /** * A {@link SingleDimensionValuesSource} for binary source ({@link BytesRef}). */ class BinaryValuesSource extends SingleDimensionValuesSource { + private final LongConsumer breakerConsumer; private final CheckedFunction docValuesFunc; - private final BytesRef[] values; + private ObjectArray values; + private ObjectArray valueBuilders; private BytesRef currentValue; - BinaryValuesSource(MappedFieldType fieldType, CheckedFunction docValuesFunc, - DocValueFormat format, Object missing, int size, int reverseMul) { - super(format, fieldType, missing, size, reverseMul); + BinaryValuesSource(BigArrays bigArrays, LongConsumer breakerConsumer, + MappedFieldType fieldType, CheckedFunction docValuesFunc, + DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) { + super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul); + this.breakerConsumer = breakerConsumer; this.docValuesFunc = docValuesFunc; - this.values = new BytesRef[size]; + this.values = bigArrays.newObjectArray(Math.min(size, 100)); + this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100)); } @Override - public void copyCurrent(int slot) { - values[slot] = BytesRef.deepCopyOf(currentValue); + void copyCurrent(int slot) { + values = bigArrays.grow(values, slot+1); + valueBuilders = bigArrays.grow(valueBuilders, slot+1); + BytesRefBuilder builder = valueBuilders.get(slot); + int byteSize = builder == null ? 
0 : builder.bytes().length; + if (builder == null) { + builder = new BytesRefBuilder(); + valueBuilders.set(slot, builder); + } + if (missingBucket && currentValue == null) { + values.set(slot, null); + } else { + assert currentValue != null; + builder.copyBytes(currentValue); + breakerConsumer.accept(builder.bytes().length - byteSize); + values.set(slot, builder.get()); + } } @Override - public int compare(int from, int to) { - return compareValues(values[from], values[to]); + int compare(int from, int to) { + if (missingBucket) { + if (values.get(from) == null) { + return values.get(to) == null ? 0 : -1 * reverseMul; + } else if (values.get(to) == null) { + return reverseMul; + } + } + return compareValues(values.get(from), values.get(to)); } @Override int compareCurrent(int slot) { - return compareValues(currentValue, values[slot]); + if (missingBucket) { + if (currentValue == null) { + return values.get(slot) == null ? 0 : -1 * reverseMul; + } else if (values.get(slot) == null) { + return reverseMul; + } + } + return compareValues(currentValue, values.get(slot)); } @Override int compareCurrentWithAfter() { + if (missingBucket) { + if (currentValue == null) { + return afterValue == null ? 0 : -1 * reverseMul; + } else if (afterValue == null) { + return reverseMul; + } + } return compareValues(currentValue, afterValue); } @@ -76,7 +120,9 @@ int compareValues(BytesRef v1, BytesRef v2) { @Override void setAfter(Comparable value) { - if (value.getClass() == String.class) { + if (missingBucket && value == null) { + afterValue = null; + } else if (value.getClass() == String.class) { afterValue = format.parseBytesRef(value.toString()); } else { throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName()); @@ -85,7 +131,7 @@ void setAfter(Comparable value) { @Override BytesRef toComparable(int slot) { - return values[slot]; + return values.get(slot); } @Override @@ -100,6 +146,9 @@ public void collect(int doc, long bucket) throws IOException { currentValue = dvs.nextValue(); next.collect(doc, bucket); } + } else if (missingBucket) { + currentValue = null; + next.collect(doc, bucket); } } }; @@ -130,5 +179,7 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer } @Override - public void close() {} + public void close() { + Releasables.close(values, valueBuilders); + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java new file mode 100644 index 0000000000000..6b35d7d2e2e0a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BitArray.java @@ -0,0 +1,68 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; + +/** + * A bit array that is implemented using a growing {@link LongArray} + * created from {@link BigArrays}. + * The underlying long array grows lazily based on the biggest index + * that needs to be set. + */ +final class BitArray implements Releasable { + private final BigArrays bigArrays; + private LongArray bits; + + BitArray(BigArrays bigArrays, int initialSize) { + this.bigArrays = bigArrays; + this.bits = bigArrays.newLongArray(initialSize, true); + } + + public void set(int index) { + fill(index, true); + } + + public void clear(int index) { + fill(index, false); + } + + public boolean get(int index) { + int wordNum = index >> 6; + long bitmask = 1L << index; + return (bits.get(wordNum) & bitmask) != 0; + } + + private void fill(int index, boolean bit) { + int wordNum = index >> 6; + bits = bigArrays.grow(bits,wordNum+1); + long bitmask = 1L << index; + long value = bit ? bits.get(wordNum) | bitmask : bits.get(wordNum) & ~bitmask; + bits.set(wordNum, value); + } + + @Override + public void close() { + Releasables.close(bits); + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java index 8147f94487f9b..b5b5218fc53a0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregation.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.aggregations.bucket.composite; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; @@ -66,11 +65,7 @@ static XContentBuilder toXContentFragment(CompositeAggregation aggregation, XCon static void buildCompositeMap(String fieldName, Map composite, XContentBuilder builder) throws IOException { builder.startObject(fieldName); for (Map.Entry entry : composite.entrySet()) { - if (entry.getValue().getClass() == BytesRef.class) { - builder.field(entry.getKey(), ((BytesRef) entry.getValue()).utf8ToString()); - } else { - builder.field(entry.getKey(), entry.getValue()); - } + builder.field(entry.getKey(), entry.getValue()); } builder.endObject(); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java index 8a0b4eedfed81..482b8be1c4516 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java @@ -170,7 +170,9 @@ protected AggregatorFactory doBuild(SearchContext context, AggregatorFactory< throw new IllegalArgumentException("Missing value for [after." 
+ sources.get(i).name() + "]"); } Object obj = after.get(sourceName); - if (obj instanceof Comparable) { + if (configs[i].missingBucket() && obj == null) { + values[i] = null; + } else if (obj instanceof Comparable) { values[i] = (Comparable) obj; } else { throw new IllegalArgumentException("Invalid value for [after." + sources.get(i).name() + diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 472697abe788a..d94f3ab02c30a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -30,6 +30,7 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; import org.apache.lucene.util.RoaringDocIdSet; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.search.DocValueFormat; @@ -50,6 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; final class CompositeAggregator extends BucketsAggregator { @@ -59,9 +61,10 @@ final class CompositeAggregator extends BucketsAggregator { private final int[] reverseMuls; private final List formats; + private final SingleDimensionValuesSource[] sources; private final CompositeValuesCollectorQueue queue; - private final List entries; + private final List entries = new ArrayList<>(); private LeafReaderContext currentLeaf; private RoaringDocIdSet.Builder docIdSetBuilder; private BucketCollector deferredCollectors; @@ -74,19 +77,19 @@ final class CompositeAggregator extends BucketsAggregator { this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).collect(Collectors.toList()); this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); - final SingleDimensionValuesSource[] sources = - createValuesSources(context.bigArrays(), context.searcher().getIndexReader(), context.query(), sourceConfigs, size); - this.queue = new CompositeValuesCollectorQueue(sources, size); - this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); - if (rawAfterKey != null) { - queue.setAfter(rawAfterKey.values()); + this.sources = new SingleDimensionValuesSource[sourceConfigs.length]; + for (int i = 0; i < sourceConfigs.length; i++) { + this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), + context.query(), sourceConfigs[i], size, i); } - this.entries = new ArrayList<>(); + this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); + this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); } @Override protected void doClose() { Releasables.close(queue); + Releasables.close(sources); } @Override @@ -256,94 +259,93 @@ public void collect(int doc, long zeroBucket) throws IOException { }; } - private static SingleDimensionValuesSource[] createValuesSources(BigArrays bigArrays, IndexReader reader, Query query, - 
CompositeValuesSourceConfig[] configs, int size) {
-        final SingleDimensionValuesSource<?>[] sources = new SingleDimensionValuesSource[configs.length];
-        for (int i = 0; i < sources.length; i++) {
-            final int reverseMul = configs[i].reverseMul();
-            if (configs[i].valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
-                ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) configs[i].valuesSource();
-                sources[i] = new GlobalOrdinalValuesSource(
+    private SingleDimensionValuesSource<?> createValuesSource(BigArrays bigArrays, IndexReader reader, Query query,
+                                                              CompositeValuesSourceConfig config, int size, int sortRank) {
+
+        final int reverseMul = config.reverseMul();
+        if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
+            ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource();
+            SingleDimensionValuesSource<?> source = new GlobalOrdinalValuesSource(
+                bigArrays,
+                config.fieldType(),
+                vs::globalOrdinalsValues,
+                config.format(),
+                config.missingBucket(),
+                config.missing(),
+                size,
+                reverseMul
+            );
+
+            if (sortRank == 0 && source.createSortedDocsProducerOrNull(reader, query) != null) {
+                // this is the leading source and we can optimize it with the sorted docs producer but
+                // we don't want to use global ordinals because the number of visited documents
+                // should be low and global ordinals need one lookup per visited term.
+                Releasables.close(source);
+                return new BinaryValuesSource(
                     bigArrays,
-                    configs[i].fieldType(),
-                    vs::globalOrdinalsValues,
-                    configs[i].format(),
-                    configs[i].missing(),
+                    this::addRequestCircuitBreakerBytes,
+                    config.fieldType(),
+                    vs::bytesValues,
+                    config.format(),
+                    config.missingBucket(),
+                    config.missing(),
                     size,
                     reverseMul
                 );
+            } else {
+                return source;
+            }
+        } else if (config.valuesSource() instanceof ValuesSource.Bytes) {
+            ValuesSource.Bytes vs = (ValuesSource.Bytes) config.valuesSource();
+            return new BinaryValuesSource(
+                bigArrays,
+                this::addRequestCircuitBreakerBytes,
+                config.fieldType(),
+                vs::bytesValues,
+                config.format(),
+                config.missingBucket(),
+                config.missing(),
+                size,
+                reverseMul
+            );
-            if (i == 0 && sources[i].createSortedDocsProducerOrNull(reader, query) != null) {
-                // this the leading source and we can optimize it with the sorted docs producer but
-                // we don't want to use global ordinals because the number of visited documents
-                // should be low and global ordinals need one lookup per visited term.
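The removed block above and its replacement encode the same heuristic, which is easy to miss in the diff: only the leading source (sort rank 0) can drive early termination, and when it can, raw binary values are preferred over global ordinals because few documents are visited and each emitted ordinal would otherwise need a term-dictionary lookup. A condensed, hypothetical distillation of that choice (preferRawBytes is an illustrative name, not part of this change):

    // sortRank == 0: this source sorts first in the composite key;
    // a non-null producer means index order can be used to terminate early.
    static boolean preferRawBytes(int sortRank, SortedDocsProducer producer) {
        return sortRank == 0 && producer != null;
    }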
- Releasables.close(sources[i]); - sources[i] = new BinaryValuesSource( - configs[i].fieldType(), - vs::bytesValues, - configs[i].format(), - configs[i].missing(), - size, - reverseMul - ); - } - } else if (configs[i].valuesSource() instanceof ValuesSource.Bytes) { - ValuesSource.Bytes vs = (ValuesSource.Bytes) configs[i].valuesSource(); - sources[i] = new BinaryValuesSource( - configs[i].fieldType(), - vs::bytesValues, - configs[i].format(), - configs[i].missing(), + } else if (config.valuesSource() instanceof ValuesSource.Numeric) { + final ValuesSource.Numeric vs = (ValuesSource.Numeric) config.valuesSource(); + if (vs.isFloatingPoint()) { + return new DoubleValuesSource( + bigArrays, + config.fieldType(), + vs::doubleValues, + config.format(), + config.missingBucket(), + config.missing(), size, reverseMul ); - } else if (configs[i].valuesSource() instanceof ValuesSource.Numeric) { - final ValuesSource.Numeric vs = (ValuesSource.Numeric) configs[i].valuesSource(); - if (vs.isFloatingPoint()) { - sources[i] = new DoubleValuesSource( - bigArrays, - configs[i].fieldType(), - vs::doubleValues, - configs[i].format(), - configs[i].missing(), - size, - reverseMul - ); - + } else { + final LongUnaryOperator rounding; + if (vs instanceof RoundingValuesSource) { + rounding = ((RoundingValuesSource) vs)::round; } else { - if (vs instanceof RoundingValuesSource) { - sources[i] = new LongValuesSource( - bigArrays, - configs[i].fieldType(), - vs::longValues, - ((RoundingValuesSource) vs)::round, - configs[i].format(), - configs[i].missing(), - size, - reverseMul - ); - - } else { - sources[i] = new LongValuesSource( - bigArrays, - configs[i].fieldType(), - vs::longValues, - (value) -> value, - configs[i].format(), - configs[i].missing(), - size, - reverseMul - ); - - } + rounding = LongUnaryOperator.identity(); } - } else { - throw new IllegalArgumentException("Unknown value source: " + configs[i].valuesSource().getClass().getName() + - " for field: " + sources[i].fieldType.name()); + return new LongValuesSource( + bigArrays, + config.fieldType(), + vs::longValues, + rounding, + config.format(), + config.missingBucket(), + config.missing(), + size, + reverseMul + ); } + } else { + throw new IllegalArgumentException("Unknown values source type: " + config.valuesSource().getClass().getName() + + " for source: " + config.name()); } - return sources; } private static class Entry { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 5be4508612ece..b7b29a8841489 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -22,10 +22,11 @@ import org.apache.lucene.index.LeafReaderContext; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.search.aggregations.LeafBucketCollector; import java.io.IOException; -import java.util.Arrays; import java.util.Set; import java.util.TreeMap; @@ -36,29 +37,33 @@ final class CompositeValuesCollectorQueue implements Releasable { // the slot for the current candidate private static final int CANDIDATE_SLOT = Integer.MAX_VALUE; + private 
final BigArrays bigArrays;
     private final int maxSize;
     private final TreeMap<Integer, Integer> keys;
     private final SingleDimensionValuesSource<?>[] arrays;
-    private final int[] docCounts;
-    private boolean afterValueSet = false;
+    private IntArray docCounts;
+    private boolean afterKeyIsSet = false;
 
     /**
      * Constructs a composite queue with the specified size and sources.
      *
      * @param sources The list of {@link CompositeValuesSourceConfig} to build the composite buckets.
      * @param size The number of composite buckets to keep.
+     * @param afterKey The composite key to resume from (exclusive), or null if the request has no `after` key.
      */
-    CompositeValuesCollectorQueue(SingleDimensionValuesSource<?>[] sources, int size) {
+    CompositeValuesCollectorQueue(BigArrays bigArrays, SingleDimensionValuesSource<?>[] sources, int size, CompositeKey afterKey) {
+        this.bigArrays = bigArrays;
         this.maxSize = size;
         this.arrays = sources;
-        this.docCounts = new int[size];
         this.keys = new TreeMap<>(this::compare);
-    }
-
-    void clear() {
-        keys.clear();
-        Arrays.fill(docCounts, 0);
-        afterValueSet = false;
+        if (afterKey != null) {
+            assert afterKey.size() == sources.length;
+            afterKeyIsSet = true;
+            for (int i = 0; i < afterKey.size(); i++) {
+                sources[i].setAfter(afterKey.get(i));
+            }
+        }
+        this.docCounts = bigArrays.newIntArray(1, false);
     }
 
     /**
@@ -94,7 +99,7 @@ Integer compareCurrent() {
      * Returns the lowest value (exclusive) of the leading source.
      */
     Comparable getLowerValueLeadSource() {
-        return afterValueSet ? arrays[0].getAfter() : null;
+        return afterKeyIsSet ? arrays[0].getAfter() : null;
     }
 
     /**
@@ -107,7 +112,7 @@ Comparable getUpperValueLeadSource() throws IOException {
      * Returns the document count in slot.
      */
     int getDocCount(int slot) {
-        return docCounts[slot];
+        return docCounts.get(slot);
     }
 
     /**
@@ -117,7 +122,8 @@ private void copyCurrent(int slot) {
         for (int i = 0; i < arrays.length; i++) {
             arrays[i].copyCurrent(slot);
         }
-        docCounts[slot] = 1;
+        docCounts = bigArrays.grow(docCounts, slot + 1);
+        docCounts.set(slot, 1);
     }
 
     /**
@@ -134,17 +140,6 @@ int compare(int slot1, int slot2) {
         return 0;
     }
 
-    /**
-     * Sets the after values for this comparator.
-     */
-    void setAfter(Comparable[] values) {
-        assert values.length == arrays.length;
-        afterValueSet = true;
-        for (int i = 0; i < arrays.length; i++) {
-            arrays[i].setAfter(values[i]);
-        }
-    }
-
     /**
      * Compares the after values with the values in slot.
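With clear() and setAfter() removed, the queue is now single-shot: the after key is bound once at construction and pushed into each source. A small wiring sketch, assuming a test-style BigArrays (newQueue is an illustrative helper, not part of this change):

    // afterKey == null requests the first page; otherwise each key component is
    // forwarded to the matching source via setAfter(...) in the constructor.
    static CompositeValuesCollectorQueue newQueue(SingleDimensionValuesSource<?>[] sources,
                                                  int size, CompositeKey afterKey) {
        return new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, afterKey);
    }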
*/
@@ -207,10 +202,10 @@ int addIfCompetitive() {
         Integer topSlot = compareCurrent();
         if (topSlot != null) {
             // this key is already in the top N, skip it
-            docCounts[topSlot] += 1;
+            docCounts.increment(topSlot, 1);
             return topSlot;
         }
-        if (afterValueSet && compareCurrentWithAfter() <= 0) {
+        if (afterKeyIsSet && compareCurrentWithAfter() <= 0) {
             // this key is greater than the top value collected in the previous round, skip it
             return -1;
         }
@@ -239,9 +234,8 @@ int addIfCompetitive() {
         return newSlot;
     }
 
-    @Override
     public void close() {
-        Releasables.close(arrays);
+        Releasables.close(docCounts);
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java
index 994f8c43a83ac..6829579d1f58b 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java
@@ -23,6 +23,8 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.xcontent.ToXContentFragment;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.QueryShardException;
@@ -40,10 +42,14 @@
  * A {@link ValuesSource} builder for {@link CompositeAggregationBuilder}
  */
 public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSourceBuilder<AB>> implements Writeable, ToXContentFragment {
+    private static final DeprecationLogger DEPRECATION_LOGGER =
+        new DeprecationLogger(Loggers.getLogger(CompositeValuesSourceBuilder.class));
+
     protected final String name;
     private String field = null;
     private Script script = null;
     private ValueType valueType = null;
+    private boolean missingBucket = false;
     private Object missing = null;
     private SortOrder order = SortOrder.ASC;
     private String format = null;
@@ -66,6 +72,11 @@ public abstract class CompositeValuesSourceBuilder
         ValuesSourceConfig<?> config = ValuesSourceConfig.resolve(context.getQueryShardContext(),
             valueType, field, script, missing, null, format);
-        if (config.unmapped() && field != null && config.missing() == null) {
+        if (config.unmapped() && field != null && missing == null && missingBucket == false) {
             // this source cannot produce any values so we refuse to build
-            // since composite buckets are not created on null values
+            // since composite buckets are not created on null values by default.
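For reference, a minimal builder-level sketch of the new option, mirroring the tests later in this change (MissingBucketExample is an illustrative name): with missing_bucket enabled, documents without a value get an explicit null bucket instead of being dropped, and the null key itself is a valid after position.

import java.util.Collections;

class MissingBucketExample {
    static CompositeAggregationBuilder example() {
        TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
            .field("keyword")
            .missingBucket(true);   // emit a null bucket for documents without the field
        return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
            .aggregateAfter(Collections.singletonMap("keyword", null)); // resume after the null bucket
    }
}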
+            throw new QueryShardException(context.getQueryShardContext(),
+                "failed to find field [" + field + "] and [missing_bucket] is not set");
+        }
+        if (missingBucket && missing != null) {
             throw new QueryShardException(context.getQueryShardContext(),
-                "failed to find field [" + field + "] and [missing] is not provided");
+                "cannot use [missing] option in conjunction with [missing_bucket]");
         }
         return innerBuild(context, config);
     }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
index aad713b305d02..c0d3098247788 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
@@ -33,6 +33,7 @@ class CompositeValuesSourceConfig {
     private final DocValueFormat format;
     private final int reverseMul;
     private final Object missing;
+    private final boolean missingBucket;
 
     /**
      * Creates a new {@link CompositeValuesSourceConfig}.
@@ -44,12 +45,14 @@ class CompositeValuesSourceConfig {
      * @param missing The missing value or null if documents with missing value should be ignored.
      */
     CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format,
-                                SortOrder order, @Nullable Object missing) {
+                                SortOrder order, boolean missingBucket, @Nullable Object missing) {
         this.name = name;
         this.fieldType = fieldType;
         this.vs = vs;
         this.format = format;
         this.reverseMul = order == SortOrder.ASC ? 1 : -1;
+        this.missingBucket = missingBucket;
+        assert missingBucket == false || missing == null;
         this.missing = missing;
     }
 
@@ -89,6 +92,13 @@ Object missing() {
         return missing;
     }
 
+    /**
+     * If true, an explicit `null` bucket represents documents with missing values.
+     */
+    boolean missingBucket() {
+        return missingBucket;
+    }
+
     /**
      * The sort order for the values source (e.g. -1 for descending and 1 for ascending).
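The assert in this constructor is the internal counterpart of the build() validation above: missing and missingBucket are mutually exclusive. A hypothetical construction sketch under that contract (keywordConfig is an illustrative helper; fieldType and vs stand in for real arguments):

    static CompositeValuesSourceConfig keywordConfig(MappedFieldType fieldType, ValuesSource vs) {
        // missingBucket = true, therefore missing must be null (asserted in the constructor)
        return new CompositeValuesSourceConfig("keyword", fieldType, vs,
            DocValueFormat.RAW, SortOrder.ASC, true, null);
    }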
*/ diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java index c5d3d6f2fa6ff..8dae1fecc241f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceParserHelper.java @@ -38,9 +38,9 @@ static , T> void declareValuesSource ValueType targetValueType) { objectParser.declareField(VB::field, XContentParser::text, new ParseField("field"), ObjectParser.ValueType.STRING); - objectParser.declareField(VB::missing, XContentParser::objectText, new ParseField("missing"), ObjectParser.ValueType.VALUE); + objectParser.declareBoolean(VB::missingBucket, new ParseField("missing_bucket")); objectParser.declareField(VB::valueType, p -> { ValueType valueType = ValueType.resolveForScript(p.text()); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index 0b373f15d5ccb..b620c2bf0fce7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -226,7 +226,7 @@ protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSo // is specified in the builder. final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format(); final MappedFieldType fieldType = config.fieldContext() != null ? config.fieldContext().fieldType() : null; - return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missing()); + return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missingBucket(), missing()); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java index 0f74544fe2bc5..2288d69146399 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java @@ -38,34 +38,67 @@ */ class DoubleValuesSource extends SingleDimensionValuesSource { private final CheckedFunction docValuesFunc; - private final DoubleArray values; + private final BitArray bits; + private DoubleArray values; private double currentValue; + private boolean missingCurrentValue; DoubleValuesSource(BigArrays bigArrays, MappedFieldType fieldType, CheckedFunction docValuesFunc, - DocValueFormat format, Object missing, int size, int reverseMul) { - super(format, fieldType, missing, size, reverseMul); + DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) { + super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul); this.docValuesFunc = docValuesFunc; - this.values = bigArrays.newDoubleArray(size, false); + this.bits = missingBucket ? 
new BitArray(bigArrays, 100) : null; + this.values = bigArrays.newDoubleArray(Math.min(size, 100), false); } @Override void copyCurrent(int slot) { - values.set(slot, currentValue); + values = bigArrays.grow(values, slot+1); + if (missingBucket && missingCurrentValue) { + bits.clear(slot); + } else { + assert missingCurrentValue == false; + if (missingBucket) { + bits.set(slot); + } + values.set(slot, currentValue); + } } @Override int compare(int from, int to) { + if (missingBucket) { + if (bits.get(from) == false) { + return bits.get(to) ? -1 * reverseMul : 0; + } else if (bits.get(to) == false) { + return reverseMul; + } + } return compareValues(values.get(from), values.get(to)); } @Override int compareCurrent(int slot) { + if (missingBucket) { + if (missingCurrentValue) { + return bits.get(slot) ? -1 * reverseMul : 0; + } else if (bits.get(slot) == false) { + return reverseMul; + } + } return compareValues(currentValue, values.get(slot)); } @Override int compareCurrentWithAfter() { + if (missingBucket) { + if (missingCurrentValue) { + return afterValue != null ? -1 * reverseMul : 0; + } else if (afterValue == null) { + return reverseMul; + } + } return compareValues(currentValue, afterValue); } @@ -75,7 +108,9 @@ private int compareValues(double v1, double v2) { @Override void setAfter(Comparable value) { - if (value instanceof Number) { + if (missingBucket && value == null) { + afterValue = null; + } else if (value instanceof Number) { afterValue = ((Number) value).doubleValue(); } else { afterValue = format.parseDouble(value.toString(), false, () -> { @@ -86,6 +121,10 @@ void setAfter(Comparable value) { @Override Double toComparable(int slot) { + if (missingBucket && bits.get(slot) == false) { + return null; + } + assert missingBucket == false || bits.get(slot); return values.get(slot); } @@ -99,8 +138,12 @@ public void collect(int doc, long bucket) throws IOException { int num = dvs.docValueCount(); for (int i = 0; i < num; i++) { currentValue = dvs.nextValue(); + missingCurrentValue = false; next.collect(doc, bucket); } + } else if (missingBucket) { + missingCurrentValue = true; + next.collect(doc, bucket); } } }; @@ -127,6 +170,6 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer @Override public void close() { - Releasables.close(values); + Releasables.close(values, bits); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java index a83f92e21fdc8..e24558d761962 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java @@ -43,7 +43,7 @@ */ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource { private final CheckedFunction docValuesFunc; - private final LongArray values; + private LongArray values; private SortedSetDocValues lookup; private long currentValue; private Long afterValueGlobalOrd; @@ -52,16 +52,17 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource { private long lastLookupOrd = -1; private BytesRef lastLookupValue; - GlobalOrdinalValuesSource(BigArrays bigArrays, - MappedFieldType type, CheckedFunction docValuesFunc, - DocValueFormat format, Object missing, int size, int reverseMul) { - super(format, type, missing, size, reverseMul); + 
GlobalOrdinalValuesSource(BigArrays bigArrays, MappedFieldType type, + CheckedFunction docValuesFunc, + DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) { + super(bigArrays, format, type, missingBucket, missing, size, reverseMul); this.docValuesFunc = docValuesFunc; - this.values = bigArrays.newLongArray(size, false); + this.values = bigArrays.newLongArray(Math.min(size, 100), false); } @Override void copyCurrent(int slot) { + values = bigArrays.grow(values, slot+1); values.set(slot, currentValue); } @@ -89,7 +90,10 @@ int compareCurrentWithAfter() { @Override void setAfter(Comparable value) { - if (value.getClass() == String.class) { + if (missingBucket && value == null) { + afterValue = null; + afterValueGlobalOrd = -1L; + } else if (value.getClass() == String.class) { afterValue = format.parseBytesRef(value.toString()); } else { throw new IllegalArgumentException("invalid value, expected string, got " + value.getClass().getSimpleName()); @@ -99,10 +103,12 @@ void setAfter(Comparable value) { @Override BytesRef toComparable(int slot) throws IOException { long globalOrd = values.get(slot); - if (globalOrd == lastLookupOrd) { + if (missingBucket && globalOrd == -1) { + return null; + } else if (globalOrd == lastLookupOrd) { return lastLookupValue; } else { - lastLookupOrd= globalOrd; + lastLookupOrd = globalOrd; lastLookupValue = BytesRef.deepCopyOf(lookup.lookupOrd(values.get(slot))); return lastLookupValue; } @@ -123,6 +129,9 @@ public void collect(int doc, long bucket) throws IOException { currentValue = ord; next.collect(doc, bucket); } + } else if (missingBucket) { + currentValue = -1; + next.collect(doc, bucket); } } }; @@ -143,7 +152,7 @@ LeafBucketCollector getLeafCollector(Comparable value, LeafReaderContext cont @Override public void collect(int doc, long bucket) throws IOException { - if (!currentValueIsSet) { + if (currentValueIsSet == false) { if (dvs.advanceExact(doc)) { long ord; while ((ord = dvs.nextOrd()) != NO_MORE_ORDS) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java index fb3585c87391a..76e2bc823ffd4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java @@ -115,7 +115,7 @@ protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSo ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig; final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval); final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; - return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missing()); + return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(), missingBucket(), missing()); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index c9cb320d80d99..1428a31a8dedc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -332,6 +332,14 @@ InternalBucket reduce(List buckets, ReduceContext reduceContext) @Override public int compareKey(InternalBucket other) { for (int i = 0; i < key.size(); i++) { + if (key.get(i) == null) { + if (other.key.get(i) == null) { + continue; + } + return -1 * reverseMuls[i]; + } else if (other.key.get(i) == null) { + return reverseMuls[i]; + } assert key.get(i).getClass() == other.key.get(i).getClass(); @SuppressWarnings("unchecked") int cmp = ((Comparable) key.get(i)).compareTo(other.key.get(i)) * reverseMuls[i]; @@ -357,26 +365,29 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * for numbers and a string for {@link BytesRef}s. */ static Object formatObject(Object obj, DocValueFormat format) { + if (obj == null) { + return null; + } if (obj.getClass() == BytesRef.class) { BytesRef value = (BytesRef) obj; if (format == DocValueFormat.RAW) { return value.utf8ToString(); } else { - return format.format((BytesRef) obj); + return format.format(value); } } else if (obj.getClass() == Long.class) { - Long value = (Long) obj; + long value = (long) obj; if (format == DocValueFormat.RAW) { return value; } else { return format.format(value); } } else if (obj.getClass() == Double.class) { - Double value = (Double) obj; + double value = (double) obj; if (format == DocValueFormat.RAW) { return value; } else { - return format.format((Double) obj); + return format.format(value); } } return obj; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java index 20e1fa4794786..bdb6d3d0b7156 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -45,38 +45,73 @@ * A {@link SingleDimensionValuesSource} for longs. 
*/ class LongValuesSource extends SingleDimensionValuesSource { + private final BigArrays bigArrays; private final CheckedFunction docValuesFunc; private final LongUnaryOperator rounding; - private final LongArray values; + private BitArray bits; + private LongArray values; private long currentValue; + private boolean missingCurrentValue; - LongValuesSource(BigArrays bigArrays, MappedFieldType fieldType, - CheckedFunction docValuesFunc, - LongUnaryOperator rounding, DocValueFormat format, Object missing, int size, int reverseMul) { - super(format, fieldType, missing, size, reverseMul); + LongValuesSource(BigArrays bigArrays, + MappedFieldType fieldType, CheckedFunction docValuesFunc, + LongUnaryOperator rounding, DocValueFormat format, boolean missingBucket, Object missing, int size, int reverseMul) { + super(bigArrays, format, fieldType, missingBucket, missing, size, reverseMul); + this.bigArrays = bigArrays; this.docValuesFunc = docValuesFunc; this.rounding = rounding; - this.values = bigArrays.newLongArray(size, false); + this.bits = missingBucket ? new BitArray(bigArrays, Math.min(size, 100)) : null; + this.values = bigArrays.newLongArray(Math.min(size, 100), false); } @Override void copyCurrent(int slot) { - values.set(slot, currentValue); + values = bigArrays.grow(values, slot+1); + if (missingBucket && missingCurrentValue) { + bits.clear(slot); + } else { + assert missingCurrentValue == false; + if (missingBucket) { + bits.set(slot); + } + values.set(slot, currentValue); + } } @Override int compare(int from, int to) { + if (missingBucket) { + if (bits.get(from) == false) { + return bits.get(to) ? -1 * reverseMul : 0; + } else if (bits.get(to) == false) { + return reverseMul; + } + } return compareValues(values.get(from), values.get(to)); } @Override int compareCurrent(int slot) { + if (missingBucket) { + if (missingCurrentValue) { + return bits.get(slot) ? -1 * reverseMul : 0; + } else if (bits.get(slot) == false) { + return reverseMul; + } + } return compareValues(currentValue, values.get(slot)); } @Override int compareCurrentWithAfter() { + if (missingBucket) { + if (missingCurrentValue) { + return afterValue != null ? 
-1 * reverseMul : 0;
+            } else if (afterValue == null) {
+                return reverseMul;
+            }
+        }
         return compareValues(currentValue, afterValue);
     }
 
@@ -86,7 +121,9 @@ private int compareValues(long v1, long v2) {
 
     @Override
     void setAfter(Comparable value) {
-        if (value instanceof Number) {
+        if (missingBucket && value == null) {
+            afterValue = null;
+        } else if (value instanceof Number) {
             afterValue = ((Number) value).longValue();
         } else {
             // for date histogram source with "format", the after value is formatted
@@ -99,6 +136,9 @@ void setAfter(Comparable value) {
 
     @Override
     Long toComparable(int slot) {
+        if (missingBucket && bits.get(slot) == false) {
+            return null;
+        }
         return values.get(slot);
     }
 
@@ -112,8 +152,12 @@ public void collect(int doc, long bucket) throws IOException {
                     int num = dvs.docValueCount();
                     for (int i = 0; i < num; i++) {
                         currentValue = dvs.nextValue();
+                        missingCurrentValue = false;
                         next.collect(doc, bucket);
                     }
+                } else if (missingBucket) {
+                    missingCurrentValue = true;
+                    next.collect(doc, bucket);
                 }
             }
         };
@@ -182,6 +226,6 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
 
     @Override
     public void close() {
-        Releasables.close(values);
+        Releasables.close(values, bits);
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
index bb7314eed147f..0781ed5d94db7 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java
@@ -25,6 +25,7 @@
 import org.apache.lucene.search.Query;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.search.aggregations.LeafBucketCollector;
@@ -36,11 +37,13 @@
  * A source that can record and compare values of similar type.
  */
 abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements Releasable {
+    protected final BigArrays bigArrays;
     protected final DocValueFormat format;
     @Nullable
     protected final MappedFieldType fieldType;
     @Nullable
     protected final Object missing;
+    protected final boolean missingBucket;
 
     protected final int size;
     protected final int reverseMul;
 
@@ -50,17 +53,23 @@ abstract class SingleDimensionValuesSource<T extends Comparable<T>> implements Releasable {
     /**
      * Creates a new {@link SingleDimensionValuesSource}.
      *
+     * @param bigArrays The big arrays object.
      * @param format The format of the source.
      * @param fieldType The field type or null if the source is a script.
+     * @param missingBucket If true, an explicit `null` bucket represents documents with missing values.
     * @param missing The missing value or null if documents with missing value should be ignored.
     * @param size The number of values to record.
     * @param reverseMul -1 if the natural order ({@link SortOrder#ASC}) should be reversed.
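All three comparison paths above follow one convention: a missing value sorts before any present value in ascending order, and reverseMul flips that for descending. The rule in isolation, as a plain comparator sketch (compareNullable is an illustrative name, not part of this change):

    // null < any value when reverseMul == 1 (ASC); null > any value when reverseMul == -1 (DESC).
    static int compareNullable(Long a, Long b, int reverseMul) {
        if (a == null) {
            return b == null ? 0 : -1 * reverseMul;
        } else if (b == null) {
            return reverseMul;
        }
        return Long.compare(a, b) * reverseMul;
    }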
*/ - SingleDimensionValuesSource(DocValueFormat format, @Nullable MappedFieldType fieldType, @Nullable Object missing, + SingleDimensionValuesSource(BigArrays bigArrays, DocValueFormat format, + @Nullable MappedFieldType fieldType, boolean missingBucket, @Nullable Object missing, int size, int reverseMul) { + assert missing == null || missingBucket == false; + this.bigArrays = bigArrays; this.format = format; this.fieldType = fieldType; this.missing = missing; + this.missingBucket = missingBucket; this.size = size; this.reverseMul = reverseMul; this.afterValue = null; @@ -139,6 +148,7 @@ abstract LeafBucketCollector getLeafCollector(Comparable value, protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldType fieldType) { if (fieldType == null || missing != null || + (missingBucket && afterValue == null) || fieldType.indexOptions() == IndexOptions.NONE || // inverse of the natural order reverseMul == -1) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java index f9d9877e320b4..2c0c6188f5c07 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsSortedDocsProducer.java @@ -61,8 +61,9 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), terms) : null; PostingsEnum reuse = null; boolean first = true; + final BytesRef upper = upperValue == null ? null : BytesRef.deepCopyOf(upperValue); do { - if (upperValue != null && upperValue.compareTo(te.term()) < 0) { + if (upper != null && upper.compareTo(te.term()) < 0) { break; } reuse = te.postings(reuse, PostingsEnum.NONE); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java index 60fcf43a086fb..04d99d9652a50 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java @@ -93,6 +93,6 @@ protected CompositeValuesSourceConfig innerBuild(SearchContext context, ValuesSo } else { format = config.format(); } - return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missing()); + return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket(), missing()); } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java index 0fccd625b764c..a0e6d309c75bc 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java @@ -607,7 +607,7 @@ public void testIndexPrefixIndexTypes() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix").endObject() + .startObject("index_prefixes").endObject() .field("index_options", "offsets") .endObject().endObject().endObject().endObject()); @@ -623,7 +623,7 @@ public void 
testIndexPrefixIndexTypes() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix").endObject() + .startObject("index_prefixes").endObject() .field("index_options", "positions") .endObject().endObject().endObject().endObject()); @@ -640,7 +640,7 @@ public void testIndexPrefixIndexTypes() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix").endObject() + .startObject("index_prefixes").endObject() .field("term_vector", "with_positions_offsets") .endObject().endObject().endObject().endObject()); @@ -657,7 +657,7 @@ public void testIndexPrefixIndexTypes() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix").endObject() + .startObject("index_prefixes").endObject() .field("term_vector", "with_positions") .endObject().endObject().endObject().endObject()); @@ -682,7 +682,7 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix") + .startObject("index_prefixes") .field("min_chars", 1) .field("max_chars", 10) .endObject() @@ -716,7 +716,7 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix").endObject() + .startObject("index_prefixes").endObject() .endObject().endObject() .endObject().endObject()); CompressedXContent json = new CompressedXContent(mapping); @@ -734,25 +734,6 @@ public void testIndexPrefixMapping() throws IOException { Query q6 = mapper.mappers().getMapper("field").fieldType().prefixQuery("goings", CONSTANT_SCORE_REWRITE, queryShardContext); assertThat(q6, instanceOf(PrefixQuery.class)); - - indexService.mapperService().merge("type", json, MergeReason.MAPPING_UPDATE, true); - - String badUpdate = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type") - .startObject("properties").startObject("field") - .field("type", "text") - .field("analyzer", "english") - .startObject("index_prefix") - .field("min_chars", 1) - .field("max_chars", 10) - .endObject() - .endObject().endObject() - .endObject().endObject()); - - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { - indexService.mapperService() - .merge("type", new CompressedXContent(badUpdate), MergeReason.MAPPING_UPDATE, true); - }); - assertThat(e.getMessage(), containsString("mapper [field._index_prefix] has different min_chars values")); } { @@ -760,7 +741,7 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix") + .startObject("index_prefixes") .field("min_chars", 1) .field("max_chars", 10) .endObject() @@ -783,7 +764,7 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix") + .startObject("index_prefixes") .field("min_chars", 11) .field("max_chars", 10) .endObject() @@ -800,7 +781,7 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - 
.startObject("index_prefix") + .startObject("index_prefixes") .field("min_chars", 0) .field("max_chars", 10) .endObject() @@ -817,7 +798,7 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .startObject("index_prefix") + .startObject("index_prefixes") .field("min_chars", 1) .field("max_chars", 25) .endObject() @@ -834,13 +815,13 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("analyzer", "english") - .field("index_prefix", (String) null) + .field("index_prefixes", (String) null) .endObject().endObject() .endObject().endObject()); MapperParsingException e = expectThrows(MapperParsingException.class, () -> parser.parse("type", new CompressedXContent(badConfigMapping)) ); - assertThat(e.getMessage(), containsString("[index_prefix] must not have a [null] value")); + assertThat(e.getMessage(), containsString("[index_prefixes] must not have a [null] value")); } { @@ -848,13 +829,13 @@ public void testIndexPrefixMapping() throws IOException { .startObject("properties").startObject("field") .field("type", "text") .field("index", "false") - .startObject("index_prefix").endObject() + .startObject("index_prefixes").endObject() .endObject().endObject() .endObject().endObject()); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> parser.parse("type", new CompressedXContent(badConfigMapping)) ); - assertThat(e.getMessage(), containsString("Cannot set index_prefix on unindexed field [field]")); + assertThat(e.getMessage(), containsString("Cannot set index_prefixes on unindexed field [field]")); } } } diff --git a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldTypeTests.java b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldTypeTests.java index 895bb97e16665..815e946e023d8 100644 --- a/server/src/test/java/org/elasticsearch/index/mapper/TextFieldTypeTests.java +++ b/server/src/test/java/org/elasticsearch/index/mapper/TextFieldTypeTests.java @@ -71,6 +71,19 @@ public void modify(MappedFieldType ft) { tft.setFielddataMinSegmentSize(1000); } }); + addModifier(new Modifier("index_prefixes", true) { + @Override + public void modify(MappedFieldType ft) { + TextFieldMapper.TextFieldType tft = (TextFieldMapper.TextFieldType)ft; + TextFieldMapper.PrefixFieldType pft = tft.getPrefixFieldType(); + if (pft == null) { + tft.setPrefixFieldType(new TextFieldMapper.PrefixFieldType(ft.name(), 3, 3)); + } + else { + tft.setPrefixFieldType(null); + } + } + }); } public void testTermQuery() { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java new file mode 100644 index 0000000000000..1806080260f28 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/BitArrayTests.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket.composite; + +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class BitArrayTests extends ESTestCase { + public void testRandom() { + try (BitArray bitArray = new BitArray(BigArrays.NON_RECYCLING_INSTANCE, 1)) { + int numBits = randomIntBetween(1000, 10000); + for (int step = 0; step < 3; step++) { + boolean[] bits = new boolean[numBits]; + List slots = new ArrayList<>(); + for (int i = 0; i < numBits; i++) { + bits[i] = randomBoolean(); + slots.add(i); + } + Collections.shuffle(slots, random()); + for (int i : slots) { + if (bits[i]) { + bitArray.set(i); + } else { + bitArray.clear(i); + } + } + for (int i = 0; i < numBits; i++) { + assertEquals(bitArray.get(i), bits[i]); + } + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilderTests.java index ae28d8f9304a9..ac985660399d7 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilderTests.java @@ -44,6 +44,9 @@ private DateHistogramValuesSourceBuilder randomDateHistogramSourceBuilder() { if (randomBoolean()) { histo.timeZone(randomDateTimeZone()); } + if (randomBoolean()) { + histo.missingBucket(true); + } return histo; } @@ -55,6 +58,9 @@ private TermsValuesSourceBuilder randomTermsSourceBuilder() { terms.script(new Script(randomAlphaOfLengthBetween(10, 20))); } terms.order(randomFrom(SortOrder.values())); + if (randomBoolean()) { + terms.missingBucket(true); + } return terms; } @@ -65,6 +71,9 @@ private HistogramValuesSourceBuilder randomHistogramSourceBuilder() { } else { histo.script(new Script(randomAlphaOfLengthBetween(10, 20))); } + if (randomBoolean()) { + histo.missingBucket(true); + } histo.interval(randomDoubleBetween(Math.nextUp(0), Double.MAX_VALUE, false)); return histo; } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 900b0b627b6f2..856f291a08a3b 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -136,12 +136,25 @@ public void testUnmappedField() throws Exception { IndexSearcher searcher = new IndexSearcher(new MultiReader()); QueryShardException exc = expectThrows(QueryShardException.class, () -> createAggregatorFactory(builder, searcher)); - assertThat(exc.getMessage(), containsString("failed to find field [unknown] and [missing] is not provided")); - // should work when missing is 
provided - terms.missing("missing"); + assertThat(exc.getMessage(), containsString("failed to find field [unknown] and [missing_bucket] is not set")); + // should work when missing_bucket is set + terms.missingBucket(true); createAggregatorFactory(builder, searcher); } + public void testMissingBucket() throws Exception { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder(randomAlphaOfLengthBetween(5, 10)) + .field("unknown") + .missingBucket(true) + .missing("MISSING"); + CompositeAggregationBuilder builder = new CompositeAggregationBuilder("test", Collections.singletonList(terms)); + IndexSearcher searcher = new IndexSearcher(new MultiReader()); + QueryShardException exc = + expectThrows(QueryShardException.class, () -> createAggregator(builder, searcher)); + assertWarnings("[missing] is deprecated. Please use [missing_bucket] instead."); + assertThat(exc.getMessage(), containsString("cannot use [missing] option in conjunction with [missing_bucket]")); + } + public void testWithKeyword() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( @@ -187,6 +200,97 @@ public void testWithKeyword() throws Exception { ); } + public void testWithKeywordAndMissingBucket() throws Exception { + final List>> dataset = new ArrayList<>(); + dataset.addAll( + Arrays.asList( + createDocument("keyword", "a"), + createDocument("long", 0L), + createDocument("keyword", "c"), + createDocument("keyword", "a"), + createDocument("keyword", "d"), + createDocument("keyword", "c"), + createDocument("long", 5L) + ) + ); + + // sort ascending, null bucket is first + testSearchCase(Arrays.asList(new MatchAllDocsQuery()), dataset, + () -> { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") + .field("keyword") + .missingBucket(true); + return new CompositeAggregationBuilder("name", Collections.singletonList(terms)); + }, (result) -> { + assertEquals(4, result.getBuckets().size()); + assertEquals("{keyword=d}", result.afterKey().toString()); + assertEquals("{keyword=null}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=a}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(2).getDocCount()); + assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(3).getDocCount()); + } + ); + + // sort descending, null bucket is last + testSearchCase(Arrays.asList(new MatchAllDocsQuery()), dataset, + () -> { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") + .field("keyword") + .missingBucket(true) + .order(SortOrder.DESC); + return new CompositeAggregationBuilder("name", Collections.singletonList(terms)); + }, (result) -> { + assertEquals(4, result.getBuckets().size()); + assertEquals("{keyword=null}", result.afterKey().toString()); + assertEquals("{keyword=null}", result.getBuckets().get(3).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(3).getDocCount()); + assertEquals("{keyword=a}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(2).getDocCount()); + assertEquals("{keyword=c}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=d}", result.getBuckets().get(0).getKeyAsString()); + 
assertEquals(1L, result.getBuckets().get(0).getDocCount()); + } + ); + + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, + () -> { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") + .field("keyword") + .missingBucket(true); + return new CompositeAggregationBuilder("name", Collections.singletonList(terms)) + .aggregateAfter(Collections.singletonMap("keyword", null)); + }, (result) -> { + assertEquals(3, result.getBuckets().size()); + assertEquals("{keyword=d}", result.afterKey().toString()); + assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=c}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=d}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(2).getDocCount()); + } + ); + + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, + () -> { + TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword") + .field("keyword") + .missingBucket(true) + .order(SortOrder.DESC); + return new CompositeAggregationBuilder("name", Collections.singletonList(terms)) + .aggregateAfter(Collections.singletonMap("keyword", null)); + }, (result) -> { + assertEquals(0, result.getBuckets().size()); + assertNull(result.afterKey()); + } + ); + } + public void testWithKeywordMissingAfter() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( @@ -518,6 +622,67 @@ public void testWithKeywordAndLongDesc() throws Exception { ); } + public void testWithKeywordLongAndMissingBucket() throws Exception { + final List>> dataset = new ArrayList<>(); + dataset.addAll( + Arrays.asList( + createDocument("keyword", "a", "long", 100L), + createDocument("double", 0d), + createDocument("keyword", "c", "long", 100L), + createDocument("keyword", "a", "long", 0L), + createDocument("keyword", "d", "long", 10L), + createDocument("keyword", "c"), + createDocument("keyword", "c", "long", 100L), + createDocument("long", 100L), + createDocument("double", 0d) + ) + ); + testSearchCase(Arrays.asList(new MatchAllDocsQuery()), dataset, + () -> new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword").missingBucket(true), + new TermsValuesSourceBuilder("long").field("long").missingBucket(true) + ) + ), + (result) -> { + assertEquals(7, result.getBuckets().size()); + assertEquals("{keyword=d, long=10}", result.afterKey().toString()); + assertEquals("{keyword=null, long=null}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=null, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=a, long=0}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(2).getDocCount()); + assertEquals("{keyword=a, long=100}", result.getBuckets().get(3).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(3).getDocCount()); + assertEquals("{keyword=c, long=null}", result.getBuckets().get(4).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(4).getDocCount()); + assertEquals("{keyword=c, long=100}", result.getBuckets().get(5).getKeyAsString()); + assertEquals(2L, 
result.getBuckets().get(5).getDocCount()); + assertEquals("{keyword=d, long=10}", result.getBuckets().get(6).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(6).getDocCount()); + } + ); + + testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, + () -> new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword").missingBucket(true), + new TermsValuesSourceBuilder("long").field("long").missingBucket(true) + ) + ).aggregateAfter(createAfterKey("keyword", "c", "long", null) + ), + (result) -> { + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=d, long=10}", result.afterKey().toString()); + assertEquals("{keyword=c, long=100}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=d, long=10}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + } + ); + } + public void testMultiValuedWithKeywordAndLong() throws Exception { final List<Map<String, List<Object>>> dataset = new ArrayList<>(); dataset.addAll( diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java index a6cf15c4105d7..02a6a8b9ed65e 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -129,21 +129,24 @@ public void testRandom() throws IOException { assert(false); } } - testRandomCase(true, types); + testRandomCase(types); } private void testRandomCase(ClassAndName... types) throws IOException { - testRandomCase(true, types); - testRandomCase(false, types); + testRandomCase(true, true, types); + testRandomCase(true, false, types); + testRandomCase(false, true, types); + testRandomCase(false, false, types); } - private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IOException { + private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndName... types) throws IOException { final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE; int numDocs = randomIntBetween(50, 100); List<Comparable<?>[]> possibleValues = new ArrayList<>(); for (ClassAndName type : types) { - int numValues = randomIntBetween(1, numDocs*2); - Comparable<?>[] values = new Comparable<?>[numValues]; + final Comparable<?>[] values; + int numValues = randomIntBetween(1, numDocs * 2); + values = new Comparable<?>[numValues]; if (type.clazz == Long.class) { for (int i = 0; i < numValues; i++) { values[i] = randomLong(); @@ -157,7 +160,7 @@ private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IO values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50)); } } else { - assert(false); + assert (false); } possibleValues.add(values); } @@ -171,30 +174,34 @@ private void testRandomCase(boolean forceMerge, ClassAndName...
types) throws IO boolean hasAllField = true; for (int j = 0; j < types.length; j++) { int numValues = randomIntBetween(0, 5); + List<Comparable<?>> values = new ArrayList<>(); if (numValues == 0) { hasAllField = false; - } - List<Comparable<?>> values = new ArrayList<>(); - for (int k = 0; k < numValues; k++) { - values.add(possibleValues.get(j)[randomIntBetween(0, possibleValues.get(j).length-1)]); - if (types[j].clazz == Long.class) { - long value = (Long) values.get(k); - document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), value)); - document.add(new LongPoint(types[j].fieldType.name(), value)); - } else if (types[j].clazz == Double.class) { - document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), - NumericUtils.doubleToSortableLong((Double) values.get(k)))); - } else if (types[j].clazz == BytesRef.class) { - BytesRef value = (BytesRef) values.get(k); - document.add(new SortedSetDocValuesField(types[j].fieldType.name(), (BytesRef) values.get(k))); - document.add(new TextField(types[j].fieldType.name(), value.utf8ToString(), Field.Store.NO)); - } else { - assert(false); + if (missingBucket) { + values.add(null); + } + } else { + for (int k = 0; k < numValues; k++) { + values.add(possibleValues.get(j)[randomIntBetween(0, possibleValues.get(j).length - 1)]); + if (types[j].clazz == Long.class) { + long value = (Long) values.get(k); + document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), value)); + document.add(new LongPoint(types[j].fieldType.name(), value)); + } else if (types[j].clazz == Double.class) { + document.add(new SortedNumericDocValuesField(types[j].fieldType.name(), + NumericUtils.doubleToSortableLong((Double) values.get(k)))); + } else if (types[j].clazz == BytesRef.class) { + BytesRef value = (BytesRef) values.get(k); + document.add(new SortedSetDocValuesField(types[j].fieldType.name(), (BytesRef) values.get(k))); + document.add(new TextField(types[j].fieldType.name(), value.utf8ToString(), Field.Store.NO)); + } else { + assert (false); + } } } docValues.add(values); } - if (hasAllField) { + if (hasAllField || missingBucket) { List<CompositeKey> comb = createListCombinations(docValues); keys.addAll(comb); } @@ -210,29 +217,53 @@ private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IO for (int i = 0; i < types.length; i++) { final MappedFieldType fieldType = types[i].fieldType; if (types[i].clazz == Long.class) { - sources[i] = new LongValuesSource(bigArrays, fieldType, - context -> DocValues.getSortedNumeric(context.reader(), fieldType.name()), value -> value, - DocValueFormat.RAW, null, size, 1); + sources[i] = new LongValuesSource( + bigArrays, + fieldType, + context -> DocValues.getSortedNumeric(context.reader(), fieldType.name()), + value -> value, + DocValueFormat.RAW, + missingBucket, + null, + size, + 1 + ); } else if (types[i].clazz == Double.class) { sources[i] = new DoubleValuesSource( - bigArrays, fieldType, + bigArrays, + fieldType, context -> FieldData.sortableLongBitsToDoubles(DocValues.getSortedNumeric(context.reader(), fieldType.name())), - DocValueFormat.RAW, null, size, 1 + DocValueFormat.RAW, + missingBucket, + null, + size, + 1 ); } else if (types[i].clazz == BytesRef.class) { if (forceMerge) { // we don't create global ordinals but we test this mode when the reader has a single segment // since ordinals are global in this case.
sources[i] = new GlobalOrdinalValuesSource( - bigArrays, fieldType, + bigArrays, + fieldType, context -> DocValues.getSortedSet(context.reader(), fieldType.name()), - DocValueFormat.RAW, null, size, 1 + DocValueFormat.RAW, + missingBucket, + null, + size, + 1 ); } else { sources[i] = new BinaryValuesSource( + bigArrays, + (b) -> {}, fieldType, context -> FieldData.toString(DocValues.getSortedSet(context.reader(), fieldType.name())), - DocValueFormat.RAW, null, size, 1 + DocValueFormat.RAW, + missingBucket, + null, + size, + 1 ); } } else { @@ -241,20 +272,13 @@ private void testRandomCase(boolean forceMerge, ClassAndName... types) throws IO } CompositeKey[] expected = keys.toArray(new CompositeKey[0]); Arrays.sort(expected, (a, b) -> compareKey(a, b)); - CompositeValuesCollectorQueue queue = new CompositeValuesCollectorQueue(sources, size); - final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()); for (boolean withProducer : new boolean[] {true, false}) { - if (withProducer && docsProducer == null) { - continue; - } int pos = 0; CompositeKey last = null; while (pos < size) { - queue.clear(); - if (last != null) { - queue.setAfter(last.values()); - } - + final CompositeValuesCollectorQueue queue = + new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last); + final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()); for (LeafReaderContext leafReaderContext : reader.leaves()) { final LeafBucketCollector leafCollector = new LeafBucketCollector() { @Override @@ -262,7 +286,7 @@ public void collect(int doc, long bucket) throws IOException { queue.addIfCompetitive(); } }; - if (withProducer) { + if (docsProducer != null && withProducer) { assertEquals(DocIdSet.EMPTY, docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false)); } else { @@ -310,6 +334,14 @@ private static MappedFieldType createKeyword(String name) { private static int compareKey(CompositeKey key1, CompositeKey key2) { assert key1.size() == key2.size(); for (int i = 0; i < key1.size(); i++) { + if (key1.get(i) == null) { + if (key2.get(i) == null) { + continue; + } + return -1; + } else if (key2.get(i) == null) { + return 1; + } Comparable cmp1 = (Comparable) key1.get(i); int cmp = cmp1.compareTo(key2.get(i)); if (cmp != 0) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java index fa653e5ed4195..b436b43b4621c 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSourceTests.java @@ -40,9 +40,12 @@ public void testBinarySorted() { MappedFieldType keyword = new KeywordFieldMapper.KeywordFieldType(); keyword.setName("keyword"); BinaryValuesSource source = new BinaryValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + (b) -> {}, keyword, context -> null, DocValueFormat.RAW, + false, null, 1, 1 @@ -55,9 +58,12 @@ public void testBinarySorted() { new TermQuery(new Term("keyword", "toto)")))); source = new BinaryValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + (b) -> {}, keyword, context -> null, DocValueFormat.RAW, + false, "missing_value", 1, 1 @@ -66,9 +72,26 @@ public void testBinarySorted() { 
assertNull(source.createSortedDocsProducerOrNull(reader, null)); source = new BinaryValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + (b) -> {}, keyword, context -> null, DocValueFormat.RAW, + true, + null, + 1, + 1 + ); + assertNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); + assertNull(source.createSortedDocsProducerOrNull(reader, null)); + + source = new BinaryValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + (b) -> {}, + keyword, + context -> null, + DocValueFormat.RAW, + false, null, 0, -1 @@ -77,7 +100,16 @@ public void testBinarySorted() { MappedFieldType ip = new IpFieldMapper.IpFieldType(); ip.setName("ip"); - source = new BinaryValuesSource(ip, context -> null, DocValueFormat.RAW,null, 1, 1); + source = new BinaryValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + (b) -> {}, + ip, + context -> null, + DocValueFormat.RAW, + false, + null, + 1, + 1); assertNull(source.createSortedDocsProducerOrNull(reader, null)); } @@ -88,6 +120,7 @@ public void testGlobalOrdinalsSorted() { BigArrays.NON_RECYCLING_INSTANCE, keyword, context -> null, DocValueFormat.RAW, + false, null, 1, 1 @@ -104,6 +137,7 @@ public void testGlobalOrdinalsSorted() { keyword, context -> null, DocValueFormat.RAW, + false, "missing_value", 1, 1 @@ -116,6 +150,20 @@ public void testGlobalOrdinalsSorted() { keyword, context -> null, DocValueFormat.RAW, + true, + null, + 1, + 1 + ); + assertNull(source.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); + assertNull(source.createSortedDocsProducerOrNull(reader, null)); + + source = new GlobalOrdinalValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + keyword, + context -> null, + DocValueFormat.RAW, + false, null, 1, -1 @@ -129,6 +177,7 @@ public void testGlobalOrdinalsSorted() { ip, context -> null, DocValueFormat.RAW, + false, null, 1, 1 @@ -152,6 +201,7 @@ public void testNumericSorted() { context -> null, value -> value, DocValueFormat.RAW, + false, null, 1, 1 @@ -169,6 +219,7 @@ public void testNumericSorted() { context -> null, value -> value, DocValueFormat.RAW, + false, 0d, 1, 1); @@ -176,12 +227,27 @@ public void testNumericSorted() { assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, null)); assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)")))); + sourceWithMissing = new LongValuesSource( + BigArrays.NON_RECYCLING_INSTANCE, + number, + context -> null, + value -> value, + DocValueFormat.RAW, + true, + null, + 1, + 1); + assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery())); + assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, null)); + assertNull(sourceWithMissing.createSortedDocsProducerOrNull(reader, new TermQuery(new Term("keyword", "toto)")))); + LongValuesSource sourceRev = new LongValuesSource( BigArrays.NON_RECYCLING_INSTANCE, number, context -> null, value -> value, DocValueFormat.RAW, + false, null, 1, -1 @@ -195,6 +261,7 @@ public void testNumericSorted() { number, context -> null, DocValueFormat.RAW, + false, null, 1, 1 diff --git a/x-pack/docs/en/security/tribe-clients-integrations/cross-cluster.asciidoc b/x-pack/docs/en/security/tribe-clients-integrations/cross-cluster.asciidoc index eceb0315b20c9..e5f43a08e7aee 100644 --- a/x-pack/docs/en/security/tribe-clients-integrations/cross-cluster.asciidoc +++ b/x-pack/docs/en/security/tribe-clients-integrations/cross-cluster.asciidoc @@ -155,5 +155,5 @@ GET two:logs-2017.04/_search <1> // TEST[skip:todo] //TBD: Is there a 
missing description of the <1> callout above? -:edit_url: https://github.com/elastic/kibana/edit/{branch}/x-pack/docs/en/security/cross-cluster-kibana.asciidoc -include::{xkb-repo-dir}/security/cross-cluster-kibana.asciidoc[] +:edit_url: https://github.com/elastic/kibana/edit/{branch}/docs/security/cross-cluster-kibana.asciidoc +include::{kib-repo-dir}/security/cross-cluster-kibana.asciidoc[] diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLClientAuthTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLClientAuthTests.java new file mode 100644 index 0000000000000..6d98062ffb124 --- /dev/null +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLClientAuthTests.java @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ssl; + +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.message.BasicHeader; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; +import org.apache.http.ssl.SSLContexts; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.test.SecurityIntegTestCase; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.xpack.core.TestXPackTransportClient; +import org.elasticsearch.xpack.core.security.SecurityField; +import org.elasticsearch.xpack.core.ssl.SSLClientAuth; +import org.elasticsearch.xpack.security.LocalStateSecurity; + + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.security.cert.CertPathBuilderException; + +import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; + +public class SSLClientAuthTests extends SecurityIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + // invert the require auth settings + .put("xpack.ssl.client_authentication", SSLClientAuth.REQUIRED) + .put("xpack.security.http.ssl.enabled", true) + .put("xpack.security.http.ssl.client_authentication", SSLClientAuth.REQUIRED) + .put("transport.profiles.default.xpack.security.ssl.client_authentication", SSLClientAuth.NONE) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .build(); + } + + @Override + protected boolean transportSSLEnabled() { + return true; + } + + public void testThatHttpFailsWithoutSslClientAuth() throws IOException { + SSLIOSessionStrategy 
sessionStrategy = new SSLIOSessionStrategy(SSLContexts.createDefault(), NoopHostnameVerifier.INSTANCE); + try (RestClient restClient = createRestClient(httpClientBuilder -> httpClientBuilder.setSSLStrategy(sessionStrategy), "https")) { + restClient.performRequest("GET", "/"); + fail("Expected SSLHandshakeException"); + } catch (SSLHandshakeException e) { + Throwable t = ExceptionsHelper.unwrap(e, CertPathBuilderException.class); + assertThat(t, instanceOf(CertPathBuilderException.class)); + assertThat(t.getMessage(), containsString("unable to find valid certification path to requested target")); + } + } + + public void testThatHttpWorksWithSslClientAuth() throws IOException { + SSLIOSessionStrategy sessionStrategy = new SSLIOSessionStrategy(getSSLContext(), NoopHostnameVerifier.INSTANCE); + try (RestClient restClient = createRestClient(httpClientBuilder -> httpClientBuilder.setSSLStrategy(sessionStrategy), "https")) { + Response response = restClient.performRequest("GET", "/", + new BasicHeader("Authorization", basicAuthHeaderValue(transportClientUsername(), transportClientPassword()))); + assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(EntityUtils.toString(response.getEntity()), containsString("You Know, for Search")); + } + } + + public void testThatTransportWorksWithoutSslClientAuth() throws IOException { + // specify an arbitrary keystore, that does not include the certs needed to connect to the transport protocol + Path store = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testclient-client-profile.jks"); + + if (Files.notExists(store)) { + throw new ElasticsearchException("store path doesn't exist"); + } + + MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("xpack.ssl.keystore.secure_password", "testclient-client-profile"); + Settings settings = Settings.builder() + .put("xpack.security.transport.ssl.enabled", true) + .put("xpack.ssl.client_authentication", SSLClientAuth.NONE) + .put("xpack.ssl.keystore.path", store) + .setSecureSettings(secureSettings) + .put("cluster.name", internalCluster().getClusterName()) + .put(SecurityField.USER_SETTING.getKey(), + transportClientUsername() + ":" + new String(transportClientPassword().getChars())) + .build(); + try (TransportClient client = new TestXPackTransportClient(settings, LocalStateSecurity.class)) { + Transport transport = internalCluster().getDataNodeInstance(Transport.class); + TransportAddress transportAddress = transport.boundAddress().publishAddress(); + client.addTransportAddress(transportAddress); + + assertGreenClusterState(client); + } + } + + private SSLContext getSSLContext() { + try (InputStream in = + Files.newInputStream(getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testclient.jks"))) { + KeyStore keyStore = KeyStore.getInstance("jks"); + keyStore.load(in, "testclient".toCharArray()); + TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(keyStore); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(keyStore, "testclient".toCharArray()); + SSLContext context = SSLContext.getInstance("TLSv1.2"); + context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + return context; + } catch (Exception e) { + throw new ElasticsearchException("failed to initialize a TrustManagerFactory", e); + } + } +} diff --git 
a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLReloadIntegTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLReloadIntegTests.java new file mode 100644 index 0000000000000..c6746c49446d2 --- /dev/null +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLReloadIntegTests.java @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ssl; + +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.Time; +import org.bouncycastle.cert.X509CertificateHolder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.jcajce.JcaX509ExtensionUtils; +import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.network.InetAddressHelper; +import org.elasticsearch.common.settings.MockSecureSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.test.SecurityIntegTestCase; +import org.elasticsearch.test.SecuritySettingsSource; +import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.xpack.core.ssl.CertUtils; +import org.elasticsearch.xpack.core.ssl.SSLService; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; + +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketException; +import java.nio.file.AtomicMoveNotSupportedException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; +import java.util.Locale; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for SSL reloading support + */ +public class SSLReloadIntegTests extends SecurityIntegTestCase { + + private Path nodeStorePath; + + @Override + public Settings nodeSettings(int nodeOrdinal) { + if (nodeStorePath == null) { + Path origPath = getDataPath("/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.jks"); + Path tempDir = createTempDir(); + nodeStorePath = tempDir.resolve("testnode.jks"); + try { + Files.copy(origPath, nodeStorePath); + } catch (IOException e) { + throw new ElasticsearchException("failed to copy keystore"); + } + } + Settings settings = super.nodeSettings(nodeOrdinal); + Settings.Builder builder = Settings.builder() + .put(settings.filter((s) -> s.startsWith("xpack.ssl.") == false)); + + + SecuritySettingsSource.addSSLSettingsForStore(builder, + "/org/elasticsearch/xpack/security/transport/ssl/certs/simple/testnode.jks", "testnode"); + 
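// a short reload poll interval (set just below) so the resource watcher notices the keystore swap within the test timeout +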
builder.put("resource.reload.interval.high", "1s") + .put("xpack.ssl.keystore.path", nodeStorePath); + + if (builder.get("xpack.ssl.truststore.path") != null) { + builder.put("xpack.ssl.truststore.path", nodeStorePath); + } + + return builder.build(); + } + + @Override + protected boolean transportSSLEnabled() { + return true; + } + + public void testThatSSLConfigurationReloadsOnModification() throws Exception { + KeyPair keyPair = CertUtils.generateKeyPair(randomFrom(1024, 2048)); + X509Certificate certificate = getCertificate(keyPair); + KeyStore keyStore = KeyStore.getInstance("jks"); + keyStore.load(null, null); + keyStore.setKeyEntry("key", keyPair.getPrivate(), SecuritySettingsSourceField.TEST_PASSWORD.toCharArray(), + new Certificate[] { certificate }); + Path keystorePath = createTempDir().resolve("newcert.jks"); + try (OutputStream out = Files.newOutputStream(keystorePath)) { + keyStore.store(out, SecuritySettingsSourceField.TEST_PASSWORD.toCharArray()); + } + MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("xpack.ssl.keystore.secure_password", SecuritySettingsSourceField.TEST_PASSWORD); + secureSettings.setString("xpack.ssl.truststore.secure_password", "testnode"); + Settings settings = Settings.builder() + .put("path.home", createTempDir()) + .put("xpack.ssl.keystore.path", keystorePath) + .put("xpack.ssl.truststore.path", nodeStorePath) + .setSecureSettings(secureSettings) + .build(); + String node = randomFrom(internalCluster().getNodeNames()); + SSLService sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); + SSLSocketFactory sslSocketFactory = sslService.sslSocketFactory(settings); + TransportAddress address = internalCluster() + .getInstance(Transport.class, node).boundAddress().publishAddress(); + try (SSLSocket socket = (SSLSocket) sslSocketFactory.createSocket(address.getAddress(), address.getPort())) { + assertThat(socket.isConnected(), is(true)); + socket.startHandshake(); + fail("handshake should not have been successful!"); + } catch (SSLHandshakeException | SocketException expected) { + logger.trace("expected exception", expected); + } + + KeyStore nodeStore = KeyStore.getInstance("jks"); + try (InputStream in = Files.newInputStream(nodeStorePath)) { + nodeStore.load(in, "testnode".toCharArray()); + } + nodeStore.setCertificateEntry("newcert", certificate); + Path path = nodeStorePath.getParent().resolve("updated.jks"); + try (OutputStream out = Files.newOutputStream(path)) { + nodeStore.store(out, "testnode".toCharArray()); + } + try { + Files.move(path, nodeStorePath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE); + } catch (AtomicMoveNotSupportedException e) { + Files.move(path, nodeStorePath, StandardCopyOption.REPLACE_EXISTING); + } + + CountDownLatch latch = new CountDownLatch(1); + assertBusy(() -> { + try (SSLSocket socket = (SSLSocket) sslSocketFactory.createSocket(address.getAddress(), address.getPort())) { + logger.info("opened socket for reloading [{}]", socket); + socket.addHandshakeCompletedListener(event -> { + try { + assertThat(event.getPeerPrincipal().getName(), containsString("Test Node")); + logger.info("ssl handshake completed on port [{}]", event.getSocket().getLocalPort()); + latch.countDown(); + } catch (Exception e) { + fail("caught exception in listener " + e.getMessage()); + } + }); + socket.startHandshake(); + + } catch (Exception e) { + fail("caught exception " + e.getMessage()); + } + }); + latch.await(); + } + + private X509Certificate 
getCertificate(KeyPair keyPair) throws Exception { + final DateTime notBefore = new DateTime(DateTimeZone.UTC); + final DateTime notAfter = notBefore.plusYears(1); + X500Name subject = new X500Name("CN=random cert"); + JcaX509v3CertificateBuilder builder = + new JcaX509v3CertificateBuilder(subject, CertUtils.getSerial(), + new Time(notBefore.toDate(), Locale.ROOT), new Time(notAfter.toDate(), Locale.ROOT), subject, keyPair.getPublic()); + + JcaX509ExtensionUtils extUtils = new JcaX509ExtensionUtils(); + builder.addExtension(Extension.subjectKeyIdentifier, false, extUtils.createSubjectKeyIdentifier(keyPair.getPublic())); + builder.addExtension(Extension.authorityKeyIdentifier, false, extUtils.createAuthorityKeyIdentifier(keyPair.getPublic())); + builder.addExtension(Extension.subjectAlternativeName, false, + CertUtils.getSubjectAlternativeNames(true, Sets.newHashSet(InetAddressHelper.getAllAddresses()))); + + ContentSigner signer = new JcaContentSignerBuilder("SHA256withRSA").build(keyPair.getPrivate()); + X509CertificateHolder certificateHolder = builder.build(signer); + return new JcaX509CertificateConverter().getCertificate(certificateHolder); + } +} diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLTrustRestrictionsTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLTrustRestrictionsTests.java new file mode 100644 index 0000000000000..b97a190b86890 --- /dev/null +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/ssl/SSLTrustRestrictionsTests.java @@ -0,0 +1,257 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ssl; + +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import javax.security.auth.x500.X500Principal; +import java.io.BufferedWriter; +import java.io.IOException; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyPair; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import org.bouncycastle.asn1.x509.GeneralNames; +import org.bouncycastle.openssl.jcajce.JcaPEMWriter; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.SecurityIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.xpack.core.ssl.CertUtils; +import org.elasticsearch.xpack.core.ssl.SSLService; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.elasticsearch.xpack.core.ssl.CertUtils.generateSignedCertificate; +import static org.hamcrest.Matchers.is; + +/** + * Integration tests for SSL trust restrictions + * + * @see RestrictedTrustManager + */ +@ESIntegTestCase.ClusterScope(numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +@TestLogging("org.elasticsearch.xpack.ssl.RestrictedTrustManager:DEBUG") +public class SSLTrustRestrictionsTests extends SecurityIntegTestCase { + + /** + * Use a small keysize for performance, since the keys are only used in this test, but a large enough keysize + * to get past the SSL algorithm checker + */ + private static final int KEYSIZE = 1024; + + private static final int RESOURCE_RELOAD_MILLIS = 3; + private static final TimeValue MAX_WAIT_RELOAD = TimeValue.timeValueSeconds(1); + + private static Path configPath; + private static Settings nodeSSL; + + private static CertificateInfo ca; + private static CertificateInfo trustedCert; + private static CertificateInfo untrustedCert; + private static Path restrictionsPath; + + @Override + protected int maxNumberOfNodes() { + // We are trying to test the SSL configuration for which clients/nodes may join a cluster + // We prefer the cluster to only have 1 node, so that the SSL checking doesn't happen until the test methods run + // (That's not _quite_ true, because the base setup code checks the cluster using transport client, but it's the best we can do) + return 1; + } + + @BeforeClass + public static void setupCertificates() throws Exception { + configPath = createTempDir(); + + final KeyPair caPair = CertUtils.generateKeyPair(KEYSIZE); + final X509Certificate caCert = CertUtils.generateCACertificate(new X500Principal("cn=CertAuth"), caPair, 30); + ca = writeCertificates("ca", caPair.getPrivate(), caCert); + + trustedCert = generateCertificate("trusted", "node.trusted"); + untrustedCert = generateCertificate("untrusted", "someone.else"); + + nodeSSL = Settings.builder() + .put("xpack.security.transport.ssl.enabled", true) + .put("xpack.security.transport.ssl.verification_mode", "certificate") + .putList("xpack.ssl.certificate_authorities", ca.getCertPath().toString()) + .put("xpack.ssl.key", trustedCert.getKeyPath()) + .put("xpack.ssl.certificate", 
trustedCert.getCertPath()) + .build(); + } + + @AfterClass + public static void cleanup() { + configPath = null; + nodeSSL = null; + ca = null; + trustedCert = null; + untrustedCert = null; + } + + @Override + public Settings nodeSettings(int nodeOrdinal) { + + Settings parentSettings = super.nodeSettings(nodeOrdinal); + Settings.Builder builder = Settings.builder() + .put(parentSettings.filter((s) -> s.startsWith("xpack.ssl.") == false)) + .put(nodeSSL); + + restrictionsPath = configPath.resolve("trust_restrictions.yml"); + writeRestrictions("*.trusted"); + builder.put("xpack.ssl.trust_restrictions.path", restrictionsPath); + builder.put("resource.reload.interval.high", RESOURCE_RELOAD_MILLIS + "ms"); + + return builder.build(); + } + + private void writeRestrictions(String trustedPattern) { + try { + Files.write(restrictionsPath, Collections.singleton("trust.subject_name: \"" + trustedPattern + "\"")); + } catch (IOException e) { + throw new ElasticsearchException("failed to write restrictions", e); + } + } + + @Override + protected Settings transportClientSettings() { + Settings parentSettings = super.transportClientSettings(); + Settings.Builder builder = Settings.builder() + .put(parentSettings.filter((s) -> s.startsWith("xpack.ssl.") == false)) + .put(nodeSSL); + return builder.build(); + } + + @Override + protected boolean transportSSLEnabled() { + return true; + } + + public void testCertificateWithTrustedNameIsAccepted() throws Exception { + writeRestrictions("*.trusted"); + try { + tryConnect(trustedCert); + } catch (SSLHandshakeException | SocketException ex) { + fail("handshake should have been successful, but failed with " + ex); + } + } + + public void testCertificateWithUntrustedNameFails() throws Exception { + writeRestrictions("*.trusted"); + try { + tryConnect(untrustedCert); + fail("handshake should have failed, but was successful"); + } catch (SSLHandshakeException | SocketException ex) { + // expected + } + } + + public void testRestrictionsAreReloaded() throws Exception { + writeRestrictions("*"); + assertBusy(() -> { + try { + tryConnect(untrustedCert); + } catch (SSLHandshakeException | SocketException ex) { + fail("handshake should have been successful, but failed with " + ex); + } + }, MAX_WAIT_RELOAD.millis(), TimeUnit.MILLISECONDS); + + writeRestrictions("*.trusted"); + assertBusy(() -> { + try { + tryConnect(untrustedCert); + fail("handshake should have failed, but was successful"); + } catch (SSLHandshakeException | SocketException ex) { + // expected + } + }, MAX_WAIT_RELOAD.millis(), TimeUnit.MILLISECONDS); + } + + private void tryConnect(CertificateInfo certificate) throws Exception { + Settings settings = Settings.builder() + .put("path.home", createTempDir()) + .put("xpack.ssl.key", certificate.getKeyPath()) + .put("xpack.ssl.certificate", certificate.getCertPath()) + .putList("xpack.ssl.certificate_authorities", ca.getCertPath().toString()) + .put("xpack.ssl.verification_mode", "certificate") + .build(); + + String node = randomFrom(internalCluster().getNodeNames()); + SSLService sslService = new SSLService(settings, TestEnvironment.newEnvironment(settings)); + SSLSocketFactory sslSocketFactory = sslService.sslSocketFactory(settings); + TransportAddress address = internalCluster().getInstance(Transport.class, node).boundAddress().publishAddress(); + try (SSLSocket socket = (SSLSocket) sslSocketFactory.createSocket(address.getAddress(), address.getPort())) { + assertThat(socket.isConnected(), is(true)); + // The test simply relies on this (synchronously) 
connecting (or not), so we don't need a handshake handler + socket.startHandshake(); + } + } + + + private static CertificateInfo generateCertificate(String name, String san) throws Exception { + final KeyPair keyPair = CertUtils.generateKeyPair(KEYSIZE); + final X500Principal principal = new X500Principal("cn=" + name); + final GeneralNames altNames = new GeneralNames(CertUtils.createCommonName(san)); + final X509Certificate cert = generateSignedCertificate(principal, altNames, keyPair, ca.getCertificate(), ca.getKey(), 30); + return writeCertificates(name, keyPair.getPrivate(), cert); + } + + private static CertificateInfo writeCertificates(String name, PrivateKey key, X509Certificate cert) throws IOException { + final Path keyPath = writePem(key, name + ".key"); + final Path certPath = writePem(cert, name + ".crt"); + return new CertificateInfo(key, keyPath, cert, certPath); + } + + private static Path writePem(Object obj, String filename) throws IOException { + Path path = configPath.resolve(filename); + Files.deleteIfExists(path); + try (BufferedWriter out = Files.newBufferedWriter(path); + JcaPEMWriter pemWriter = new JcaPEMWriter(out)) { + pemWriter.writeObject(obj); + } + return path; + } + + private static class CertificateInfo { + private final PrivateKey key; + private final Path keyPath; + private final X509Certificate certificate; + private final Path certPath; + + private CertificateInfo(PrivateKey key, Path keyPath, X509Certificate certificate, Path certPath) { + this.key = key; + this.keyPath = keyPath; + this.certificate = certificate; + this.certPath = certPath; + } + + private PrivateKey getKey() { + return key; + } + + private Path getKeyPath() { + return keyPath; + } + + private X509Certificate getCertificate() { + return certificate; + } + + private Path getCertPath() { + return certPath; + } + } +}
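For readers following the composite aggregation tests above, here is a minimal usage sketch of the missing_bucket option they exercise. The builder calls mirror those in CompositeAggregatorTests; the wrapper class MissingBucketExample and its method are hypothetical scaffolding for illustration, not part of this patch.

    import java.util.Collections;
    import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
    import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
    import org.elasticsearch.search.sort.SortOrder;

    class MissingBucketExample { // hypothetical wrapper, not part of this patch
        // Builds a composite aggregation whose "keyword" source emits an explicit
        // {keyword=null} bucket for documents that lack the field, instead of
        // failing with "failed to find field [...] and [missing_bucket] is not set".
        static CompositeAggregationBuilder missingBucketAgg() {
            TermsValuesSourceBuilder terms = new TermsValuesSourceBuilder("keyword")
                .field("keyword")
                .missingBucket(true)   // null bucket sorts first in ASC order, last in DESC
                .order(SortOrder.ASC);
            return new CompositeAggregationBuilder("name", Collections.singletonList(terms))
                // paging can resume from the null bucket itself: after-key values may be null
                .aggregateAfter(Collections.singletonMap("keyword", null));
        }
    }

Note that missing_bucket and the deprecated per-source missing value are mutually exclusive: as testMissingBucket above shows, combining them trips the deprecation warning and fails with "cannot use [missing] option in conjunction with [missing_bucket]".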