Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search Pipelines] Add request-scoped state shared between processors #9405

Merged
merged 10 commits into from
Dec 5, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10541](https://github.com/opensearch-project/OpenSearch/pull/10541))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Search Pipelines] Add request-scoped state shared between processors (and three new processors) ([#9405](https://github.com/opensearch-project/OpenSearch/pull/9405))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Helper for map abstractions passed to scripting processors. Throws {@link UnsupportedOperationException} for almost
* all methods. Subclasses just need to implement get and put.
*/
abstract class BasicMap implements Map<String, Object> {

/**
* No-args constructor.
*/
protected BasicMap() {}

private static final String UNSUPPORTED_OP_ERR = " Method not supported in Search pipeline script";

@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("isEmpty" + UNSUPPORTED_OP_ERR);

Check warning on line 33 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/BasicMap.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/BasicMap.java#L33

Added line #L33 was not covered by tests
}

public int size() {
throw new UnsupportedOperationException("size" + UNSUPPORTED_OP_ERR);
}

public boolean containsKey(Object key) {
return get(key) != null;
}

public boolean containsValue(Object value) {
throw new UnsupportedOperationException("containsValue" + UNSUPPORTED_OP_ERR);
}

public Object remove(Object key) {
throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
}

public void putAll(Map<? extends String, ?> m) {
throw new UnsupportedOperationException("putAll" + UNSUPPORTED_OP_ERR);
}

public void clear() {
throw new UnsupportedOperationException("clear" + UNSUPPORTED_OP_ERR);
}

public Set<String> keySet() {
throw new UnsupportedOperationException("keySet" + UNSUPPORTED_OP_ERR);
}

public Collection<Object> values() {
throw new UnsupportedOperationException("values" + UNSUPPORTED_OP_ERR);
}

public Set<Map.Entry<String, Object>> entrySet() {
throw new UnsupportedOperationException("entrySet" + UNSUPPORTED_OP_ERR);
}

@Override
public Object getOrDefault(Object key, Object defaultValue) {
throw new UnsupportedOperationException("getOrDefault" + UNSUPPORTED_OP_ERR);
}

@Override
public void forEach(BiConsumer<? super String, ? super Object> action) {
throw new UnsupportedOperationException("forEach" + UNSUPPORTED_OP_ERR);
}

@Override
public void replaceAll(BiFunction<? super String, ? super Object, ?> function) {
throw new UnsupportedOperationException("replaceAll" + UNSUPPORTED_OP_ERR);
}

@Override
public Object putIfAbsent(String key, Object value) {
throw new UnsupportedOperationException("putIfAbsent" + UNSUPPORTED_OP_ERR);
}

@Override
public boolean remove(Object key, Object value) {
throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
}

@Override
public boolean replace(String key, Object oldValue, Object newValue) {
throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
}

@Override
public Object replace(String key, Object value) {
throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
}

@Override
public Object computeIfAbsent(String key, Function<? super String, ?> mappingFunction) {
throw new UnsupportedOperationException("computeIfAbsent" + UNSUPPORTED_OP_ERR);
}

@Override
public Object computeIfPresent(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("computeIfPresent" + UNSUPPORTED_OP_ERR);
}

@Override
public Object compute(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("compute" + UNSUPPORTED_OP_ERR);
}

@Override
public Object merge(String key, Object value, BiFunction<? super Object, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("merge" + UNSUPPORTED_OP_ERR);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.document.DocumentField;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchResponseUtil;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A simple implementation of field collapsing on search responses. Note that this is not going to work as well as
* field collapsing at the shard level, as implemented with the "collapse" parameter in a search request. Mostly
* just using this to demo the oversample / truncate_hits processors.
*/
public class CollapseResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "collapse";
static final String COLLAPSE_FIELD = "field";
private final String collapseField;

private CollapseResponseProcessor(String tag, String description, boolean ignoreFailure, String collapseField) {
super(tag, description, ignoreFailure);
this.collapseField = Objects.requireNonNull(collapseField);
}

@Override
public String getType() {
return TYPE;

Check warning on line 48 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java#L48

Added line #L48 was not covered by tests
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) {

if (response.getHits() != null) {
if (response.getHits().getCollapseField() != null) {
throw new IllegalStateException(
"Cannot collapse on " + collapseField + ". Results already collapsed on " + response.getHits().getCollapseField()

Check warning on line 57 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java#L56-L57

Added lines #L56 - L57 were not covered by tests
);
}
Map<String, SearchHit> collapsedHits = new LinkedHashMap<>();
List<Object> collapseValues = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
Object fieldValue = null;
DocumentField docField = hit.getFields().get(collapseField);
if (docField != null) {
if (docField.getValues().size() > 1) {
throw new IllegalStateException(
"Failed to collapse " + hit.getId() + ": doc has multiple values for field " + collapseField

Check warning on line 68 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java#L67-L68

Added lines #L67 - L68 were not covered by tests
);
}
fieldValue = docField.getValues().get(0);
} else if (hit.getSourceAsMap() != null) {
fieldValue = hit.getSourceAsMap().get(collapseField);
}
String fieldValueString;
if (fieldValue == null) {
fieldValueString = "__missing__";

Check warning on line 77 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java#L77

Added line #L77 was not covered by tests
} else {
fieldValueString = fieldValue.toString();
}

// Results are already sorted by sort criterion. Only keep the first hit for each field.
if (collapsedHits.containsKey(fieldValueString) == false) {
collapsedHits.put(fieldValueString, hit);
collapseValues.add(fieldValue);
}
}
SearchHit[] newHits = new SearchHit[collapsedHits.size()];
int i = 0;
for (SearchHit collapsedHit : collapsedHits.values()) {
newHits[i++] = collapsedHit;
}
SearchHits searchHits = new SearchHits(
newHits,
response.getHits().getTotalHits(),
response.getHits().getMaxScore(),
response.getHits().getSortFields(),
collapseField,
collapseValues.toArray()
);
return SearchResponseUtil.replaceHits(searchHits, response);
}
return response;

Check warning on line 103 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/CollapseResponseProcessor.java#L103

Added line #L103 was not covered by tests
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public CollapseResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String collapseField = ConfigurationUtils.readStringProperty(TYPE, tag, config, COLLAPSE_FIELD);
return new CollapseResponseProcessor(tag, description, ignoreFailure, collapseField);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchService;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.ContextUtils;

import java.util.Map;

import static org.opensearch.search.pipeline.common.helpers.ContextUtils.applyContextPrefix;

/**
* Multiplies the "size" parameter on the {@link SearchRequest} by the given scaling factor, storing the original value
* in the request context as "original_size".
*/
public class OversampleRequestProcessor extends AbstractProcessor implements StatefulSearchRequestProcessor {

/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "oversample";
static final String SAMPLE_FACTOR = "sample_factor";
static final String ORIGINAL_SIZE = "original_size";
private final double sampleFactor;
private final String contextPrefix;

private OversampleRequestProcessor(String tag, String description, boolean ignoreFailure, double sampleFactor, String contextPrefix) {
super(tag, description, ignoreFailure);
this.sampleFactor = sampleFactor;
this.contextPrefix = contextPrefix;
}

@Override
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) {
if (request.source() != null) {
msfroh marked this conversation as resolved.
Show resolved Hide resolved
int originalSize = request.source().size();
msfroh marked this conversation as resolved.
Show resolved Hide resolved
if (originalSize == -1) {
originalSize = SearchService.DEFAULT_SIZE;

Check warning on line 51 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java#L51

Added line #L51 was not covered by tests
}
requestContext.setAttribute(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize);
int newSize = (int) Math.ceil(originalSize * sampleFactor);
msfroh marked this conversation as resolved.
Show resolved Hide resolved
request.source().size(newSize);
}
return request;
}

@Override
public String getType() {
return TYPE;

Check warning on line 62 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java#L62

Added line #L62 was not covered by tests
}

static class Factory implements Processor.Factory<SearchRequestProcessor> {
@Override
public OversampleRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
double sampleFactor = ConfigurationUtils.readDoubleProperty(TYPE, tag, config, SAMPLE_FACTOR);
if (sampleFactor < 1.0) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SAMPLE_FACTOR, "Value must be >= 1.0");

Check warning on line 77 in modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java#L77

Added line #L77 was not covered by tests
}
String contextPrefix = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, ContextUtils.CONTEXT_PREFIX_PARAMETER);
return new OversampleRequestProcessor(tag, description, ignoreFailure, sampleFactor, contextPrefix);
}
}
}
Loading
Loading