diff --git a/CHANGELOG.md b/CHANGELOG.md index 046693421d07b..948189c255c40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814)) - Added cluster setting cluster.restrict.index.replication_type to restrict setting of index setting replication type ([#10866](https://github.com/opensearch-project/OpenSearch/pull/10866)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) +- Enable Fuzzy codec for doc id fields using a bloom filter ([#11022](https://github.com/opensearch-project/OpenSearch/pull/11022)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298)) @@ -141,4 +142,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD -[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x \ No newline at end of file +[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.12...2.x diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java new file mode 100644 index 0000000000000..4e995f5a5067c --- /dev/null +++ b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterConstructionBenchmark.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.benchmark.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.UUIDs; +import org.opensearch.index.codec.fuzzy.FuzzySet; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; +import org.opensearch.index.mapper.IdFieldMapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Fork(3) +@Warmup(iterations = 2) +@Measurement(iterations = 5, time = 60, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FilterConstructionBenchmark { + + private List items; + + @Param({ "1000000", "10000000", "50000000" }) + private int numIds; + + @Param({ "0.0511", "0.1023", "0.2047" }) + private double fpp; + + private FuzzySetFactory fuzzySetFactory; + private String fieldName; + + @Setup + public void setupIds() { + this.fieldName = IdFieldMapper.NAME; + this.items = IntStream.range(0, numIds).mapToObj(i -> new BytesRef(UUIDs.base64UUID())).collect(Collectors.toList()); + FuzzySetParameters parameters = new FuzzySetParameters(() -> fpp); + this.fuzzySetFactory = new FuzzySetFactory(Map.of(fieldName, parameters)); + } + + @Benchmark + public FuzzySet buildFilter() throws IOException { + return fuzzySetFactory.createFuzzySet(items.size(), fieldName, () -> items.iterator()); + } +} diff --git a/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java new file mode 100644 index 0000000000000..383539219830e --- /dev/null +++ b/benchmarks/src/main/java/org/opensearch/benchmark/index/codec/fuzzy/FilterLookupBenchmark.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.benchmark.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.UUIDs; +import org.opensearch.index.codec.fuzzy.FuzzySet; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; +import org.opensearch.index.mapper.IdFieldMapper; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +@Fork(3) +@Warmup(iterations = 2) +@Measurement(iterations = 5, time = 60, timeUnit = TimeUnit.SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +public class FilterLookupBenchmark { + + @Param({ "50000000", "1000000" }) + private int numItems; + + @Param({ "1000000" }) + private int searchKeyCount; + + @Param({ "0.0511", "0.1023", "0.2047" }) + private double fpp; + + private FuzzySet fuzzySet; + private List items; + private Random random = new Random(); + + @Setup + public void setupFilter() throws IOException { + String fieldName = IdFieldMapper.NAME; + items = IntStream.range(0, numItems).mapToObj(i -> new BytesRef(UUIDs.base64UUID())).collect(Collectors.toList()); + FuzzySetParameters parameters = new FuzzySetParameters(() -> fpp); + fuzzySet = new FuzzySetFactory(Map.of(fieldName, parameters)).createFuzzySet(numItems, fieldName, () -> items.iterator()); + } + + @Benchmark + public void contains_withExistingKeys(Blackhole blackhole) throws IOException { + for (int i = 0; i < searchKeyCount; i++) { + blackhole.consume(fuzzySet.contains(items.get(random.nextInt(items.size()))) == FuzzySet.Result.MAYBE); + } + } + + @Benchmark + public void contains_withRandomKeys(Blackhole blackhole) throws IOException { + for (int i = 0; i < searchKeyCount; i++) { + blackhole.consume(fuzzySet.contains(new BytesRef(UUIDs.base64UUID()))); + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 62e8faf33e1fa..ff04a8bc9eca1 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -228,6 +228,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, + IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, + // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 00e765d73f77f..d0834ab079609 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -66,6 +66,7 @@ import static org.opensearch.Version.V_2_7_0; import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY; +import static org.opensearch.index.codec.fuzzy.FuzzySetParameters.DEFAULT_FALSE_POSITIVE_PROBABILITY; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING; @@ -658,6 +659,22 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic ); + public static final Setting INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING = Setting.boolSetting( + "index.doc_id_fuzzy_set.enabled", + false, + Property.IndexScope, + Property.Dynamic + ); + + public static final Setting INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING = Setting.doubleSetting( + "index.doc_id_fuzzy_set.false_positive_probability", + DEFAULT_FALSE_POSITIVE_PROBABILITY, + 0.01, + 0.50, + Property.IndexScope, + Property.Dynamic + ); + public static final TimeValue DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL = new TimeValue(650, TimeUnit.MILLISECONDS); public static final TimeValue MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL = TimeValue.ZERO; public static final Setting INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( @@ -787,6 +804,17 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; + /** + * Is fuzzy set enabled for doc id + */ + private volatile boolean enableFuzzySetForDocId; + + /** + * False positive probability to use while creating fuzzy set. + */ + private volatile double docIdFuzzySetFalsePositiveProbability; + + /** * Returns the default search fields for this index. */ @@ -926,6 +954,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti * Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them. */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); + + enableFuzzySetForDocId = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING); + docIdFuzzySetFalsePositiveProbability = scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING); + scopedSettings.addSettingsUpdateConsumer( TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, tieredMergePolicyProvider::setNoCFSRatio @@ -1032,6 +1064,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this::setRemoteTranslogUploadBufferInterval ); scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen); + scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING, this::setEnableFuzzySetForDocId); + scopedSettings.addSettingsUpdateConsumer(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -1801,4 +1835,20 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) { public boolean shouldWidenIndexSortType() { return this.widenIndexSortType; } + + public boolean isEnableFuzzySetForDocId() { + return enableFuzzySetForDocId; + } + + public void setEnableFuzzySetForDocId(boolean enableFuzzySetForDocId) { + this.enableFuzzySetForDocId = enableFuzzySetForDocId; + } + + public double getDocIdFuzzySetFalsePositiveProbability() { + return docIdFuzzySetFalsePositiveProbability; + } + + public void setDocIdFuzzySetFalsePositiveProbability(double docIdFuzzySetFalsePositiveProbability) { + this.docIdFuzzySetFalsePositiveProbability = docIdFuzzySetFalsePositiveProbability; + } } diff --git a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java index d3207557273a5..6ab479a2cf16b 100644 --- a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java @@ -39,10 +39,16 @@ import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.codecs.lucene95.Lucene95Codec; import org.opensearch.common.lucene.Lucene; +import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat; +import org.opensearch.index.codec.fuzzy.FuzzySetFactory; +import org.opensearch.index.codec.fuzzy.FuzzySetParameters; import org.opensearch.index.mapper.CompletionFieldMapper; +import org.opensearch.index.mapper.IdFieldMapper; import org.opensearch.index.mapper.MappedFieldType; import org.opensearch.index.mapper.MapperService; +import java.util.Map; + /** * {@link PerFieldMappingPostingFormatCodec This postings format} is the default * {@link PostingsFormat} for OpenSearch. It utilizes the @@ -57,6 +63,8 @@ public class PerFieldMappingPostingFormatCodec extends Lucene95Codec { private final Logger logger; private final MapperService mapperService; private final DocValuesFormat dvFormat = new Lucene90DocValuesFormat(); + private FuzzySetFactory fuzzySetFactory; + private FuzzyFilterPostingsFormat docIdPostingsFormat; static { assert Codec.forName(Lucene.LATEST_CODEC).getClass().isAssignableFrom(PerFieldMappingPostingFormatCodec.class) @@ -67,6 +75,8 @@ public PerFieldMappingPostingFormatCodec(Mode compressionMode, MapperService map super(compressionMode); this.mapperService = mapperService; this.logger = logger; + fuzzySetFactory = new FuzzySetFactory(Map.of(IdFieldMapper.NAME, + new FuzzySetParameters(() -> mapperService.getIndexSettings().getDocIdFuzzySetFalsePositiveProbability()))); } @Override @@ -76,6 +86,11 @@ public PostingsFormat getPostingsFormatForField(String field) { logger.warn("no index mapper found for field: [{}] returning default postings format", field); } else if (fieldType instanceof CompletionFieldMapper.CompletionFieldType) { return CompletionFieldMapper.CompletionFieldType.postingsFormat(); + } else if (IdFieldMapper.NAME.equals(field) && mapperService.getIndexSettings().isEnableFuzzySetForDocId()) { + if (docIdPostingsFormat == null) { + docIdPostingsFormat = new FuzzyFilterPostingsFormat(super.getPostingsFormatForField(field), fuzzySetFactory); + } + return docIdPostingsFormat; } return super.getPostingsFormatForField(field); } diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java new file mode 100644 index 0000000000000..5133576eb177b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/AbstractFuzzySet.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedSupplier; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Encapsulates common behaviour implementation for a fuzzy set. + */ +public abstract class AbstractFuzzySet implements FuzzySet { + + /** + * Add an item to this fuzzy set. + * @param value The value to be added + */ + protected abstract void add(BytesRef value); + + /** + * Add all items to the underlying set. + * Implementations can choose to perform this using an optimized strategy based on the type of set. + * @param valuesIteratorProvider Supplier for an iterator over All values which should be added to the set. + */ + protected void addAll(CheckedSupplier, IOException> valuesIteratorProvider) throws IOException { + Iterator values = valuesIteratorProvider.get(); + while (values.hasNext()) { + add(values.next()); + } + } + + protected long generateKey(BytesRef value) { + return MurmurHash64.INSTANCE.hash(value); + } + + protected void assertAllElementsExist(CheckedSupplier, IOException> iteratorProvider) throws IOException { + Iterator iter = iteratorProvider.get(); + int cnt = 0; + while (iter.hasNext()) { + BytesRef item = iter.next(); + assert contains(item) == Result.MAYBE : "Expected Filter to return positive response for elements added to it. Elements matched: " + cnt; + cnt ++; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java new file mode 100644 index 0000000000000..d2b74c7cb4a09 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/BloomFilter.java @@ -0,0 +1,146 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.Assertions; + +import java.io.IOException; +import java.util.Iterator; + +/** + * The code is based on Lucene's implementation of Bloom Filter. + * It represents a subset of the Lucene implementation needed for OpenSearch use cases. + * Since the Lucene implementation is marked experimental, + * this aims to ensure we can provide a bwc implementation during upgrades. + */ +public class BloomFilter extends AbstractFuzzySet { + + private static final Logger logger = LogManager.getLogger(BloomFilter.class); + + // The sizes of BitSet used are all numbers that, when expressed in binary form, + // are all ones. This is to enable fast downsizing from one bitset to another + // by simply ANDing each set index in one bitset with the size of the target bitset + // - this provides a fast modulo of the number. Values previously accumulated in + // a large bitset and then mapped to a smaller set can be looked up using a single + // AND operation of the query term's hash rather than needing to perform a 2-step + // translation of the query term that mirrors the stored content's reprojections. + static final int[] usableBitSetSizes; + + static { + usableBitSetSizes = new int[26]; + for (int i = 0; i < usableBitSetSizes.length; i++) { + usableBitSetSizes[i] = (1 << (i + 6)) - 1; + } + } + + private final LongArrayBackedBitSet bitset; + private final int setSize; + private final int hashCount; + + BloomFilter(long maxDocs, double maxFpp, CheckedSupplier, IOException> fieldIteratorProvider) throws IOException { + int setSize = + (int) + Math.ceil( + (maxDocs * Math.log(maxFpp)) + / Math.log(1 / Math.pow(2, Math.log(2)))); + setSize = getNearestSetSize(2 * setSize); + int optimalK = (int) Math.round(((double) setSize / maxDocs) * Math.log(2)); + this.bitset = new LongArrayBackedBitSet(setSize + 1); + this.setSize = setSize; + this.hashCount = optimalK; + addAll(fieldIteratorProvider); + if (Assertions.ENABLED) { + assertAllElementsExist(fieldIteratorProvider); + } + logger.trace("Bloom filter created with fpp: {}, setSize: {}, hashCount: {}", maxFpp, setSize, hashCount); + } + + BloomFilter(IndexInput in) throws IOException { + hashCount = in.readInt(); + setSize = in.readInt(); + this.bitset = new LongArrayBackedBitSet(in); + } + + @Override + public void writeTo(DataOutput out) throws IOException { + out.writeInt(hashCount); + out.writeInt(setSize); + bitset.writeTo(out); + } + + static int getNearestSetSize(int maxNumberOfBits) { + int result = usableBitSetSizes[0]; + for (int i = 0; i < usableBitSetSizes.length; i++) { + if (usableBitSetSizes[i] <= maxNumberOfBits) { + result = usableBitSetSizes[i]; + } + } + return result; + } + + @Override + public SetType setType() { + return SetType.BLOOM_FILTER_V1; + } + + @Override + public Result contains(BytesRef value) { + long hash = generateKey(value); + int msb = (int) (hash >>> Integer.SIZE); + int lsb = (int) hash; + for (int i = 0; i < hashCount; i++) { + int bloomPos = (lsb + i * msb); + if (!mayContainValue(bloomPos)) { + return Result.NO; + } + } + return Result.MAYBE; + } + + protected void add(BytesRef value) { + long hash = generateKey(value); + int msb = (int) (hash >>> Integer.SIZE); + int lsb = (int) hash; + for (int i = 0; i < hashCount; i++) { + // Bitmasking using bloomSize is effectively a modulo operation since set sizes are always power of 2 + int bloomPos = (lsb + i * msb) & setSize; + bitset.set(bloomPos); + } + } + + @Override + public boolean isSaturated() { + long numBitsSet = bitset.cardinality(); + return (float) numBitsSet / (float) setSize > 0.9f; + } + + @Override + public long ramBytesUsed() { + return RamUsageEstimator.sizeOf(bitset.ramBytesUsed()); + } + + private boolean mayContainValue(int aHash) { + // Bloom sizes are always base 2 and so can be ANDed for a fast modulo + int pos = aHash & setSize; + return bitset.isSet(pos); + } + + @Override + public void close() throws IOException { + IOUtils.close(bitset); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java new file mode 100644 index 0000000000000..576a49432a249 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormat.java @@ -0,0 +1,467 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.codecs.FieldsConsumer; +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.codecs.NormsProducer; +import org.apache.lucene.codecs.PostingsFormat; +import org.apache.lucene.index.BaseTermsEnum; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.Fields; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.automaton.CompiledAutomaton; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Based on Lucene's BloomFilterPostingsFormat. + */ +public final class FuzzyFilterPostingsFormat extends PostingsFormat { + + private static final Logger logger = LogManager.getLogger(FuzzyFilterPostingsFormat.class); + + public static final String FUZZY_SET_CODEC_NAME = "FuzzySetCodec"; + public static final int VERSION_START = 0; + public static final int VERSION_CURRENT = VERSION_START; + + /** Extension of Fuzzy Filters file */ + public static final String FUZZY_FILTER_FILE_EXTENSION = "fzd"; + + private final PostingsFormat delegatePostingsFormat; + private final FuzzySetFactory fuzzySetFactory; + + public FuzzyFilterPostingsFormat( + PostingsFormat delegatePostingsFormat, FuzzySetFactory fuzzySetFactory) { + super(FUZZY_SET_CODEC_NAME); + this.delegatePostingsFormat = delegatePostingsFormat; + this.fuzzySetFactory = fuzzySetFactory; + } + + // Needed for SPI + public FuzzyFilterPostingsFormat() { + this(null, null); + } + + @Override + public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + if (delegatePostingsFormat == null) { + throw new UnsupportedOperationException( + "Error - " + + getClass().getName() + + " has been constructed without a choice of PostingsFormat"); + } + FieldsConsumer fieldsConsumer = delegatePostingsFormat.fieldsConsumer(state); + return new FuzzySetFieldsConsumer(fieldsConsumer, state); + } + + @Override + public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException { + return new FuzzySetFieldsProducer(state); + } + + static class FuzzySetFieldsProducer extends FieldsProducer { + private FieldsProducer delegateFieldsProducer; + HashMap fuzzySetsByFieldName = new HashMap<>(); + private List closeables = new ArrayList<>(); + + public FuzzySetFieldsProducer(SegmentReadState state) throws IOException { + String fuzzySetFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, FUZZY_FILTER_FILE_EXTENSION); + IndexInput filterIn = null; + boolean success = false; + try { + filterIn = state.directory.openInput(fuzzySetFileName, IOContext.READONCE); + CodecUtil.checkIndexHeader( + filterIn, + FUZZY_SET_CODEC_NAME, + VERSION_START, + VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + // // Load the hash function used in the Fuzzy filter + // hashFunction = HashFunction.forName(filterIn.readString()); + // Load the delegate postings format + PostingsFormat delegatePostingsFormat = PostingsFormat.forName(filterIn.readString()); + this.delegateFieldsProducer = delegatePostingsFormat.fieldsProducer(state); + int numFilters = filterIn.readInt(); + for (int i = 0; i < numFilters; i++) { + int fieldNum = filterIn.readInt(); + FuzzySet set = FuzzySetFactory.deserializeFuzzySet(filterIn); + closeables.add(set); + FieldInfo fieldInfo = state.fieldInfos.fieldInfo(fieldNum); + fuzzySetsByFieldName.put(fieldInfo.name, set); + } + CodecUtil.retrieveChecksum(filterIn); + CodecUtil.checksumEntireFile(filterIn); + success = true; + closeables.add(filterIn); + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(filterIn, delegateFieldsProducer); + } + } + } + + @Override + public Iterator iterator() { + return delegateFieldsProducer.iterator(); + } + + @Override + public void close() throws IOException { + IOUtils.closeWhileHandlingException(closeables); + delegateFieldsProducer.close(); + } + + @Override + public Terms terms(String field) throws IOException { + FuzzySet filter = fuzzySetsByFieldName.get(field); + if (filter == null) { + return delegateFieldsProducer.terms(field); + } else { + Terms result = delegateFieldsProducer.terms(field); + if (result == null) { + return null; + } + return new FuzzySetFieldsProducer.FuzzyFilterFrontedTerms(result, filter); + } + } + + @Override + public int size() { + return delegateFieldsProducer.size(); + } + + static class FuzzyFilterFrontedTerms extends Terms { + private Terms delegateTerms; + private FuzzySet filter; + + public FuzzyFilterFrontedTerms(Terms terms, FuzzySet filter) { + this.delegateTerms = terms; + this.filter = filter; + } + + @Override + public TermsEnum intersect(CompiledAutomaton compiled, final BytesRef startTerm) + throws IOException { + return delegateTerms.intersect(compiled, startTerm); + } + + @Override + public TermsEnum iterator() throws IOException { + return new FilterAppliedTermsEnum(delegateTerms, filter); + } + + @Override + public long size() throws IOException { + return delegateTerms.size(); + } + + @Override + public long getSumTotalTermFreq() throws IOException { + return delegateTerms.getSumTotalTermFreq(); + } + + @Override + public long getSumDocFreq() throws IOException { + return delegateTerms.getSumDocFreq(); + } + + @Override + public int getDocCount() throws IOException { + return delegateTerms.getDocCount(); + } + + @Override + public boolean hasFreqs() { + return delegateTerms.hasFreqs(); + } + + @Override + public boolean hasOffsets() { + return delegateTerms.hasOffsets(); + } + + @Override + public boolean hasPositions() { + return delegateTerms.hasPositions(); + } + + @Override + public boolean hasPayloads() { + return delegateTerms.hasPayloads(); + } + + @Override + public BytesRef getMin() throws IOException { + return delegateTerms.getMin(); + } + + @Override + public BytesRef getMax() throws IOException { + return delegateTerms.getMax(); + } + } + + static final class FilterAppliedTermsEnum extends BaseTermsEnum { + private Terms delegateTerms; + private TermsEnum delegateTermsEnum; + private final FuzzySet filter; + + public FilterAppliedTermsEnum(Terms delegateTerms, FuzzySet filter) throws IOException { + this.delegateTerms = delegateTerms; + this.filter = filter; + } + + void reset(Terms delegateTerms) throws IOException { + this.delegateTerms = delegateTerms; + this.delegateTermsEnum = null; + } + + private TermsEnum delegate() throws IOException { + if (delegateTermsEnum == null) { + /* pull the iterator only if we really need it - + * this can be a relativly heavy operation depending on the + * delegate postings format and they underlying directory + * (clone IndexInput) */ + delegateTermsEnum = delegateTerms.iterator(); + } + return delegateTermsEnum; + } + + @Override + public BytesRef next() throws IOException { + return delegate().next(); + } + + @Override + public boolean seekExact(BytesRef text) throws IOException { + // The magical fail-fast speed up that is the entire point of all of + // this code - save a disk seek if there is a match on an in-memory + // structure + // that may occasionally give a false positive but guaranteed no false + // negatives + if (filter.contains(text) == FuzzySet.Result.NO) { + return false; + } + return delegate().seekExact(text); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + return delegate().seekCeil(text); + } + + @Override + public void seekExact(long ord) throws IOException { + delegate().seekExact(ord); + } + + @Override + public BytesRef term() throws IOException { + return delegate().term(); + } + + @Override + public long ord() throws IOException { + return delegate().ord(); + } + + @Override + public int docFreq() throws IOException { + return delegate().docFreq(); + } + + @Override + public long totalTermFreq() throws IOException { + return delegate().totalTermFreq(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + return delegate().postings(reuse, flags); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + return delegate().impacts(flags); + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(filter=" + filter.toString() + ")"; + } + } + + @Override + public void checkIntegrity() throws IOException { + delegateFieldsProducer.checkIntegrity(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + + "(fields=" + + fuzzySetsByFieldName.size() + + ",delegate=" + + delegateFieldsProducer + + ")"; + } + } + + class FuzzySetFieldsConsumer extends FieldsConsumer { + private FieldsConsumer delegateFieldsConsumer; + private Map fuzzySets = new HashMap<>(); + private SegmentWriteState state; + private List closeables = new ArrayList<>(); + + public FuzzySetFieldsConsumer(FieldsConsumer fieldsConsumer, SegmentWriteState state) { + this.delegateFieldsConsumer = fieldsConsumer; + this.state = state; + } + + @Override + public void write(Fields fields, NormsProducer norms) throws IOException { + + // Delegate must write first: it may have opened files + // on creating the class + // (e.g. Lucene41PostingsConsumer), and write() will + // close them; alternatively, if we delayed pulling + // the fields consumer until here, we could do it + // afterwards: + delegateFieldsConsumer.write(fields, norms); + + for (String field : fields) { + Terms terms = fields.terms(field); + if (terms == null) { + continue; + } + FieldInfo fieldInfo = state.fieldInfos.fieldInfo(field); + FuzzySet fuzzySet = fuzzySetFactory.createFuzzySet(state.segmentInfo.maxDoc(), fieldInfo.name, () -> iterator(terms)); + if (fuzzySet == null) { + break; + } + assert fuzzySets.containsKey(fieldInfo) == false; + closeables.add(fuzzySet); + fuzzySets.put(fieldInfo, fuzzySet); + } + } + + private Iterator iterator(Terms terms) throws IOException { + TermsEnum termIterator = terms.iterator(); + return new Iterator<>() { + + private BytesRef currentTerm; + private PostingsEnum postingsEnum; + @Override + public boolean hasNext() { + try { + do { + currentTerm = termIterator.next(); + if (currentTerm == null) { + return false; + } + postingsEnum = termIterator.postings(postingsEnum, 0); + if (postingsEnum.nextDoc() != PostingsEnum.NO_MORE_DOCS) { + return true; + } + } while (true); + } catch (IOException ex) { + throw new IllegalStateException("Cannot read terms: " + termIterator.attributes()); + } + } + + @Override + public BytesRef next() { + return currentTerm; + } + }; + } + + private boolean closed; + @Override + public void close() throws IOException { + if (closed) { + return; + } + closed = true; + delegateFieldsConsumer.close(); + + // Now we are done accumulating values for these fields + List> nonSaturatedSets = new ArrayList<>(); + + for (Map.Entry entry : fuzzySets.entrySet()) { + FuzzySet fuzzySet = entry.getValue(); + if (!fuzzySet.isSaturated()) { + nonSaturatedSets.add(entry); + } + } + String fuzzyFilterFileName = + IndexFileNames.segmentFileName( + state.segmentInfo.name, state.segmentSuffix, FUZZY_FILTER_FILE_EXTENSION); + try (IndexOutput fuzzyFilterFileOutput = state.directory.createOutput(fuzzyFilterFileName, state.context)) { + logger.trace("Writing fuzzy filter postings with version: {} for segment: {}", VERSION_CURRENT, state.segmentInfo.toString()); + CodecUtil.writeIndexHeader( + fuzzyFilterFileOutput, + FUZZY_SET_CODEC_NAME, + VERSION_CURRENT, + state.segmentInfo.getId(), + state.segmentSuffix); + // remember the name of the postings format we will delegate to + fuzzyFilterFileOutput.writeString(delegatePostingsFormat.getName()); + + // First field in the output file is the number of fields+sets saved + fuzzyFilterFileOutput.writeInt(nonSaturatedSets.size()); + for (Map.Entry entry : nonSaturatedSets) { + FieldInfo fieldInfo = entry.getKey(); + FuzzySet fuzzySet = entry.getValue(); + fuzzyFilterFileOutput.writeInt(fieldInfo.number); + saveAppropriatelySizedFuzzySet(fuzzyFilterFileOutput, fuzzySet, fieldInfo); + } + CodecUtil.writeFooter(fuzzyFilterFileOutput); + } + // We are done with large bitsets so no need to keep them hanging around + fuzzySets.clear(); + IOUtils.closeWhileHandlingException(closeables); + } + + private void saveAppropriatelySizedFuzzySet( + IndexOutput fileOutput, FuzzySet fuzzySet, FieldInfo fieldInfo) throws IOException { + fileOutput.writeString(fuzzySet.setType().getSetName()); + fuzzySet.writeTo(fileOutput); + } + } + + @Override + public String toString() { + return "FuzzyFilterPostingsFormat(" + delegatePostingsFormat + ")"; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java new file mode 100644 index 0000000000000..9a5b0a15e3043 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySet.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Accountable; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedBiFunction; +import org.opensearch.common.CheckedFunction; + +import java.io.Closeable; +import java.io.IOException; + +import java.util.List; +import java.util.Optional; + +/** + * Fuzzy Filter interface + */ +public interface FuzzySet extends Accountable, Closeable { + + /** + * Name used for a codec to be aware of what fuzzy set has been used. + */ + SetType setType(); + + /** + * @param value the item whose membership needs to be checked. + * @return Result see {@Result} + */ + Result contains(BytesRef value); + + boolean isSaturated(); + + void writeTo(DataOutput out) throws IOException; + + /** + * Enum to represent result of membership check on a fuzzy set. + */ + enum Result { + /** + * A definite no for the set membership of an item. + */ + NO, + + /** + * Fuzzy sets cannot guarantee that a given item is present in the set or not due the data being stored in + * a lossy format (e.g. fingerprint, hash). + * Hence, we return a response denoting that the item maybe present. + */ + MAYBE + } + + /** + * Enum to declare supported properties and mappings for a fuzzy set implementation. + */ + enum SetType { + BLOOM_FILTER_V1("bloom_filter_v1", BloomFilter::new, List.of("bloom_filter")); + + private final String setName; + private final CheckedFunction deserializer; + + SetType(String setName, + CheckedFunction deserializer, + List aliases) { + if (aliases.size() < 1) { + throw new IllegalArgumentException("Alias list is empty. Could not create Set Type: " + setName); + } + this.setName = setName; + this.deserializer = deserializer; + } + + public String getSetName() { + return setName; + } + + public CheckedFunction getDeserializer() { + return deserializer; + } + + public static SetType from(String name) { + for (SetType type: SetType.values()) { + if (type.setName.equals(name)) { + return type; + } + } + throw new IllegalArgumentException("There is no implementation for fuzzy set: " + name); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java new file mode 100644 index 0000000000000..3938cfb08b632 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetFactory.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.CheckedSupplier; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Factory class to create fuzzy set. + * Supports bloom filters for now. More sets can be added as required. + */ +public class FuzzySetFactory { + + private final Map setTypeForField; + + public FuzzySetFactory(Map setTypeForField) { + this.setTypeForField = setTypeForField; + } + + public FuzzySet createFuzzySet(int maxDocs, String fieldName, CheckedSupplier, IOException> iteratorProvider) throws IOException { + FuzzySetParameters params = setTypeForField.get(fieldName); + if (params == null) { + throw new IllegalArgumentException("No fuzzy set defined for field: " + fieldName); + } + switch (params.getSetType()) { + case BLOOM_FILTER_V1: + return new BloomFilter(maxDocs, params.getFalsePositiveProbability(), iteratorProvider); + default: + throw new IllegalArgumentException("No Implementation for set type: " + params.getSetType()); + } + } + + public static FuzzySet deserializeFuzzySet(IndexInput in) throws IOException { + FuzzySet.SetType setType = FuzzySet.SetType.from(in.readString()); + return setType.getDeserializer().apply(in); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java new file mode 100644 index 0000000000000..7bb96e7c34f0b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/FuzzySetParameters.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import java.util.function.Supplier; + +/** + * Wrapper for params to create a fuzzy set. + */ +public class FuzzySetParameters { + private final Supplier falsePositiveProbabilityProvider; + private final FuzzySet.SetType setType; + + public static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.2047d; + + public FuzzySetParameters(Supplier falsePositiveProbabilityProvider) { + this.falsePositiveProbabilityProvider = falsePositiveProbabilityProvider; + this.setType = FuzzySet.SetType.BLOOM_FILTER_V1; + } + + public double getFalsePositiveProbability() { + return falsePositiveProbabilityProvider.get(); + } + + public FuzzySet.SetType getSetType() { + return setType; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputLongArray.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputLongArray.java new file mode 100644 index 0000000000000..67de4a1964653 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/IndexInputLongArray.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.RandomAccessInput; +import org.opensearch.OpenSearchException; +import org.opensearch.common.CheckedSupplier; +import org.opensearch.common.util.LongArray; + +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A Long array backed by RandomAccessInput. + */ +public class IndexInputLongArray implements LongArray { + + public RandomAccessInput input; + private long size; + + public IndexInputLongArray(long size, RandomAccessInput input) { + this.size = size; + this.input = input; + } + + @Override + public void close() { + } + + @Override + public long size() { + return size; + } + + @Override + public long get(long index) { + return wrapException(() -> input.readLong(index << 3)); + } + + @Override + public long set(long index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public long increment(long index, long inc) { + throw new UnsupportedOperationException(); + } + + @Override + public void fill(long fromIndex, long toIndex, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public long ramBytesUsed() { + return 128; + } + + private T wrapException(CheckedSupplier supplier) { + try { + return supplier.get(); + } catch (IOException ex) { + throw new OpenSearchException(ex); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java new file mode 100644 index 0000000000000..7465943abb129 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/LongArrayBackedBitSet.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.Accountable; +import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.LongArray; +import org.opensearch.common.util.io.IOUtils; + +import java.io.Closeable; +import java.io.IOException; + +/** + * A bitset backed by a long-indexed array. + */ +public class LongArrayBackedBitSet implements Accountable, Closeable { + + private long underlyingArrayLength = 0L; + private LongArray longArray; + + LongArrayBackedBitSet(long capacity) { + underlyingArrayLength = ((capacity - 1L) >> 6) + 1; + this.longArray = BigArrays.NON_RECYCLING_INSTANCE.withCircuitBreaking().newLongArray(underlyingArrayLength); + } + + LongArrayBackedBitSet(IndexInput in) throws IOException { + underlyingArrayLength = in.readLong(); + long streamLength = underlyingArrayLength << 3; + this.longArray = new IndexInputLongArray(underlyingArrayLength, in.randomAccessSlice(in.getFilePointer(), streamLength)); + in.skipBytes(streamLength); + } + + public void writeTo(DataOutput out) throws IOException { + out.writeLong(underlyingArrayLength); + for (int idx = 0; idx < underlyingArrayLength; idx ++) { + out.writeLong(longArray.get(idx)); + } + } + + public long cardinality() { + long tot = 0; + for (int i = 0; i < underlyingArrayLength; ++i) { + tot += Long.bitCount(longArray.get(i)); + } + return tot; + } + + public boolean isSet(long index) { + long i = index >> 6; // div 64 + long val = longArray.get(i); + // signed shift will keep a negative index and force an + // array-index-out-of-bounds-exception, removing the need for an explicit check. + long bitmask = 1L << index; + return (val & bitmask) != 0; + } + + public void set(long index) { + long wordNum = index >> 6; // div 64 + long bitmask = 1L << index; + long val = longArray.get(wordNum); + longArray.set(wordNum, val | bitmask); + } + + @Override + public long ramBytesUsed() { + return 128L + longArray.ramBytesUsed(); + } + + @Override + public void close() throws IOException { + IOUtils.close(longArray); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/MurmurHash64.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/MurmurHash64.java new file mode 100644 index 0000000000000..33e263a172777 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/MurmurHash64.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.util.BitUtil; +import org.apache.lucene.util.BytesRef; + +/** + * Utility to calculate hash for incoming bytes. + */ +public class MurmurHash64 { + private static final long M64 = 0xc6a4a7935bd1e995L; + private static final int R64 = 47; + public static final MurmurHash64 INSTANCE = new MurmurHash64(); + + /** + * Generates a 64-bit hash from byte array of the given length and seed. + * + * @param data The input byte array + * @param seed The initial seed value + * @param length The length of the array + * @return The 64-bit hash of the given array + */ + public static long hash64(byte[] data, int seed, int offset, int length) { + long h = (seed & 0xffffffffL) ^ (length * M64); + + final int nblocks = length >> 3; + + // body + for (int i = 0; i < nblocks; i++) { + + long k = (long) BitUtil.VH_LE_LONG.get(data, offset); + k *= M64; + k ^= k >>> R64; + k *= M64; + + h ^= k; + h *= M64; + + offset += Long.BYTES; + } + + int remaining = length & 0x07; + if (0 < remaining) { + for (int i = 0; i < remaining; i++) { + h ^= ((long) data[offset + i] & 0xff) << (Byte.SIZE * i); + } + h *= M64; + } + + h ^= h >>> R64; + h *= M64; + h ^= h >>> R64; + + return h; + } + + public final long hash(BytesRef br) { + return hash64(br.bytes, 0xe17a1465, br.offset, br.length); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java b/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java new file mode 100644 index 0000000000000..7aeac68cd192a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/fuzzy/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** classes responsible for handling all fuzzy codecs and operations */ +package org.opensearch.index.codec.fuzzy; diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index f4fd2490c7abe..11204c1a9db15 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -40,6 +40,7 @@ import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.ReplicationStats; +import org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat; import org.opensearch.index.remote.RemoteSegmentStats; import java.io.IOException; @@ -93,7 +94,8 @@ public class SegmentsStats implements Writeable, ToXContentFragment { Map.entry("tvx", "Term Vector Index"), Map.entry("tvd", "Term Vector Documents"), Map.entry("tvf", "Term Vector Fields"), - Map.entry("liv", "Live Documents") + Map.entry("liv", "Live Documents"), + Map.entry(FuzzyFilterPostingsFormat.FUZZY_FILTER_FILE_EXTENSION, "Fuzzy Filter") ); public SegmentsStats() { diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat index 2c92f0ecd3f51..80b1d25064885 100644 --- a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat +++ b/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.PostingsFormat @@ -1 +1,2 @@ org.apache.lucene.search.suggest.document.Completion50PostingsFormat +org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat diff --git a/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java b/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java new file mode 100644 index 0000000000000..30bba00c6e24c --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/fuzzy/BloomFilterTests.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.store.ByteArrayDataOutput; +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public class BloomFilterTests extends OpenSearchTestCase { + + public void testBloomFilterAbsence() { + + } + + public void testBloomFilterSerializationDeserialization() throws IOException { + int elementCount = randomIntBetween(1, 100); + long maxDocs = elementCount * 10L; // Keeping this high so that it ensures some bits are not set. + BloomFilter filter = new BloomFilter(maxDocs, getFpp(), () -> idIterator(elementCount)); + byte[] buffer = new byte[(int)maxDocs * 5]; + ByteArrayDataOutput out = new ByteArrayDataOutput(buffer); + + // Write in the format readable through factory + out.writeString(filter.setType().getSetName()); + filter.writeTo(out); + + FuzzySet reconstructedFilter = FuzzySetFactory.deserializeFuzzySet(new ByteArrayIndexInput("filter", buffer)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, reconstructedFilter.setType()); + + Iterator idIterator = idIterator(elementCount); + while (idIterator.hasNext()) { + BytesRef element = idIterator.next(); + assertEquals(FuzzySet.Result.MAYBE, reconstructedFilter.contains(element)); + assertEquals(FuzzySet.Result.MAYBE, filter.contains(element)); + } + } + + public void testBloomFilterIsSaturated_returnsTrue() throws IOException { + BloomFilter bloomFilter = new BloomFilter(1L, getFpp(), () -> idIterator(1000)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + assertEquals(true, bloomFilter.isSaturated()); + } + + public void testBloomFilterIsSaturated_returnsFalse() throws IOException { + int elementCount = randomIntBetween(1, 100); + BloomFilter bloomFilter = new BloomFilter(20000, getFpp(), () -> idIterator(elementCount)); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + assertEquals(false, bloomFilter.isSaturated()); + } + + public void testBloomFilterWithLargeCapacity() throws IOException { + long maxDocs = randomLongBetween(Integer.MAX_VALUE, 5L * Integer.MAX_VALUE); + BloomFilter bloomFilter = new BloomFilter(maxDocs, getFpp(), () -> List.of(new BytesRef("bar")).iterator()); + assertEquals(FuzzySet.SetType.BLOOM_FILTER_V1, bloomFilter.setType()); + } + + private double getFpp() { + return randomDoubleBetween(0.01, 0.50, true); + } + + private Iterator idIterator(int count) { + return new Iterator() { + int cnt = count; + @Override + public boolean hasNext() { + return cnt -- > 0; + } + + @Override + public BytesRef next() { + return new BytesRef(Integer.toString(cnt)); + } + }; + } +} diff --git a/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java b/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java new file mode 100644 index 0000000000000..674300f039722 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/codec/fuzzy/FuzzyFilterPostingsFormatTests.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.fuzzy; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.tests.index.BasePostingsFormatTestCase; +import org.apache.lucene.tests.util.TestUtil; + +import java.util.TreeMap; + +public class FuzzyFilterPostingsFormatTests extends BasePostingsFormatTestCase { + + private TreeMap params = new TreeMap<>() { + @Override + public FuzzySetParameters get(Object k) { + return new FuzzySetParameters(() -> 0.2047); + } + }; + + private Codec fuzzyFilterCodec = TestUtil.alwaysPostingsFormat(new FuzzyFilterPostingsFormat(TestUtil.getDefaultPostingsFormat(), new FuzzySetFactory(params))); + + @Override + protected Codec getCodec() { + return fuzzyFilterCodec; + } +} diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 0c6c81103922f..5cca2647962c1 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -208,6 +208,8 @@ import static org.opensearch.core.common.util.CollectionUtils.eagerPartition; import static org.opensearch.discovery.DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING; import static org.opensearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING; +import static org.opensearch.index.IndexSettings.INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING; import static org.opensearch.index.IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.test.XContentTestUtils.convertToMap; @@ -770,6 +772,11 @@ public Settings indexSettings() { ); } + if (randomBoolean()) { + builder.put(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING.getKey(), true); + builder.put(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING.getKey(), randomDoubleBetween(0.00, 0.50, true)); + } + return builder.build(); }