diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java b/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java deleted file mode 100644 index 8b8c0ca137079..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/IndexCreationPreIndexMetadataUploadListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import org.opensearch.cluster.metadata.IndexMetadata; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -/** - * Hook for running code that needs to be executed before the upload of index metadata during index creation or - * after enabling the remote cluster statement for the first time. The listener is intended to be run in parallel and - * async with the index metadata upload. - * - * @opensearch.internal - */ -public interface IndexCreationPreIndexMetadataUploadListener { - - /** - * This returns the additional count that needs to be added in the latch present in {@link RemoteClusterStateService} - * which is used to achieve parallelism and async nature of uploads for index metadata upload. The same latch is used - * for running pre index metadata upload listener. - * - * @param newIndexMetadataList list of index metadata of new indexes (or first time index metadata upload). - * @return latch count to be added by the caller. - */ - int latchCount(List newIndexMetadataList); - - /** - * This will run the pre index metadata upload listener using the {@code newIndexMetadataList}, {@code latch} and - * {@code exceptionList}. This method must execute the operation in parallel and async to ensure that the cluster state - * upload time remains the same. - * - * @param newIndexMetadataList list of index metadata of new indexes (or first time index metadata upload). - * @param latch this is used for counting down once the unit of work per index is done. - * @param exceptionList exception if any during run will be added here and used by the caller. - */ - void run(List newIndexMetadataList, CountDownLatch latch, List exceptionList) throws IOException; -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java new file mode 100644 index 0000000000000..709b61ef6efb2 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/IndexMetadataUploadInterceptor.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.core.action.ActionListener; + +import java.io.IOException; +import java.util.List; + +/** + * Hook for running code that needs to be executed before the upload of index metadata. Here we have introduced a hook + * for index creation (also triggerred after enabling the remote cluster statement for the first time). The Interceptor + * is intended to be run in parallel and async with the index metadata upload. + * + * @opensearch.internal + */ +public interface IndexMetadataUploadInterceptor { + + /** + * Intercepts the index metadata upload flow with input containing index metadata of new indexes (or first time upload). + * The caller is expected to trigger onSuccess or onFailure of the {@code ActionListener}. + * + * @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload). + * @param actionListener listener to be invoked on success or failure. + */ + void interceptIndexCreation(List indexMetadataList, ActionListener actionListener) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index c7790ab965ca1..dbd2754cd3a1d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -27,6 +27,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; @@ -160,7 +161,7 @@ public class RemoteClusterStateService implements Closeable { private final Settings settings; private final LongSupplier relativeTimeNanosSupplier; private final ThreadPool threadpool; - private final IndexCreationPreIndexMetadataUploadListener indexCreationListener; + private final List indexMetadataUploadInterceptors; private BlobStoreRepository blobStoreRepository; private BlobStoreTransferService blobStoreTransferService; private volatile TimeValue slowWriteLoggingThreshold; @@ -191,7 +192,7 @@ public RemoteClusterStateService( ClusterSettings clusterSettings, LongSupplier relativeTimeNanosSupplier, ThreadPool threadPool, - IndexCreationPreIndexMetadataUploadListener indexCreationListener + List indexMetadataUploadInterceptors ) { assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled"; this.nodeId = nodeId; @@ -208,7 +209,7 @@ public RemoteClusterStateService( clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); this.remoteStateStats = new RemotePersistenceStats(); - this.indexCreationListener = indexCreationListener; + this.indexMetadataUploadInterceptors = indexMetadataUploadInterceptors; } private BlobStoreTransferService getBlobStoreTransferService() { @@ -236,11 +237,12 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri // Write globalMetadata String globalMetadataFile = writeGlobalMetadata(clusterState); + List toUpload = new ArrayList<>(clusterState.metadata().indices().values()); // any validations before/after upload ? final List allUploadedIndexMetadata = writeIndexMetadataParallel( clusterState, - new ArrayList<>(clusterState.metadata().indices().values()), - previousClusterUUID + toUpload, + ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList() ); final ClusterMetadataManifest manifest = uploadManifest( clusterState, @@ -335,11 +337,11 @@ public ClusterMetadataManifest writeIncrementalMetadata( previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); } - List uploadedIndexMetadataList = writeIndexMetadataParallel( - clusterState, - toUpload, - indexNamePreviousVersionMap - ); + List newIndexMetadataList = toUpload.stream() + /* If the previous state's index metadata version is null, then this is index creation */ + .filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName()))) + .collect(Collectors.toList()); + List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList); uploadedIndexMetadataList.forEach( uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) ); @@ -452,13 +454,50 @@ private List writeIndexMetadataParallel( List toUpload, List newIndexMetadataList ) throws IOException { - assert Objects.nonNull(indexCreationListener) : "indexCreationListener can not be null"; - int latchCount = toUpload.size() + indexCreationListener.latchCount(newIndexMetadataList); + assert CollectionUtils.isEmpty(indexMetadataUploadInterceptors) == false : "indexMetadataUploadInterceptors can not be empty"; + int latchCount = toUpload.size() + indexMetadataUploadInterceptors.size(); List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); final CountDownLatch latch = new CountDownLatch(latchCount); List result = new ArrayList<>(toUpload.size()); - uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList); - indexCreationListener.run(newIndexMetadataList, latch, exceptionList); + + LatchedActionListener latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> { + logger.trace( + String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName()) + ); + result.add(uploadedIndexMetadata); + }, ex -> { + assert ex instanceof RemoteStateTransferException; + logger.error( + () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()), + ex + ); + exceptionList.add(ex); + }), + latch + ); + + for (IndexMetadata indexMetadata : toUpload) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener); + } + + for (IndexMetadataUploadInterceptor interceptor : indexMetadataUploadInterceptors) { + // We are submitting the task for async execution to ensure that we are not blocking the cluster state upload + String interceptorName = interceptor.getClass().getSimpleName(); + threadpool.executor(ThreadPool.Names.GENERIC).execute(() -> { + try { + interceptor.interceptIndexCreation( + newIndexMetadataList, + getIndexMetadataUploadInterceptorListener(newIndexMetadataList, latch, exceptionList, interceptorName) + ); + } catch (IOException e) { + exceptionList.add( + new RemoteStateTransferException("Exception occurred while running interceptIndexCreation in " + interceptorName, e) + ); + } + }); + } try { if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -499,58 +538,27 @@ private List writeIndexMetadataParallel( return result; } - private void uploadIndexMetadataAsync( - ClusterState clusterState, - List result, - List toUpload, + private ActionListener getIndexMetadataUploadInterceptorListener( + List newIndexMetadataList, CountDownLatch latch, - List exceptionList - ) throws IOException { - LatchedActionListener indexMetadataLatchedActionListener = new LatchedActionListener<>( - ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> { - logger.trace( - String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName()) - ); - result.add(uploadedIndexMetadata); - }, ex -> { - assert ex instanceof RemoteStateTransferException; - logger.error( - () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()), - ex - ); - exceptionList.add(ex); - }), + List exceptionList, + String interceptorName + ) { + return new LatchedActionListener<>( + ActionListener.wrap( + ignored -> logger.trace( + new ParameterizedMessage("{} : Intercepted {} successfully", interceptorName, newIndexMetadataList) + ), + ex -> { + logger.error( + new ParameterizedMessage("{} : Exception during interception of {}", interceptorName, newIndexMetadataList), + ex + ); + exceptionList.add(ex); + } + ), latch ); - - for (IndexMetadata indexMetadata : toUpload) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 - writeIndexMetadataAsync(clusterState, indexMetadata, indexMetadataLatchedActionListener); - } - } - - private List writeIndexMetadataParallel( - ClusterState clusterState, - List toUpload, - String previousClusterUUID - ) throws IOException { - List newIndexMetadaList = Collections.emptyList(); - if (ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) { - newIndexMetadaList = toUpload; - } - return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadaList); - } - - private List writeIndexMetadataParallel( - ClusterState clusterState, - List toUpload, - Map indexNamePreviousVersionMap - ) throws IOException { - List newIndexMetadataList = toUpload.stream() - /* If the previous state's index metadata version is null, then this is index creation */ - .filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName()))) - .collect(Collectors.toList()); - return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList); } /** @@ -1177,7 +1185,7 @@ public void writeMetadataFailed() { /** * Exception for Remote state transfer. */ - static class RemoteStateTransferException extends RuntimeException { + public static class RemoteStateTransferException extends RuntimeException { public RemoteStateTransferException(String errorDesc) { super(errorDesc); diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java similarity index 72% rename from server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java rename to server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java index 29868b1e0ecc5..2d0fc8406f783 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteUploadPathIndexCreationListener.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteIndexPathUploader.java @@ -15,11 +15,14 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; -import org.opensearch.gateway.remote.IndexCreationPreIndexMetadataUploadListener; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.IndexMetadataUploadInterceptor; import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.RemoteClusterStateService.RemoteStateTransferException; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; @@ -28,14 +31,19 @@ import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING; import static org.opensearch.index.remote.RemoteIndexPath.SEGMENT_PATH; import static org.opensearch.index.remote.RemoteIndexPath.TRANSLOG_PATH; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; @@ -45,7 +53,7 @@ * Uploads the remote store path for all possible combinations of {@link org.opensearch.index.remote.RemoteStoreEnums.DataCategory} * and {@link org.opensearch.index.remote.RemoteStoreEnums.DataType} for each shard of an index. */ -public class RemoteUploadPathIndexCreationListener implements IndexCreationPreIndexMetadataUploadListener { +public class RemoteIndexPathUploader implements IndexMetadataUploadInterceptor { public static final ChecksumBlobStoreFormat REMOTE_INDEX_PATH_FORMAT = new ChecksumBlobStoreFormat<>( "remote-index-path", @@ -53,51 +61,71 @@ public class RemoteUploadPathIndexCreationListener implements IndexCreationPreIn RemoteIndexPath::fromXContent ); - private static final Logger logger = LogManager.getLogger(RemoteUploadPathIndexCreationListener.class); + private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s"; + private static final String UPLOAD_EXCEPTION_MSG = "Exception occurred while uploading remote index paths for indexes=%s"; + + private static final Logger logger = LogManager.getLogger(RemoteIndexPathUploader.class); private final Settings settings; private final boolean isRemoteDataAttributePresent; private final boolean isTranslogSegmentRepoSame; private final Supplier repositoriesService; + private volatile TimeValue indexMetadataUploadTimeout; private BlobStoreRepository translogRepository; private BlobStoreRepository segmentRepository; - public RemoteUploadPathIndexCreationListener(Settings settings, Supplier repositoriesService) { + public RemoteIndexPathUploader(Settings settings, Supplier repositoriesService, ClusterSettings clusterSettings) { this.settings = settings; this.repositoriesService = repositoriesService; isRemoteDataAttributePresent = isRemoteDataAttributePresent(settings); // If the remote data attributes are not present, then there is no effect of translog and segment being same or different or null. isTranslogSegmentRepoSame = isTranslogSegmentRepoSame(); + indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); + clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); } @Override - public int latchCount(List newIndexMetadataList) { + public void interceptIndexCreation(List indexMetadataList, ActionListener actionListener) throws IOException { if (isRemoteDataAttributePresent == false) { - return 0; + actionListener.onResponse(null); + return; } - int eligibleIndexCount = (int) newIndexMetadataList.stream().filter(this::uploadIndexPathFile).count(); - return isTranslogSegmentRepoSame ? eligibleIndexCount : 2 * eligibleIndexCount; - } - @Override - public void run(List newIndexMetadataList, CountDownLatch latch, List exceptionList) throws IOException { - if (isRemoteDataAttributePresent == false) { - return; + List eligibleList = indexMetadataList.stream().filter(this::requiresPathUpload).collect(Collectors.toList()); + int latchCount = eligibleList.size() * (isTranslogSegmentRepoSame ? 1 : 2); + CountDownLatch latch = new CountDownLatch(latchCount); + List exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount)); + for (IndexMetadata indexMetadata : eligibleList) { + writeIndexPathAsync(indexMetadata, latch, exceptionList); } - List elibibleIndexMetadaList = newIndexMetadataList.stream() - .filter(this::uploadIndexPathFile) - .collect(Collectors.toList()); - if (isTranslogSegmentRepoSame) { - assert latchCount(newIndexMetadataList) == elibibleIndexMetadaList.size() - : "Latch count is not equal to elibibleIndexMetadaList's size for path upload"; - } else { - assert latchCount(newIndexMetadataList) == 2 * elibibleIndexMetadaList.size() - : "Latch count is not equal to (2 * elibibleIndexMetadaList's size) for path upload"; + String indexNames = eligibleList.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining(",")); + + try { + if (latch.await(indexMetadataUploadTimeout.millis(), TimeUnit.MILLISECONDS) == false) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, TIMEOUT_EXCEPTION_MSG, indexNames) + ); + exceptionList.forEach(ex::addSuppressed); + actionListener.onFailure(ex); + return; + } + } catch (InterruptedException exception) { + exceptionList.forEach(exception::addSuppressed); + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, TIMEOUT_EXCEPTION_MSG, indexNames), + exception + ); + actionListener.onFailure(ex); } - for (IndexMetadata indexMetadata : elibibleIndexMetadaList) { - writeIndexPathAsync(indexMetadata, latch, exceptionList); + if (exceptionList.size() > 0) { + RemoteStateTransferException ex = new RemoteStateTransferException( + String.format(Locale.ROOT, UPLOAD_EXCEPTION_MSG, indexNames) + ); + exceptionList.forEach(ex::addSuppressed); + actionListener.onFailure(ex); } + actionListener.onResponse(null); } private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List exceptionList) throws IOException { @@ -122,9 +150,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List indexUUID, translogRepository.getCompressor(), getUploadPathLatchedActionListener(idxMD, latch, exceptionList, pathCreationMap), - RemoteClusterStateService.FORMAT_PARAMS, - true, - XContentType.JSON + RemoteClusterStateService.FORMAT_PARAMS ); } else { // If the repositories are different, then we need to upload one file per segment and translog containing their individual @@ -135,9 +161,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List indexUUID, translogRepository.getCompressor(), getUploadPathLatchedActionListener(idxMD, latch, exceptionList, TRANSLOG_PATH), - RemoteClusterStateService.FORMAT_PARAMS, - true, - XContentType.JSON + RemoteClusterStateService.FORMAT_PARAMS ); BlobPath segmentBasePath = segmentRepository.basePath(); @@ -148,9 +172,7 @@ private void writeIndexPathAsync(IndexMetadata idxMD, CountDownLatch latch, List indexUUID, segmentRepository.getCompressor(), getUploadPathLatchedActionListener(idxMD, latch, exceptionList, SEGMENT_PATH), - RemoteClusterStateService.FORMAT_PARAMS, - true, - XContentType.JSON + RemoteClusterStateService.FORMAT_PARAMS ); } } @@ -218,7 +240,7 @@ private LatchedActionListener getUploadPathLatchedActionListener( * This method checks if the index metadata has attributes that calls for uploading the index path for remote store * uploads. It checks if the remote store path type is {@code HASHED_PREFIX} and returns true if so. */ - private boolean uploadIndexPathFile(IndexMetadata indexMetadata) { + private boolean requiresPathUpload(IndexMetadata indexMetadata) { // A cluster will have remote custom metadata only if the cluster is remote store enabled from data side. Map remoteCustomData = indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY); if (Objects.isNull(remoteCustomData) || remoteCustomData.isEmpty()) { @@ -231,4 +253,8 @@ private boolean uploadIndexPathFile(IndexMetadata indexMetadata) { // We need to upload the path only if the path type for an index is hashed_prefix return RemoteStoreEnums.PathType.HASHED_PREFIX == RemoteStoreEnums.PathType.parseString(pathTypeStr); } + + private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) { + this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout; + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 028c495523591..abba602c9b102 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -146,8 +146,8 @@ import org.opensearch.index.analysis.AnalysisRegistry; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.recovery.RemoteStoreRestoreService; +import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; -import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; @@ -727,9 +727,13 @@ protected Node( threadPool::relativeTimeInMillis ); final RemoteClusterStateService remoteClusterStateService; - final RemoteUploadPathIndexCreationListener indexCreationListener; + final RemoteIndexPathUploader remoteIndexPathUploader; if (isRemoteStoreClusterStateEnabled(settings)) { - indexCreationListener = new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceReference::get); + remoteIndexPathUploader = new RemoteIndexPathUploader( + settings, + repositoriesServiceReference::get, + clusterService.getClusterSettings() + ); remoteClusterStateService = new RemoteClusterStateService( nodeEnvironment.nodeId(), repositoriesServiceReference::get, @@ -737,11 +741,11 @@ protected Node( clusterService.getClusterSettings(), threadPool::preciseRelativeTimeInNanos, threadPool, - indexCreationListener + List.of(remoteIndexPathUploader) ); } else { remoteClusterStateService = null; - indexCreationListener = null; + remoteIndexPathUploader = null; } // collect engine factory providers from plugins @@ -1317,7 +1321,7 @@ protected Node( b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog); b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); - b.bind(RemoteUploadPathIndexCreationListener.class).toProvider(() -> indexCreationListener); + b.bind(RemoteIndexPathUploader.class).toProvider(() -> remoteIndexPathUploader); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory); @@ -1467,9 +1471,7 @@ public Node start() throws NodeValidationException { if (remoteClusterStateService != null) { remoteClusterStateService.start(); } - final RemoteUploadPathIndexCreationListener indexCreationListener = injector.getInstance( - RemoteUploadPathIndexCreationListener.class - ); + final RemoteIndexPathUploader indexCreationListener = injector.getInstance(RemoteIndexPathUploader.class); if (indexCreationListener != null) { indexCreationListener.start(); } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 1be096dd92577..3e6052a5ef820 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -57,7 +57,6 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.compress.Compressor; -import org.opensearch.core.xcontent.MediaType; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; @@ -191,32 +190,9 @@ public void write( final String name, final Compressor compressor, final ToXContent.Params params - ) throws IOException { - write(obj, blobContainer, name, compressor, params, false, XContentType.SMILE); - } - - /** - * Writes blob with resolving the blob name using {@link #blobName} method. - *

- * The blob will optionally by compressed. - * - * @param obj object to be serialized - * @param blobContainer blob container - * @param name blob name - * @param compressor whether to use compression - * @param params ToXContent params - */ - public void write( - final T obj, - final BlobContainer blobContainer, - final String name, - final Compressor compressor, - final ToXContent.Params params, - boolean skipHeaderFooter, - MediaType mediaType ) throws IOException { final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor, params, skipHeaderFooter, mediaType); + final BytesReference bytes = serialize(obj, blobName, compressor, params); blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } @@ -232,17 +208,7 @@ public void writeAsync( final ToXContent.Params params ) throws IOException { // use NORMAL priority by default - this.writeAsyncWithPriority( - obj, - blobContainer, - name, - compressor, - WritePriority.NORMAL, - listener, - params, - false, - XContentType.SMILE - ); + this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.NORMAL, listener, params); } /** @@ -260,30 +226,7 @@ public void writeAsyncWithUrgentPriority( ActionListener listener, final ToXContent.Params params ) throws IOException { - this.writeAsyncWithPriority( - obj, - blobContainer, - name, - compressor, - WritePriority.URGENT, - listener, - params, - false, - XContentType.SMILE - ); - } - - public void writeAsyncWithUrgentPriority( - final T obj, - final BlobContainer blobContainer, - final String name, - final Compressor compressor, - ActionListener listener, - final ToXContent.Params params, - boolean skipHeaderFooter, - MediaType type - ) throws IOException { - this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params, skipHeaderFooter, type); + this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params); } /** @@ -305,17 +248,15 @@ private void writeAsyncWithPriority( final Compressor compressor, final WritePriority priority, ActionListener listener, - final ToXContent.Params params, - boolean skipHeaderFooter, - MediaType mediaType + final ToXContent.Params params ) throws IOException { if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { - write(obj, blobContainer, name, compressor, params, skipHeaderFooter, mediaType); + write(obj, blobContainer, name, compressor, params); listener.onResponse(null); return; } final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compressor, params, skipHeaderFooter, mediaType); + final BytesReference bytes = serialize(obj, blobName, compressor, params); final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")"; try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { long expectedChecksum; @@ -349,17 +290,6 @@ private void writeAsyncWithPriority( public BytesReference serialize(final T obj, final String blobName, final Compressor compressor, final ToXContent.Params params) throws IOException { - return serialize(obj, blobName, compressor, params, false, XContentType.SMILE); - } - - public BytesReference serialize( - final T obj, - final String blobName, - final Compressor compressor, - final ToXContent.Params params, - boolean skipHeaderFooter, - MediaType type - ) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -369,9 +299,7 @@ public BytesReference serialize( BUFFER_SIZE ) ) { - if (skipHeaderFooter == false) { - CodecUtil.writeHeader(indexOutput, codec, VERSION); - } + CodecUtil.writeHeader(indexOutput, codec, VERSION); try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) { @Override public void close() throws IOException { @@ -380,7 +308,7 @@ public void close() throws IOException { } }; XContentBuilder builder = MediaTypeRegistry.contentBuilder( - type, + XContentType.SMILE, compressor.threadLocalOutputStream(indexOutputOutputStream) ) ) { @@ -388,9 +316,7 @@ public void close() throws IOException { obj.toXContent(builder, params); builder.endObject(); } - if (skipHeaderFooter == false) { - CodecUtil.writeFooter(indexOutput); - } + CodecUtil.writeFooter(indexOutput); } return outputStream.bytes(); } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 6a831f636df6f..012d2623b45df 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -71,7 +71,7 @@ import org.opensearch.gateway.remote.RemotePersistenceStats; import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult; -import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener; +import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.node.Node; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.fs.FsRepository; @@ -482,14 +482,15 @@ public void testDataOnlyNodePersistence() throws Exception { Collections.emptyMap(), transportService.getThreadPool() ); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); return new RemoteClusterStateService( nodeEnvironment.nodeId(), repositoriesServiceSupplier, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + clusterSettings, () -> 0L, threadPool, - new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceSupplier) + List.of(new RemoteIndexPathUploader(settings, repositoriesServiceSupplier, clusterSettings)) ); } else { return null; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index e760b915ed260..9ddbce3f794e6 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -38,8 +38,8 @@ import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; +import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.remote.RemoteUploadPathIndexCreationListener; import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; @@ -156,7 +156,7 @@ public void setup() { clusterSettings, () -> 0L, threadPool, - new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceSupplier) + List.of(new RemoteIndexPathUploader(settings, repositoriesServiceSupplier, clusterSettings)) ); } @@ -175,16 +175,17 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException public void testFailInitializationWhenRemoteStateDisabled() { final Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); assertThrows( AssertionError.class, () -> new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, settings, - new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + clusterSettings, () -> 0L, threadPool, - new RemoteUploadPathIndexCreationListener(settings, repositoriesServiceSupplier) + List.of(new RemoteIndexPathUploader(settings, repositoriesServiceSupplier, clusterSettings)) ) ); }