Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory usage in field-caps responses #88042

Merged
merged 5 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88042.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88042
summary: Reduce memory usage in field-caps responses
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest
);

if (canMatchShard(request, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false);
return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyList(), false);
}

Set<String> fieldNames = new HashSet<>();
Expand Down Expand Up @@ -118,7 +118,7 @@ public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest
}
}
}
return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true);
return new FieldCapabilitiesIndexResponse(request.index(), responseMap.values(), true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,29 @@
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class FieldCapabilitiesIndexResponse extends ActionResponse implements Writeable {
private final String indexName;
private final Map<String, IndexFieldCapabilities> responseMap;
private final Collection<IndexFieldCapabilities> fields;
private final boolean canMatch;
private final transient Version originVersion;

FieldCapabilitiesIndexResponse(String indexName, Map<String, IndexFieldCapabilities> responseMap, boolean canMatch) {
FieldCapabilitiesIndexResponse(String indexName, Collection<IndexFieldCapabilities> fields, boolean canMatch) {
this.indexName = indexName;
this.responseMap = responseMap;
this.fields = fields;
this.canMatch = canMatch;
this.originVersion = Version.CURRENT;
}

FieldCapabilitiesIndexResponse(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
this.responseMap = in.readMap(StreamInput::readString, IndexFieldCapabilities::new);
this.fields = readFields(in);
this.canMatch = in.getVersion().onOrAfter(Version.V_7_9_0) ? in.readBoolean() : true;
this.originVersion = in.getVersion();
}
Expand All @@ -51,18 +54,35 @@ public boolean canMatch() {
}

/**
* Get the field capabilities map
* Get the field capabilities
*/
public Map<String, IndexFieldCapabilities> get() {
return responseMap;
public Collection<IndexFieldCapabilities> getFields() {
return fields;
}

/**
*
* Get the field capabilities for the provided {@code field}
*/
public IndexFieldCapabilities getField(String field) {
return responseMap.get(field);
private static Collection<IndexFieldCapabilities> readFields(StreamInput in) throws IOException {
// Previously, we serialize fields as a map from field name to field-caps
final int size = in.readVInt();
if (size == 0) {
return Collections.emptyList();
}
final List<IndexFieldCapabilities> fields = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
final String fieldName = in.readString(); // the fieldName will be discarded - it's used in assertions only
final IndexFieldCapabilities fieldCaps = new IndexFieldCapabilities(in);
assert fieldName.equals(fieldCaps.getName()) : fieldName + " != " + fieldCaps.getName();
fields.add(fieldCaps);
}
return fields;
}

private static void writeFields(StreamOutput out, Collection<IndexFieldCapabilities> fields) throws IOException {
// Previously, we serialize fields as a map from field name to field-caps
out.writeVInt(fields.size());
for (IndexFieldCapabilities field : fields) {
out.writeString(field.getName());
field.writeTo(out);
}
}

Version getOriginVersion() {
Expand All @@ -72,7 +92,7 @@ Version getOriginVersion() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(indexName);
out.writeMap(responseMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
writeFields(out, fields);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
if (out.getVersion().onOrAfter(Version.V_7_9_0)) {
out.writeBoolean(canMatch);
}
Expand All @@ -83,11 +103,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FieldCapabilitiesIndexResponse that = (FieldCapabilitiesIndexResponse) o;
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(responseMap, that.responseMap);
return canMatch == that.canMatch && Objects.equals(indexName, that.indexName) && Objects.equals(fields, that.fields);
}

@Override
public int hashCode() {
return Objects.hash(indexName, responseMap, canMatch);
return Objects.hash(indexName, fields, canMatch);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
String indexName = RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName());
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.get(), resp.canMatch()));
indexResponses.putIfAbsent(indexName, new FieldCapabilitiesIndexResponse(indexName, resp.getFields(), resp.canMatch()));
}
for (FieldCapabilitiesFailure failure : response.getFailures()) {
Exception ex = failure.getException();
Expand Down Expand Up @@ -265,17 +265,16 @@ private void innerMerge(
Map<String, Map<String, FieldCapabilities.Builder>> responseMapBuilder,
FieldCapabilitiesIndexResponse response
) {
for (Map.Entry<String, IndexFieldCapabilities> entry : response.get().entrySet()) {
final String field = entry.getKey();
for (IndexFieldCapabilities fieldCap : response.getFields()) {
final String fieldName = fieldCap.getName();
// best effort to detect metadata field coming from older nodes
final boolean isMetadataField = response.getOriginVersion().onOrAfter(Version.V_7_13_0)
? entry.getValue().isMetadatafield()
: metadataFieldPred.test(field);
final IndexFieldCapabilities fieldCap = entry.getValue();
Map<String, FieldCapabilities.Builder> typeMap = responseMapBuilder.computeIfAbsent(field, f -> new HashMap<>());
? fieldCap.isMetadatafield()
: metadataFieldPred.test(fieldName);
Map<String, FieldCapabilities.Builder> typeMap = responseMapBuilder.computeIfAbsent(fieldName, f -> new HashMap<>());
FieldCapabilities.Builder builder = typeMap.computeIfAbsent(
fieldCap.getType(),
key -> new FieldCapabilities.Builder(field, key)
key -> new FieldCapabilities.Builder(fieldName, key)
);
builder.add(response.getIndexName(), isMetadataField, fieldCap.isSearchable(), fieldCap.isAggregatable(), fieldCap.meta());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.fieldcaps;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponseTests.randomFieldCaps;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;

public class FieldCapabilitiesIndexResponseTests extends AbstractWireSerializingTestCase<FieldCapabilitiesIndexResponse> {
@Override
protected Writeable.Reader<FieldCapabilitiesIndexResponse> instanceReader() {
return FieldCapabilitiesIndexResponse::new;
}

static FieldCapabilitiesIndexResponse randomIndexResponse() {
return randomIndexResponse(randomAsciiLettersOfLength(10), randomBoolean());
}

static FieldCapabilitiesIndexResponse randomIndexResponse(String index, boolean canMatch) {
List<IndexFieldCapabilities> fields = new ArrayList<>();
if (canMatch) {
String[] fieldNames = generateRandomStringArray(5, 10, false, true);
assertNotNull(fieldNames);
for (String fieldName : fieldNames) {
fields.add(randomFieldCaps(fieldName));
}
}
return new FieldCapabilitiesIndexResponse(index, fields, canMatch);
}

@Override
protected FieldCapabilitiesIndexResponse createTestInstance() {
return randomIndexResponse();
}

public void testDeserializeFromBase64() throws Exception {
String base64 = "CWxvZ3MtMTAwMQMGcGVyaW9kBnBlcmlvZARsb25nAQABAQR1bml0BnNlY29uZApAdGltZXN0"
+ "YW1wCkB0aW1lc3RhbXAEZGF0ZQEBAAAHbWVzc2FnZQdtZXNzYWdlBHRleHQAAQAAAQAAAAAAAAAAAA==";
StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(base64));
FieldCapabilitiesIndexResponse resp = new FieldCapabilitiesIndexResponse(in);
assertTrue(resp.canMatch());
assertThat(resp.getIndexName(), equalTo("logs-1001"));
assertThat(
resp.getFields(),
containsInAnyOrder(
new IndexFieldCapabilities("@timestamp", "date", true, true, false, Collections.emptyMap()),
new IndexFieldCapabilities("message", "text", false, true, false, Collections.emptyMap()),
new IndexFieldCapabilities("period", "long", true, false, true, Collections.singletonMap("unit", "second"))
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
import java.util.List;
import java.util.Set;

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponseTests.randomIndexResponse;

public class FieldCapabilitiesNodeResponseTests extends AbstractWireSerializingTestCase<FieldCapabilitiesNodeResponse> {

@Override
protected FieldCapabilitiesNodeResponse createTestInstance() {
List<FieldCapabilitiesIndexResponse> responses = new ArrayList<>();
int numResponse = randomIntBetween(0, 10);
for (int i = 0; i < numResponse; i++) {
responses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse());
responses.add(randomIndexResponse());
}
int numUnmatched = randomIntBetween(0, 3);
Set<ShardId> shardIds = new HashSet<>();
Expand All @@ -46,15 +48,15 @@ protected FieldCapabilitiesNodeResponse mutateInstance(FieldCapabilitiesNodeResp
int mutation = response.getIndexResponses().isEmpty() ? 0 : randomIntBetween(0, 2);
switch (mutation) {
case 0:
newResponses.add(FieldCapabilitiesResponseTests.createRandomIndexResponse());
newResponses.add(randomIndexResponse());
break;
case 1:
int toRemove = randomInt(newResponses.size() - 1);
newResponses.remove(toRemove);
break;
case 2:
int toReplace = randomInt(newResponses.size() - 1);
newResponses.set(toReplace, FieldCapabilitiesResponseTests.createRandomIndexResponse());
newResponses.set(toReplace, randomIndexResponse());
break;
}
return new FieldCapabilitiesNodeResponse(newResponses, Collections.emptyMap(), response.getUnmatchedShardIds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import java.util.List;
import java.util.Map;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;

public class FieldCapabilitiesResponseTests extends AbstractWireSerializingTestCase<FieldCapabilitiesResponse> {

@Override
Expand All @@ -40,7 +38,7 @@ protected FieldCapabilitiesResponse createTestInstance() {
int numResponse = randomIntBetween(0, 10);

for (int i = 0; i < numResponse; i++) {
responses.add(createRandomIndexResponse());
responses.add(FieldCapabilitiesIndexResponseTests.randomIndexResponse());
}
randomResponse = new FieldCapabilitiesResponse(responses, Collections.emptyList());
return randomResponse;
Expand All @@ -51,22 +49,6 @@ protected Writeable.Reader<FieldCapabilitiesResponse> instanceReader() {
return FieldCapabilitiesResponse::new;
}

public static FieldCapabilitiesIndexResponse createRandomIndexResponse() {
return randomIndexResponse(randomAsciiLettersOfLength(10), randomBoolean());
}

public static FieldCapabilitiesIndexResponse randomIndexResponse(String index, boolean canMatch) {
Map<String, IndexFieldCapabilities> responses = new HashMap<>();

String[] fields = generateRandomStringArray(5, 10, false, true);
assertNotNull(fields);

for (String field : fields) {
responses.put(field, randomFieldCaps(field));
}
return new FieldCapabilitiesIndexResponse(index, responses, canMatch);
}

public static IndexFieldCapabilities randomFieldCaps(String fieldName) {
Map<String, String> meta;
switch (randomInt(2)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponseTests.randomIndexResponse;
import static org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponseTests.randomIndexResponse;
import static org.elasticsearch.action.fieldcaps.RequestDispatcher.GROUP_REQUESTS_VERSION;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
Expand Down