From c130e18fb9d5c97ae0277c24550965e15f814c26 Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Mon, 14 Aug 2023 20:06:51 -0500 Subject: [PATCH] Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility (#9262) * Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility This commit refactors the CompressorFactory static singleton class and CompressorType enum to a formal CompressorRegistry and enables downstream implementations to register their own compression implementations for use in compressing Blob stores and MediaType data. This is different from Lucene's Codec compression extension points which expose different compression implementations for Lucene's Stored Fields. Signed-off-by: Nicholas Walter Knize * fix missing javadoc and relocate the common compress to compress package Signed-off-by: Nicholas Walter Knize * fix deprecated case Signed-off-by: Nicholas Walter Knize * fix typo in compress build.gradle Signed-off-by: Nicholas Walter Knize * PR cleanup Signed-off-by: Nicholas Walter Knize * PR changes Signed-off-by: Nicholas Walter Knize * update with initial annotations Signed-off-by: Nicholas Walter Knize * remove NONE singleton in CompressorRegistry and remove static block init Signed-off-by: Nicholas Walter Knize --------- Signed-off-by: Nicholas Walter Knize Signed-off-by: Kiran Reddy --- gradle/missing-javadoc.gradle | 1 + libs/compress/build.gradle | 38 ++++++ .../licenses/zstd-jni-1.5.5-3.jar.sha1 | 0 .../compress}/licenses/zstd-jni-LICENSE.txt | 0 .../compress}/licenses/zstd-jni-NOTICE.txt | 0 .../opensearch}/compress/ZstdCompressor.java | 16 ++- .../opensearch}/compress/package-info.java | 6 +- .../compress/spi/CompressionProvider.java | 32 +++++ .../opensearch/compress/spi/package-info.java | 15 +++ .../java/org/opensearch/package-info.java | 13 ++ ...earch.core.compress.spi.CompressorProvider | 9 ++ .../compress/ZstdCompressTests.java | 9 +- .../java/org/opensearch/ExceptionsHelper.java | 2 +- .../{common => }/compress/Compressor.java | 16 ++- .../core/compress/CompressorRegistry.java | 115 ++++++++++++++++++ .../core}/compress/NoneCompressor.java | 15 ++- .../compress/NotCompressedException.java | 2 +- .../compress/NotXContentException.java | 2 +- .../core/compress/package-info.java | 14 +++ .../core/compress/spi/CompressorProvider.java | 34 ++++++ .../spi/DefaultCompressorProvider.java | 31 +++++ .../core/compress/spi/package-info.java | 16 +++ ...earch.core.compress.spi.CompressorProvider | 9 ++ server/build.gradle | 4 +- .../coordination/CompressedStreamUtils.java | 10 +- .../common/compress/CompressedXContent.java | 13 +- .../common/compress/CompressorFactory.java | 101 --------------- .../common/compress/CompressorType.java | 42 ------- .../common/compress/DeflateCompressor.java | 17 ++- .../spi/ServerCompressorProvider.java | 36 ++++++ .../common/compress/spi/package-info.java | 16 +++ .../common/io/stream/BytesStreamOutput.java | 6 +- .../common/util/PageCacheRecycler.java | 4 +- .../common/xcontent/XContentHelper.java | 16 +-- .../org/opensearch/index/get/GetResult.java | 4 +- .../blobstore/BlobStoreRepository.java | 22 ++-- .../blobstore/ChecksumBlobStoreFormat.java | 2 +- .../java/org/opensearch/search/SearchHit.java | 4 +- .../CompressibleBytesOutputStream.java | 6 +- .../transport/TransportDecompressor.java | 12 +- .../opensearch/transport/TransportLogger.java | 8 +- ...earch.core.compress.spi.CompressorProvider | 9 ++ .../common/compress/DeflateCompressTests.java | 7 +- .../DeflateCompressedXContentTests.java | 2 +- .../index/mapper/BinaryFieldMapperTests.java | 8 +- .../snapshots/BlobStoreFormatTests.java | 23 ++-- .../CompressibleBytesOutputStreamTests.java | 12 +- .../transport/TransportDecompressorTests.java | 8 +- ...earchBlobStoreRepositoryIntegTestCase.java | 4 +- .../AbstractSnapshotIntegTestCase.java | 4 +- .../compress/AbstractCompressorTestCase.java | 8 +- 51 files changed, 542 insertions(+), 261 deletions(-) create mode 100644 libs/compress/build.gradle rename {server => libs/compress}/licenses/zstd-jni-1.5.5-3.jar.sha1 (100%) rename {server => libs/compress}/licenses/zstd-jni-LICENSE.txt (100%) rename {server => libs/compress}/licenses/zstd-jni-NOTICE.txt (100%) rename {server/src/main/java/org/opensearch/common => libs/compress/src/main/java/org/opensearch}/compress/ZstdCompressor.java (85%) rename libs/{core/src/main/java/org/opensearch/core/common => compress/src/main/java/org/opensearch}/compress/package-info.java (63%) create mode 100644 libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java create mode 100644 libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java create mode 100644 libs/compress/src/main/java/org/opensearch/package-info.java create mode 100644 libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider rename {server/src/test/java/org/opensearch/common => libs/compress/src/test/java/org/opensearch}/compress/ZstdCompressTests.java (58%) rename libs/core/src/main/java/org/opensearch/core/{common => }/compress/Compressor.java (77%) create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java rename {server/src/main/java/org/opensearch/common => libs/core/src/main/java/org/opensearch/core}/compress/NoneCompressor.java (74%) rename {server/src/main/java/org/opensearch/common => libs/core/src/main/java/org/opensearch/core}/compress/NotCompressedException.java (97%) rename libs/core/src/main/java/org/opensearch/core/{common => }/compress/NotXContentException.java (96%) create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/package-info.java create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java create mode 100644 libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java create mode 100644 libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider delete mode 100644 server/src/main/java/org/opensearch/common/compress/CompressorFactory.java delete mode 100644 server/src/main/java/org/opensearch/common/compress/CompressorType.java create mode 100644 server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java create mode 100644 server/src/main/java/org/opensearch/common/compress/spi/package-info.java create mode 100644 server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider rename server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java => test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java (98%) diff --git a/gradle/missing-javadoc.gradle b/gradle/missing-javadoc.gradle index 03addc1ba3616..7512955fef3c4 100644 --- a/gradle/missing-javadoc.gradle +++ b/gradle/missing-javadoc.gradle @@ -166,6 +166,7 @@ configure([ configure([ project(":libs:opensearch-common"), project(":libs:opensearch-core"), + project(":libs:opensearch-compress"), project(":plugins:events-correlation-engine"), project(":server") ]) { diff --git a/libs/compress/build.gradle b/libs/compress/build.gradle new file mode 100644 index 0000000000000..7a5bc2f573dea --- /dev/null +++ b/libs/compress/build.gradle @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.build' +apply plugin: 'opensearch.publish' + +base { + archivesName = 'opensearch-compress' +} + +dependencies { + api project(':libs:opensearch-common') + api project(':libs:opensearch-core') + + //zstd + api "com.github.luben:zstd-jni:${versions.zstd}" + + testImplementation(project(":test:framework")) { + // tests use the locally compiled version of server + exclude group: 'org.opensearch', module: 'opensearch-compress' + } +} + +tasks.named('forbiddenApisMain').configure { + // :libs:opensearch-compress does not depend on server + // TODO: Need to decide how we want to handle for forbidden signatures with the changes to server + replaceSignatureFiles 'jdk-signatures' +} + +jarHell.enabled = false diff --git a/server/licenses/zstd-jni-1.5.5-3.jar.sha1 b/libs/compress/licenses/zstd-jni-1.5.5-3.jar.sha1 similarity index 100% rename from server/licenses/zstd-jni-1.5.5-3.jar.sha1 rename to libs/compress/licenses/zstd-jni-1.5.5-3.jar.sha1 diff --git a/server/licenses/zstd-jni-LICENSE.txt b/libs/compress/licenses/zstd-jni-LICENSE.txt similarity index 100% rename from server/licenses/zstd-jni-LICENSE.txt rename to libs/compress/licenses/zstd-jni-LICENSE.txt diff --git a/server/licenses/zstd-jni-NOTICE.txt b/libs/compress/licenses/zstd-jni-NOTICE.txt similarity index 100% rename from server/licenses/zstd-jni-NOTICE.txt rename to libs/compress/licenses/zstd-jni-NOTICE.txt diff --git a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java b/libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java similarity index 85% rename from server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java rename to libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java index 922e8f6f39668..e9d09f21f7ace 100644 --- a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java +++ b/libs/compress/src/main/java/org/opensearch/compress/ZstdCompressor.java @@ -6,13 +6,14 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.compress; import com.github.luben.zstd.RecyclingBufferPool; import com.github.luben.zstd.ZstdInputStreamNoFinalizer; import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -24,7 +25,8 @@ /** * {@link Compressor} implementation based on the ZSTD compression algorithm. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class ZstdCompressor implements Compressor { // An arbitrary header that we use to identify compressed streams @@ -33,6 +35,14 @@ public class ZstdCompressor implements Compressor { // a XContent private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' }; + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static final String NAME = "ZSTD"; + private static final int LEVEL = 3; private static final int BUFFER_SIZE = 4096; diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java b/libs/compress/src/main/java/org/opensearch/compress/package-info.java similarity index 63% rename from libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java rename to libs/compress/src/main/java/org/opensearch/compress/package-info.java index 99459f99c42d8..3ffa53079fa69 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/package-info.java +++ b/libs/compress/src/main/java/org/opensearch/compress/package-info.java @@ -6,5 +6,7 @@ * compatible open source license. */ -/** Classes for core compress module */ -package org.opensearch.core.common.compress; +/** + * Concrete {@link org.opensearch.core.compress.Compressor} implementations + */ +package org.opensearch.compress; diff --git a/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java b/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java new file mode 100644 index 0000000000000..4e0ba105d7685 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/compress/spi/CompressionProvider.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.compress.spi; + +import org.opensearch.compress.ZstdCompressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.spi.CompressorProvider; + +import java.util.AbstractMap.SimpleEntry; +import java.util.Map.Entry; +import java.util.List; + +/** + * Additional "optional" compressor implementations provided by the opensearch compress library + * + * @opensearch.internal + */ +public class CompressionProvider implements CompressorProvider { + + /** Returns the concrete {@link Compressor}s provided by the compress library */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry<>(ZstdCompressor.NAME, new ZstdCompressor())); + } +} diff --git a/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java b/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java new file mode 100644 index 0000000000000..47d982a7ca2f9 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/compress/spi/package-info.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface for registering concrete {@link org.opensearch.core.compress.Compressor} + * implementations. + * + * See {@link org.opensearch.compress.ZstdCompressor} + */ +package org.opensearch.compress.spi; diff --git a/libs/compress/src/main/java/org/opensearch/package-info.java b/libs/compress/src/main/java/org/opensearch/package-info.java new file mode 100644 index 0000000000000..264680e9cb271 --- /dev/null +++ b/libs/compress/src/main/java/org/opensearch/package-info.java @@ -0,0 +1,13 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This is the compress library for registering optional + * {@link org.opensearch.core.compress.Compressor} implementations + */ +package org.opensearch; diff --git a/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..a9ea063e24436 --- /dev/null +++ b/libs/compress/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.compress.spi.CompressionProvider diff --git a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java b/libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java similarity index 58% rename from server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java rename to libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java index 9def702792ffc..54864054a0e02 100644 --- a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java +++ b/libs/compress/src/test/java/org/opensearch/compress/ZstdCompressTests.java @@ -6,19 +6,20 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.compress; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.core.compress.AbstractCompressorTestCase; /** * Test streaming compression */ -public class ZstdCompressTests extends AbstractCompressorTests { +public class ZstdCompressTests extends AbstractCompressorTestCase { private final Compressor compressor = new ZstdCompressor(); @Override - Compressor compressor() { + protected Compressor compressor() { return compressor; } } diff --git a/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java b/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java index 317ea99cab6a7..9692d20a050ff 100644 --- a/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java +++ b/libs/core/src/main/java/org/opensearch/ExceptionsHelper.java @@ -43,7 +43,7 @@ import org.opensearch.common.CheckedSupplier; import org.opensearch.common.Nullable; import org.opensearch.core.action.ShardOperationFailedException; -import org.opensearch.core.common.compress.NotXContentException; +import org.opensearch.core.compress.NotXContentException; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java b/libs/core/src/main/java/org/opensearch/core/compress/Compressor.java similarity index 77% rename from libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java rename to libs/core/src/main/java/org/opensearch/core/compress/Compressor.java index 88b6c9f85f225..27d5b5dfdfa15 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/Compressor.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/Compressor.java @@ -30,8 +30,10 @@ * GitHub history for details. */ -package org.opensearch.core.common.compress; +package org.opensearch.core.compress; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; import java.io.IOException; @@ -39,10 +41,18 @@ import java.io.OutputStream; /** - * Compressor interface + * Compressor interface used for compressing {@link org.opensearch.core.xcontent.MediaType} and + * {@code org.opensearch.repositories.blobstore.BlobStoreRepository} implementations. * - * @opensearch.internal + * This is not to be confused with {@link org.apache.lucene.codecs.compressing.Compressor} which is used + * for codec implementations such as {@code org.opensearch.index.codec.customcodecs.Lucene95CustomCodec} + * for compressing {@link org.apache.lucene.document.StoredField}s + * + * @opensearch.api - intended to be extended + * @opensearch.experimental - however, bwc is not guaranteed at this time */ +@ExperimentalApi +@PublicApi(since = "2.10.0") public interface Compressor { boolean isCompressed(BytesReference bytes); diff --git a/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java new file mode 100644 index 0000000000000..9290254c30d8d --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/CompressorRegistry.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress; + +import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.InternalApi; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.spi.CompressorProvider; +import org.opensearch.core.xcontent.MediaTypeRegistry; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +/** + * A registry that wraps a static Map singleton which holds a mapping of unique String names (typically the + * compressor header as a string) to registerd {@link Compressor} implementations. + * + * This enables plugins, modules, extensions to register their own compression implementations through SPI + * + * @opensearch.experimental + * @opensearch.internal + */ +@InternalApi +public final class CompressorRegistry { + + // the backing registry map + private static final Map registeredCompressors = ServiceLoader.load( + CompressorProvider.class, + CompressorProvider.class.getClassLoader() + ) + .stream() + .flatMap(p -> p.get().getCompressors().stream()) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + + // no instance: + private CompressorRegistry() {} + + /** + * Returns the default compressor + */ + public static Compressor defaultCompressor() { + return registeredCompressors.get("DEFLATE"); + } + + public static Compressor none() { + return registeredCompressors.get(NoneCompressor.NAME); + } + + public static boolean isCompressed(BytesReference bytes) { + return compressor(bytes) != null; + } + + @Nullable + public static Compressor compressor(final BytesReference bytes) { + for (Compressor compressor : registeredCompressors.values()) { + if (compressor.isCompressed(bytes) == true) { + // bytes should be either detected as compressed or as xcontent, + // if we have bytes that can be either detected as compressed or + // as a xcontent, we have a problem + assert MediaTypeRegistry.xContentType(bytes) == null; + return compressor; + } + } + + if (MediaTypeRegistry.xContentType(bytes) == null) { + throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"); + } + + return null; + } + + /** Decompress the provided {@link BytesReference}. */ + public static BytesReference uncompress(BytesReference bytes) throws IOException { + Compressor compressor = compressor(bytes); + if (compressor == null) { + throw new NotCompressedException(); + } + return compressor.uncompress(bytes); + } + + /** + * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. + */ + public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { + Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); + return compressor == null ? bytes : compressor.uncompress(bytes); + } + + /** Returns a registered compressor by its registered name */ + public static Compressor getCompressor(final String name) { + if (registeredCompressors.containsKey(name)) { + return registeredCompressors.get(name); + } + throw new IllegalArgumentException("No registered compressor found by name [" + name + "]"); + } + + /** + * Returns the registered compressors as an Immutable collection + * + * note: used for testing + */ + public static Map registeredCompressors() { + // no destructive danger as backing map is immutable + return registeredCompressors; + } +} diff --git a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java b/libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java similarity index 74% rename from server/src/main/java/org/opensearch/common/compress/NoneCompressor.java rename to libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java index f820a3351bc80..6e607ed701633 100644 --- a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NoneCompressor.java @@ -6,10 +6,10 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.core.compress; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; import java.io.IOException; import java.io.InputStream; @@ -18,9 +18,18 @@ /** * {@link Compressor} no compressor implementation. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class NoneCompressor implements Compressor { + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static final String NAME = "NONE"; + @Override public boolean isCompressed(BytesReference bytes) { return false; diff --git a/server/src/main/java/org/opensearch/common/compress/NotCompressedException.java b/libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java similarity index 97% rename from server/src/main/java/org/opensearch/common/compress/NotCompressedException.java rename to libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java index 7f070e0b499d8..91d6bc57f1cd6 100644 --- a/server/src/main/java/org/opensearch/common/compress/NotCompressedException.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NotCompressedException.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.common.compress; +package org.opensearch.core.compress; /** * Exception indicating that we were expecting something compressed, which diff --git a/libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java b/libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java similarity index 96% rename from libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java rename to libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java index d1a3e7709a7d0..99337d5a26025 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/compress/NotXContentException.java +++ b/libs/core/src/main/java/org/opensearch/core/compress/NotXContentException.java @@ -30,7 +30,7 @@ * GitHub history for details. */ -package org.opensearch.core.common.compress; +package org.opensearch.core.compress; import org.opensearch.core.xcontent.XContent; diff --git a/libs/core/src/main/java/org/opensearch/core/compress/package-info.java b/libs/core/src/main/java/org/opensearch/core/compress/package-info.java new file mode 100644 index 0000000000000..c0365e45702bc --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Concrete {@link org.opensearch.core.compress.Compressor} implementations provided by the core library + * + * See {@link org.opensearch.core.compress.NoneCompressor} + */ +package org.opensearch.core.compress; diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java new file mode 100644 index 0000000000000..019e282444d64 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/CompressorProvider.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress.spi; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.compress.Compressor; + +import java.util.List; +import java.util.Map; + +/** + * Service Provider Interface for plugins, modules, extensions providing custom + * compression algorithms + * + * see {@link Compressor} for implementing methods + * and {@link org.opensearch.core.compress.CompressorRegistry} for the registration of custom + * Compressors + * + * @opensearch.experimental + * @opensearch.api + */ +@ExperimentalApi +@PublicApi(since = "2.10.0") +public interface CompressorProvider { + /** Extensions that implement their own concrete {@link Compressor}s provide them through this interface method*/ + List> getCompressors(); +} diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java new file mode 100644 index 0000000000000..3ca10b564ef68 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/DefaultCompressorProvider.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.compress.spi; + +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NoneCompressor; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Default {@link Compressor} implementations provided by the + * opensearch core library + * + * @opensearch.internal + */ +public class DefaultCompressorProvider implements CompressorProvider { + /** Returns the default {@link Compressor}s provided by the core library */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry(NoneCompressor.NAME, new NoneCompressor())); + } +} diff --git a/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java b/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java new file mode 100644 index 0000000000000..6e33cc8fb63d3 --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/compress/spi/package-info.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * The Service Provider Interface implementation for registering {@link org.opensearch.core.compress.Compressor} + * with the {@link org.opensearch.core.compress.CompressorRegistry} + * + * See {@link org.opensearch.core.compress.spi.DefaultCompressorProvider} as an example of registering the core + * {@link org.opensearch.core.compress.NoneCompressor} + */ +package org.opensearch.core.compress.spi; diff --git a/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..181b802952c60 --- /dev/null +++ b/libs/core/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.core.compress.spi.DefaultCompressorProvider diff --git a/server/build.gradle b/server/build.gradle index 3fde1b745c546..f6db3d53a0dcc 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -101,6 +101,7 @@ dependencies { api project(':libs:opensearch-common') api project(':libs:opensearch-core') + api project(":libs:opensearch-compress") api project(':libs:opensearch-secure-sm') api project(':libs:opensearch-x-content') api project(":libs:opensearch-geo") @@ -157,9 +158,6 @@ dependencies { api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" - //zstd - api "com.github.luben:zstd-jni:${versions.zstd}" - testImplementation(project(":test:framework")) { // tests use the locally compiled version of server exclude group: 'org.opensearch', module: 'server' diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java index 1bed7c7759317..b761fe7021ced 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CompressedStreamUtils.java @@ -12,16 +12,16 @@ import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.common.CheckedConsumer; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.transport.BytesTransportRequest; import java.io.IOException; @@ -37,7 +37,7 @@ public final class CompressedStreamUtils { public static BytesReference createCompressedStream(Version version, CheckedConsumer outputConsumer) throws IOException { final BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = new OutputStreamStreamOutput(CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream))) { + try (StreamOutput stream = new OutputStreamStreamOutput(CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream))) { stream.setVersion(version); outputConsumer.accept(stream); } @@ -48,7 +48,7 @@ public static BytesReference createCompressedStream(Version version, CheckedCons public static StreamInput decompressBytes(BytesTransportRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException { - final Compressor compressor = CompressorFactory.compressor(request.bytes()); + final Compressor compressor = CompressorRegistry.compressor(request.bytes()); final StreamInput in; if (compressor != null) { in = new InputStreamStreamInput(compressor.threadLocalInputStream(request.bytes().streamInput())); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java index 953f7d2340898..6a37b1dda6c96 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressedXContent.java @@ -36,9 +36,10 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; @@ -86,7 +87,7 @@ private CompressedXContent(byte[] compressed, int crc32) { */ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - OutputStream compressedStream = CompressorFactory.defaultCompressor().threadLocalOutputStream(bStream); + OutputStream compressedStream = CompressorRegistry.defaultCompressor().threadLocalOutputStream(bStream); CRC32 crc32 = new CRC32(); OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32); try (XContentBuilder builder = XContentFactory.jsonBuilder(checkedStream)) { @@ -108,20 +109,20 @@ public CompressedXContent(ToXContent xcontent, ToXContent.Params params) throws * that may already be compressed. */ public CompressedXContent(BytesReference data) throws IOException { - Compressor compressor = CompressorFactory.compressor(data); + Compressor compressor = CompressorRegistry.compressor(data); if (compressor != null) { // already compressed... this.bytes = BytesReference.toBytes(data); this.crc32 = crc32(uncompressed()); } else { - this.bytes = BytesReference.toBytes(CompressorFactory.defaultCompressor().compress(data)); + this.bytes = BytesReference.toBytes(CompressorRegistry.defaultCompressor().compress(data)); this.crc32 = crc32(data); } assertConsistent(); } private void assertConsistent() { - assert CompressorFactory.compressor(new BytesArray(bytes)) != null; + assert CompressorRegistry.compressor(new BytesArray(bytes)) != null; assert this.crc32 == crc32(uncompressed()); } @@ -146,7 +147,7 @@ public BytesReference compressedReference() { /** Return the uncompressed bytes. */ public BytesReference uncompressed() { try { - return CompressorFactory.uncompress(new BytesArray(bytes)); + return CompressorRegistry.uncompress(new BytesArray(bytes)); } catch (IOException e) { throw new IllegalStateException("Cannot decompress compressed string", e); } diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java deleted file mode 100644 index e40dd89abab54..0000000000000 --- a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.common.compress; - -import org.opensearch.common.Nullable; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.core.common.compress.NotXContentException; -import org.opensearch.core.xcontent.MediaTypeRegistry; - -import java.io.IOException; -import java.util.Objects; - -/** - * Factory to create a compressor instance. - * - * @opensearch.internal - */ -public class CompressorFactory { - - public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor(); - - public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor(); - - public static final Compressor NONE_COMPRESSOR = new NoneCompressor(); - - public static boolean isCompressed(BytesReference bytes) { - return compressor(bytes) != null; - } - - public static Compressor defaultCompressor() { - return DEFLATE_COMPRESSOR; - } - - @Nullable - public static Compressor compressor(BytesReference bytes) { - if (DEFLATE_COMPRESSOR.isCompressed(bytes)) { - // bytes should be either detected as compressed or as xcontent, - // if we have bytes that can be either detected as compressed or - // as a xcontent, we have a problem - assert MediaTypeRegistry.xContentType(bytes) == null; - return DEFLATE_COMPRESSOR; - } else if (ZSTD_COMPRESSOR.isCompressed(bytes)) { - assert MediaTypeRegistry.xContentType(bytes) == null; - return ZSTD_COMPRESSOR; - } - - if (MediaTypeRegistry.xContentType(bytes) == null) { - throw new NotXContentException("Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"); - } - - return null; - } - - /** - * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. - */ - public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { - Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); - return compressor == null ? bytes : compressor.uncompress(bytes); - } - - /** Decompress the provided {@link BytesReference}. */ - public static BytesReference uncompress(BytesReference bytes) throws IOException { - Compressor compressor = compressor(bytes); - if (compressor == null) { - throw new NotCompressedException(); - } - return compressor.uncompress(bytes); - } -} diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorType.java b/server/src/main/java/org/opensearch/common/compress/CompressorType.java deleted file mode 100644 index bc688bab57c37..0000000000000 --- a/server/src/main/java/org/opensearch/common/compress/CompressorType.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.common.compress; - -import org.opensearch.core.common.compress.Compressor; - -/** - * Supported compression types - * - * @opensearch.internal - */ -public enum CompressorType { - - DEFLATE { - @Override - public Compressor compressor() { - return CompressorFactory.DEFLATE_COMPRESSOR; - } - }, - - ZSTD { - @Override - public Compressor compressor() { - return CompressorFactory.ZSTD_COMPRESSOR; - } - }, - - NONE { - @Override - public Compressor compressor() { - return CompressorFactory.NONE_COMPRESSOR; - } - }; - - public abstract Compressor compressor(); -} diff --git a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java index 07d96dd2c0846..3ccac1a941741 100644 --- a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java +++ b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java @@ -32,11 +32,12 @@ package org.opensearch.common.compress; -import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.core.Assertions; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.Compressor; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -53,7 +54,8 @@ /** * {@link Compressor} implementation based on the DEFLATE compression algorithm. * - * @opensearch.internal + * @opensearch.api - registered name requires BWC support + * @opensearch.experimental - class methods might change */ public class DeflateCompressor implements Compressor { @@ -62,6 +64,15 @@ public class DeflateCompressor implements Compressor { // enough so that no stream starting with these bytes could be detected as // a XContent private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' }; + + /** + * The name to register the compressor by + * + * @opensearch.api - requires BWC support + */ + @PublicApi(since = "2.10.0") + public static String NAME = "DEFLATE"; + // 3 is a good trade-off between speed and compression ratio private static final int LEVEL = 3; // We use buffering on the input and output of in/def-laters in order to diff --git a/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java b/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java new file mode 100644 index 0000000000000..42036f8d88610 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/spi/ServerCompressorProvider.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress.spi; + +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.spi.CompressorProvider; + +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import java.util.Map.Entry; + +/** + * Default {@link Compressor} implementations provided by the + * opensearch core library + * + * @opensearch.internal + * + * @deprecated This class is deprecated and will be removed when the {@link DeflateCompressor} is moved to the compress + * library as a default compression option + */ +@Deprecated +public class ServerCompressorProvider implements CompressorProvider { + /** Returns the concrete {@link Compressor}s provided by the server module */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public List> getCompressors() { + return List.of(new SimpleEntry(DeflateCompressor.NAME, new DeflateCompressor())); + } +} diff --git a/server/src/main/java/org/opensearch/common/compress/spi/package-info.java b/server/src/main/java/org/opensearch/common/compress/spi/package-info.java new file mode 100644 index 0000000000000..a8019b23c7d90 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/spi/package-info.java @@ -0,0 +1,16 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Service Provider Interface for registering the{@link org.opensearch.common.compress.DeflateCompressor} with the + * {@link org.opensearch.core.compress.CompressorRegistry}. + * + * Note: this will be refactored to the {@code :libs:opensearch-compress} library after other dependency classes are + * refactored. + */ +package org.opensearch.common.compress.spi; diff --git a/server/src/main/java/org/opensearch/common/io/stream/BytesStreamOutput.java b/server/src/main/java/org/opensearch/common/io/stream/BytesStreamOutput.java index c2b6da3c1756a..ad09efde72009 100644 --- a/server/src/main/java/org/opensearch/common/io/stream/BytesStreamOutput.java +++ b/server/src/main/java/org/opensearch/common/io/stream/BytesStreamOutput.java @@ -35,13 +35,13 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; import org.opensearch.common.Nullable; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.util.BigArrays; -import org.opensearch.core.common.util.ByteArray; import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.util.ByteArray; import java.io.IOException; diff --git a/server/src/main/java/org/opensearch/common/util/PageCacheRecycler.java b/server/src/main/java/org/opensearch/common/util/PageCacheRecycler.java index 65be2a082f084..b6fd385d25082 100644 --- a/server/src/main/java/org/opensearch/common/util/PageCacheRecycler.java +++ b/server/src/main/java/org/opensearch/common/util/PageCacheRecycler.java @@ -33,14 +33,14 @@ package org.opensearch.common.util; import org.apache.lucene.util.RamUsageEstimator; -import org.opensearch.core.common.bytes.PagedBytesReference; import org.opensearch.common.recycler.AbstractRecyclerC; import org.opensearch.common.recycler.Recycler; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.common.bytes.PagedBytesReference; +import org.opensearch.core.common.unit.ByteSizeValue; import java.util.Arrays; import java.util.Locale; diff --git a/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java b/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java index ecd5ec1e15943..798a58551457f 100644 --- a/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java +++ b/server/src/main/java/org/opensearch/common/xcontent/XContentHelper.java @@ -33,11 +33,11 @@ package org.opensearch.common.xcontent; import org.opensearch.OpenSearchParseException; +import org.opensearch.common.collect.Tuple; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.common.collect.Tuple; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -77,7 +77,7 @@ public static XContentParser createParser( DeprecationHandler deprecationHandler, BytesReference bytes ) throws IOException { - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedInput = null; try { @@ -106,7 +106,7 @@ public static XContentParser createParser( MediaType mediaType ) throws IOException { Objects.requireNonNull(mediaType); - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedInput = null; try { @@ -163,7 +163,7 @@ public static Tuple> convertToMap(Bytes try { final MediaType contentType; InputStream input; - Compressor compressor = CompressorFactory.compressor(bytes); + Compressor compressor = CompressorRegistry.compressor(bytes); if (compressor != null) { InputStream compressedStreamInput = compressor.threadLocalInputStream(bytes.streamInput()); if (compressedStreamInput.markSupported() == false) { @@ -451,7 +451,7 @@ private static boolean allListValuesAreMapsOfOne(List list) { */ @Deprecated public static void writeRawField(String field, BytesReference source, XContentBuilder builder, Params params) throws IOException { - Compressor compressor = CompressorFactory.compressor(source); + Compressor compressor = CompressorRegistry.compressor(source); if (compressor != null) { try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) { builder.rawField(field, compressedStreamInput); @@ -470,7 +470,7 @@ public static void writeRawField(String field, BytesReference source, XContentBu public static void writeRawField(String field, BytesReference source, XContentType xContentType, XContentBuilder builder, Params params) throws IOException { Objects.requireNonNull(xContentType); - Compressor compressor = CompressorFactory.compressor(source); + Compressor compressor = CompressorRegistry.compressor(source); if (compressor != null) { try (InputStream compressedStreamInput = compressor.threadLocalInputStream(source.streamInput())) { builder.rawField(field, compressedStreamInput, xContentType); diff --git a/server/src/main/java/org/opensearch/index/get/GetResult.java b/server/src/main/java/org/opensearch/index/get/GetResult.java index 6e069e0bd69bd..9fd70f9f13295 100644 --- a/server/src/main/java/org/opensearch/index/get/GetResult.java +++ b/server/src/main/java/org/opensearch/index/get/GetResult.java @@ -36,11 +36,11 @@ import org.opensearch.Version; import org.opensearch.core.common.Strings; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.document.DocumentField; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; @@ -206,7 +206,7 @@ public BytesReference sourceRef() { } try { - this.source = CompressorFactory.uncompressIfNeeded(this.source); + this.source = CompressorRegistry.uncompressIfNeeded(this.source); return this.source; } catch (IOException e) { throw new OpenSearchParseException("failed to decompress source", e); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index b85437c40fa4f..fcb281d708b9c 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -47,6 +47,7 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.core.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; @@ -79,10 +80,8 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.compress.CompressorType; -import org.opensearch.core.common.compress.NotXContentException; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.NotXContentException; import org.opensearch.common.io.Streams; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.store.InputStreamIndexInput; @@ -97,6 +96,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.lease.Releasable; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.util.BytesRefUtils; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -265,10 +265,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Setting.Property.NodeScope); - public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( + public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( "compression_type", - CompressorType.DEFLATE.name().toLowerCase(Locale.ROOT), - s -> CompressorType.valueOf(s.toUpperCase(Locale.ROOT)), + DeflateCompressor.NAME.toLowerCase(Locale.ROOT), + s -> CompressorRegistry.getCompressor(s.toUpperCase(Locale.ROOT)), Setting.Property.NodeScope ); @@ -402,7 +402,7 @@ protected BlobStoreRepository( cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); - this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()).compressor() : CompressorFactory.NONE_COMPRESSOR; + this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()) : CompressorRegistry.none(); } @Override @@ -770,7 +770,7 @@ public BlobStore blobStore() { * @return true if compression is needed */ protected final boolean isCompress() { - return compressor != CompressorFactory.NONE_COMPRESSOR; + return compressor != CompressorRegistry.none(); } /** @@ -1949,7 +1949,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; try { - serialized = CompressorFactory.defaultCompressor().compress(updated); + serialized = CompressorRegistry.defaultCompressor().compress(updated); final int len = serialized.length(); if (len > ByteSizeUnit.KB.toBytes(500)) { logger.debug( @@ -1985,7 +1985,7 @@ private void cacheRepositoryData(BytesReference updated, long generation) { } private RepositoryData repositoryDataFromCachedEntry(Tuple cacheEntry) throws IOException { - try (InputStream input = CompressorFactory.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { + try (InputStream input = CompressorRegistry.defaultCompressor().threadLocalInputStream(cacheEntry.v2().streamInput())) { return RepositoryData.snapshotsFromXContent( MediaTypeRegistry.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, input), cacheEntry.v1() diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 01b15b7d3dd0f..b0e4e45deeb34 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -43,7 +43,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.CheckedFunction; import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; diff --git a/server/src/main/java/org/opensearch/search/SearchHit.java b/server/src/main/java/org/opensearch/search/SearchHit.java index a2e17df4c61e3..c18155cf68122 100644 --- a/server/src/main/java/org/opensearch/search/SearchHit.java +++ b/server/src/main/java/org/opensearch/search/SearchHit.java @@ -37,7 +37,6 @@ import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.common.Nullable; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.document.DocumentField; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.common.ParsingException; @@ -47,6 +46,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.text.Text; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.ConstructingObjectParser; @@ -383,7 +383,7 @@ public BytesReference getSourceRef() { } try { - this.source = CompressorFactory.uncompressIfNeeded(this.source); + this.source = CompressorRegistry.uncompressIfNeeded(this.source); return this.source; } catch (IOException e) { throw new OpenSearchParseException("failed to decompress source", e); diff --git a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java index 1deddf93b6252..b2471bfb7c4d8 100644 --- a/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java +++ b/server/src/main/java/org/opensearch/transport/CompressibleBytesOutputStream.java @@ -33,11 +33,11 @@ package org.opensearch.transport; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.Streams; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import java.io.IOException; import java.io.OutputStream; @@ -68,7 +68,7 @@ final class CompressibleBytesOutputStream extends StreamOutput { this.bytesStreamOutput = bytesStreamOutput; this.shouldCompress = shouldCompress; if (shouldCompress) { - this.stream = CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); + this.stream = CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput)); } else { this.stream = bytesStreamOutput; } diff --git a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java index ca64a3c917d46..8fbc3b7ce6803 100644 --- a/server/src/main/java/org/opensearch/transport/TransportDecompressor.java +++ b/server/src/main/java/org/opensearch/transport/TransportDecompressor.java @@ -34,13 +34,13 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.recycler.Recycler; import org.opensearch.common.util.PageCacheRecycler; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.Compressor; +import org.opensearch.core.compress.CompressorRegistry; import java.io.Closeable; import java.io.IOException; @@ -70,7 +70,7 @@ public TransportDecompressor(PageCacheRecycler recycler) { public int decompress(BytesReference bytesReference) throws IOException { int bytesConsumed = 0; if (hasReadHeader == false) { - final Compressor compressor = CompressorFactory.defaultCompressor(); + final Compressor compressor = CompressorRegistry.defaultCompressor(); if (compressor.isCompressed(bytesReference) == false) { int maxToRead = Math.min(bytesReference.length(), 10); StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead) @@ -137,7 +137,7 @@ public int decompress(BytesReference bytesReference) throws IOException { } public boolean canDecompress(int bytesAvailable) { - return hasReadHeader || bytesAvailable >= CompressorFactory.defaultCompressor().headerLength(); + return hasReadHeader || bytesAvailable >= CompressorRegistry.defaultCompressor().headerLength(); } public boolean isEOS() { diff --git a/server/src/main/java/org/opensearch/transport/TransportLogger.java b/server/src/main/java/org/opensearch/transport/TransportLogger.java index d7d00a20964f8..997b3bb5ba18e 100644 --- a/server/src/main/java/org/opensearch/transport/TransportLogger.java +++ b/server/src/main/java/org/opensearch/transport/TransportLogger.java @@ -34,12 +34,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.compress.CompressorRegistry; import java.io.IOException; @@ -179,7 +179,7 @@ private static String format(TcpChannel channel, InboundMessage message, String private static StreamInput decompressingStream(byte status, StreamInput streamInput) throws IOException { if (TransportStatus.isCompress(status) && streamInput.available() > 0) { try { - return new InputStreamStreamInput(CompressorFactory.defaultCompressor().threadLocalInputStream(streamInput)); + return new InputStreamStreamInput(CompressorRegistry.defaultCompressor().threadLocalInputStream(streamInput)); } catch (IllegalArgumentException e) { throw new IllegalStateException("stream marked as compressed, but is missing deflate header"); } diff --git a/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider b/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider new file mode 100644 index 0000000000000..8d93d45035f3f --- /dev/null +++ b/server/src/main/resources/META-INF/services/org.opensearch.core.compress.spi.CompressorProvider @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.common.compress.spi.ServerCompressorProvider diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java index 8c7ab05addc4c..262a7ec40a8f0 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java @@ -32,17 +32,18 @@ package org.opensearch.common.compress; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; +import org.opensearch.test.core.compress.AbstractCompressorTestCase; /** * Test streaming compression (e.g. used for recovery) */ -public class DeflateCompressTests extends AbstractCompressorTests { +public class DeflateCompressTests extends AbstractCompressorTestCase { private final Compressor compressor = new DeflateCompressor(); @Override - Compressor compressor() { + protected Compressor compressor() { return compressor; } } diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java index 4d295e17b16aa..5c65bd0c54272 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressedXContentTests.java @@ -35,7 +35,7 @@ import org.apache.lucene.tests.util.TestUtil; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.io.stream.BytesStreamOutput; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.test.OpenSearchTestCase; import org.junit.Assert; diff --git a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java index 40e7786e829c6..87b5ad3434944 100644 --- a/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java +++ b/server/src/test/java/org/opensearch/index/mapper/BinaryFieldMapperTests.java @@ -33,10 +33,10 @@ package org.opensearch.index.mapper; import org.apache.lucene.util.BytesRef; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; @@ -119,11 +119,11 @@ public void testStoredValue() throws IOException { // case 2: a value that looks compressed: this used to fail in 1.x BytesStreamOutput out = new BytesStreamOutput(); - try (OutputStream compressed = CompressorFactory.defaultCompressor().threadLocalOutputStream(out)) { + try (OutputStream compressed = CompressorRegistry.defaultCompressor().threadLocalOutputStream(out)) { new BytesArray(binaryValue1).writeTo(compressed); } final byte[] binaryValue2 = BytesReference.toBytes(out.bytes()); - assertTrue(CompressorFactory.isCompressed(new BytesArray(binaryValue2))); + assertTrue(CompressorRegistry.isCompressed(new BytesArray(binaryValue2))); for (byte[] value : Arrays.asList(binaryValue1, binaryValue2)) { ParsedDocument doc = mapperService.documentMapper().parse(source(b -> b.field("field", value))); diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 008d569e22b38..a3e99ef0cddc3 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -39,13 +39,12 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobStore; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.compress.Compressor; -import org.opensearch.common.compress.CompressorFactory; -import org.opensearch.common.compress.CompressorType; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; @@ -57,7 +56,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; import java.util.Map; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; @@ -121,12 +119,12 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorType.NONE.compressor()); + checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorRegistry.none()); checksumSMILE.write( new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", - CompressorFactory.DEFLATE_COMPRESSOR + CompressorRegistry.getCompressor(DeflateCompressor.NAME) ); // Assert that all checksum blobs can be read @@ -143,8 +141,8 @@ public void testCompressionIsApplied() throws IOException { } ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorType.DEFLATE.compressor()); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorType.NONE.compressor()); + checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorRegistry.getCompressor(DeflateCompressor.NAME)); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorRegistry.none()); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -156,12 +154,7 @@ public void testBlobCorruption() throws IOException { String testString = randomAlphaOfLength(randomInt(10000)); BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); - checksumFormat.write( - blobObj, - blobContainer, - "test-path", - randomFrom(Arrays.stream(CompressorType.values()).map(CompressorType::compressor).toArray(Compressor[]::new)) - ); + checksumFormat.write(blobObj, blobContainer, "test-path", randomFrom(CompressorRegistry.registeredCompressors().values())); assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java index b9071d5851315..89018b7353e7c 100644 --- a/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java +++ b/server/src/test/java/org/opensearch/transport/CompressibleBytesOutputStreamTests.java @@ -32,12 +32,12 @@ package org.opensearch.transport; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.core.common.io.stream.BytesStream; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.InputStreamStreamInput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.test.OpenSearchTestCase; import java.io.EOFException; @@ -56,7 +56,7 @@ public void testStreamWithoutCompression() throws IOException { // Closing compression stream does not close underlying stream stream.close(); - assertFalse(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); + assertFalse(CompressorRegistry.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = bytesRef.streamInput(); byte[] actualBytes = new byte[expectedBytes.length]; @@ -83,10 +83,10 @@ public void testStreamWithCompression() throws IOException { BytesReference bytesRef = stream.materializeBytes(); stream.close(); - assertTrue(CompressorFactory.defaultCompressor().isCompressed(bytesRef)); + assertTrue(CompressorRegistry.defaultCompressor().isCompressed(bytesRef)); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) + CompressorRegistry.defaultCompressor().threadLocalInputStream(bytesRef.streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; streamInput.readBytes(actualBytes, 0, expectedBytes.length); @@ -110,7 +110,7 @@ public void testCompressionWithCallingMaterializeFails() throws IOException { stream.write(expectedBytes); StreamInput streamInput = new InputStreamStreamInput( - CompressorFactory.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) + CompressorRegistry.defaultCompressor().threadLocalInputStream(bStream.bytes().streamInput()) ); byte[] actualBytes = new byte[expectedBytes.length]; EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length)); diff --git a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java index 35caedae00edb..4d92d99cd75b6 100644 --- a/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java +++ b/server/src/test/java/org/opensearch/transport/TransportDecompressorTests.java @@ -35,7 +35,6 @@ import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.bytes.CompositeBytesReference; import org.opensearch.common.bytes.ReleasableBytesReference; -import org.opensearch.common.compress.CompressorFactory; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.OutputStreamStreamOutput; @@ -43,6 +42,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.common.util.PageCacheRecycler; import org.opensearch.common.lease.Releasables; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -54,7 +54,7 @@ public void testSimpleCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { byte randomByte = randomByte(); try ( - OutputStream deflateStream = CompressorFactory.defaultCompressor() + OutputStream deflateStream = CompressorRegistry.defaultCompressor() .threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) { deflateStream.write(randomByte); @@ -77,7 +77,7 @@ public void testMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { @@ -109,7 +109,7 @@ public void testIncrementalMultiPageCompression() throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { try ( StreamOutput deflateStream = new OutputStreamStreamOutput( - CompressorFactory.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) + CompressorRegistry.defaultCompressor().threadLocalOutputStream(Streams.flushOnCloseStream(output)) ) ) { for (int i = 0; i < 10000; ++i) { diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java index 3b7a921381882..054e5850dde76 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java @@ -47,10 +47,10 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.common.compress.CompressorType; import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -97,7 +97,7 @@ protected Settings repositorySettings() { final Settings.Builder builder = Settings.builder(); builder.put("compress", compress); if (compress) { - builder.put("compression_type", randomFrom(CompressorType.values())); + builder.put("compression_type", randomFrom(CompressorRegistry.registeredCompressors().keySet())); } return builder.build(); } diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 2ac1e707109a2..8f151da15fb22 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -52,7 +52,6 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.compress.CompressorType; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentFactory; @@ -60,6 +59,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.core.xcontent.DeprecationHandler; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; @@ -417,7 +417,7 @@ protected Settings.Builder randomRepositorySettings() { final boolean compress = randomBoolean(); settings.put("location", randomRepoPath()).put("compress", compress); if (compress) { - settings.put("compression_type", randomFrom(CompressorType.values())); + settings.put("compression_type", randomFrom(CompressorRegistry.registeredCompressors().keySet())); } if (rarely()) { settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); diff --git a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java b/test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java similarity index 98% rename from server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java rename to test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java index a2a54f444ad9d..be53e46122157 100644 --- a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java +++ b/test/framework/src/main/java/org/opensearch/test/core/compress/AbstractCompressorTestCase.java @@ -6,11 +6,11 @@ * compatible open source license. */ -package org.opensearch.common.compress; +package org.opensearch.test.core.compress; import org.apache.lucene.tests.util.LineFileDocs; import org.apache.lucene.tests.util.TestUtil; -import org.opensearch.core.common.compress.Compressor; +import org.opensearch.core.compress.Compressor; import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; @@ -22,7 +22,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; -abstract class AbstractCompressorTests extends OpenSearchTestCase { +public abstract class AbstractCompressorTestCase extends OpenSearchTestCase { public void testRandom() throws IOException { Random r = random(); @@ -404,6 +404,6 @@ private void doTest(byte bytes[]) throws IOException { assertArrayEquals(bytes, uncompressedOut.toByteArray()); } - abstract Compressor compressor(); + protected abstract Compressor compressor(); }