Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DerivedFieldQuery to support concurrent search. #15326

Merged
merged 8 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Support filtering on a large list encoded by bitmap ([#14774](https://github.com/opensearch-project/OpenSearch/pull/14774))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ public void testTermsValuesSource() throws Exception {
}

public void testSimpleDerivedFieldsQuery() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
Expand All @@ -204,10 +200,6 @@ public void testSimpleDerivedFieldsQuery() {
}

public void testSimpleDerivedFieldsAgg() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@
@Override
public Query termQuery(Object value, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termQuery(value, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -176,10 +175,9 @@
@Override
public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termQueryCaseInsensitive(value, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -195,10 +193,9 @@
@Override
public Query termsQuery(List<?> values, @Nullable QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termsQuery(values, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down Expand Up @@ -230,10 +227,9 @@
parser,
context
);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -251,10 +247,9 @@
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.fuzzyQuery(value, fuzziness, prefixLength, maxExpansions, transpositions, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),

Check warning on line 252 in server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java#L252

Added line #L252 was not covered by tests
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down Expand Up @@ -289,10 +284,9 @@
method,
context
);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),

Check warning on line 289 in server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java#L289

Added line #L289 was not covered by tests
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -316,10 +310,9 @@
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.prefixQuery(value, method, caseInsensitive, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -343,10 +336,9 @@
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.wildcardQuery(value, method, caseInsensitive, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -365,10 +357,9 @@
@Override
public Query normalizedWildcardQuery(String value, @Nullable MultiTermQuery.RewriteMethod method, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.normalizedWildcardQuery(value, method, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),

Check warning on line 362 in server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java#L362

Added line #L362 was not covered by tests
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -394,10 +385,9 @@
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.regexpQuery(value, syntaxFlags, matchFlags, maxDeterminizedStates, method, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -416,10 +406,9 @@
@Override
public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context) throws IOException {
Query query = typeFieldMapper.mappedFieldType.phraseQuery(stream, slop, enablePositionIncrements, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -441,10 +430,9 @@
public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context)
throws IOException {
Query query = typeFieldMapper.mappedFieldType.multiPhraseQuery(stream, slop, enablePositionIncrements, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),

Check warning on line 435 in server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java#L435

Added line #L435 was not covered by tests
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -465,10 +453,9 @@
@Override
public Query phrasePrefixQuery(TokenStream stream, int slop, int maxExpansions, QueryShardContext context) throws IOException {
Query query = typeFieldMapper.mappedFieldType.phrasePrefixQuery(stream, slop, maxExpansions, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -493,10 +480,9 @@
@Override
public Query distanceFeatureQuery(Object origin, String pivot, float boost, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.distanceFeatureQuery(origin, pivot, boost, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),

Check warning on line 485 in server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/mapper/DerivedFieldType.java#L485

Added line #L485 was not covered by tests
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -507,10 +493,9 @@
@Override
public Query geoShapeQuery(Geometry shape, String fieldName, ShapeRelation relation, QueryShardContext context) {
Query query = ((GeoShapeQueryable) (typeFieldMapper.mappedFieldType)).geoShapeQuery(shape, fieldName, relation, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* DerivedFieldQuery used for querying derived fields. It contains the logic to execute an input lucene query against
* DerivedField. It also accepts DerivedFieldValueFetcher and SearchLookup as an input.
*/
public final class DerivedFieldQuery extends Query {
private final Query query;
private final DerivedFieldValueFetcher valueFetcher;
private final Supplier<DerivedFieldValueFetcher> valueFetcherSupplier;
private final SearchLookup searchLookup;
private final Analyzer indexAnalyzer;
private final boolean ignoreMalformed;
Expand All @@ -46,20 +47,19 @@ public final class DerivedFieldQuery extends Query {

/**
* @param query lucene query to be executed against the derived field
* @param valueFetcher DerivedFieldValueFetcher ValueFetcher to fetch the value of a derived field from _source
* using LeafSearchLookup
* @param valueFetcherSupplier Supplier of a DerivedFieldValueFetcher that will be reconstructed per leaf
* @param searchLookup SearchLookup to get the LeafSearchLookup look used by valueFetcher to fetch the _source
*/
public DerivedFieldQuery(
Query query,
DerivedFieldValueFetcher valueFetcher,
Supplier<DerivedFieldValueFetcher> valueFetcherSupplier,
SearchLookup searchLookup,
Analyzer indexAnalyzer,
Function<Object, IndexableField> indexableFieldGenerator,
boolean ignoreMalformed
) {
this.query = query;
this.valueFetcher = valueFetcher;
this.valueFetcherSupplier = valueFetcherSupplier;
this.searchLookup = searchLookup;
this.indexAnalyzer = indexAnalyzer;
this.indexableFieldGenerator = indexableFieldGenerator;
Expand All @@ -77,7 +77,15 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
if (rewritten == query) {
return this;
}
return new DerivedFieldQuery(rewritten, valueFetcher, searchLookup, indexAnalyzer, indexableFieldGenerator, ignoreMalformed);
;
return new DerivedFieldQuery(
rewritten,
valueFetcherSupplier,
searchLookup,
indexAnalyzer,
indexableFieldGenerator,
ignoreMalformed
);
}

@Override
Expand All @@ -88,6 +96,11 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo
public Scorer scorer(LeafReaderContext context) {
DocIdSetIterator approximation;
approximation = DocIdSetIterator.all(context.reader().maxDoc());

// Create a new ValueFetcher per thread.
// ValueFetcher.setNextReader creates a DerivedFieldScript and internally SourceLookup and these objects are not
// thread safe.
final DerivedFieldValueFetcher valueFetcher = valueFetcherSupplier.get();
valueFetcher.setNextReader(context);
LeafSearchLookup leafSearchLookup = searchLookup.getLeafSearchLookup(context);
TwoPhaseIterator twoPhase = new TwoPhaseIterator(approximation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
Expand All @@ -24,9 +25,12 @@
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.index.Index;
import org.opensearch.geometry.Rectangle;
import org.opensearch.index.query.MatchPhrasePrefixQueryBuilder;
import org.opensearch.index.query.MultiMatchQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.search.QueryStringQueryParser;
import org.opensearch.script.DerivedFieldScript;

import java.io.IOException;
Expand Down Expand Up @@ -435,7 +439,7 @@ public void execute() {
}
}

public void testObjectDerivedFields() throws IOException {
public void testObjectDerivedFields() throws IOException, ParseException {
MapperService mapperService = createMapperService(topMapping(b -> {
b.startObject("properties");
{
Expand Down Expand Up @@ -545,6 +549,17 @@ public void execute() {
topDocs = searcher.search(query, 10);
assertEquals(0, topDocs.totalHits.value);

query = new MatchPhrasePrefixQueryBuilder("object_field.text_field", "document number").toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
assertEquals(0, topDocs.totalHits.value);

// Multi Phrase Query
query = QueryBuilders.multiMatchQuery("GET", "object_field.nested_field.sub_field_1", "object_field.keyword_field")
.type(MultiMatchQueryBuilder.Type.PHRASE)
.toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
assertEquals(7, topDocs.totalHits.value);

// Range queries of types - date, long and double
query = QueryBuilders.rangeQuery("object_field.date_field").from("2024-03-20T14:20:50").toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
Expand All @@ -567,6 +582,11 @@ public void execute() {
topDocs = searcher.search(query, 10);
assertEquals(7, topDocs.totalHits.value);

QueryStringQueryParser queryParser = new QueryStringQueryParser(queryShardContext, "object_field.keyword_field");
queryParser.parse("GE?");
topDocs = searcher.search(query, 10);
assertEquals(7, topDocs.totalHits.value);

// Regexp Query
query = QueryBuilders.regexpQuery("object_field.keyword_field", ".*let.*").toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.queries.spans.SpanMultiTermQueryWrapper;
import org.apache.lucene.util.BytesRef;
import org.opensearch.OpenSearchException;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -59,6 +60,7 @@ public void testBooleanType() {
assertTrue(dft.getFieldMapper() instanceof BooleanFieldMapper);
assertTrue(dft.getIndexableFieldGenerator().apply(true) instanceof Field);
assertTrue(dft.getIndexableFieldGenerator().apply(false) instanceof Field);
assertEquals("derived", dft.typeName());
}

public void testDateType() {
Expand Down Expand Up @@ -159,6 +161,22 @@ public void testGetAggregationScript_ip() throws IOException {
assertEquals(new BytesRef(InetAddressPoint.encode(InetAddresses.forString((String) expected.get(0)))), result.get(0));
}

public void testDerivedFieldValueFetcherDoesNotSupportCustomFormats() {
DerivedFieldType dft = createDerivedFieldType("boolean");
expectThrows(
IllegalArgumentException.class,
() -> dft.valueFetcher(mock(QueryShardContext.class), mock(SearchLookup.class), "yyyy-MM-dd")
);
}

public void testSpanPrefixQueryNotSupported() {
DerivedFieldType dft = createDerivedFieldType("boolean");
expectThrows(
IllegalArgumentException.class,
() -> dft.spanPrefixQuery("value", mock(SpanMultiTermQueryWrapper.SpanRewriteMethod.class), mock(QueryShardContext.class))
);
}

private static LeafSearchLookup mockValueFetcherForAggs(QueryShardContext mockContext, DerivedFieldType dft, List<Object> expected) {
SearchLookup searchLookup = mock(SearchLookup.class);
LeafSearchLookup leafLookup = mock(LeafSearchLookup.class);
Expand Down
Loading
Loading