Skip to content

Commit

Permalink
Incoporate PR review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Apr 16, 2024
1 parent 723ead9 commit ddb0e60
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 244 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.List;

/**
* Hook for running code that needs to be executed before the upload of index metadata. Here we have introduced a hook
* for index creation (also triggerred after enabling the remote cluster statement for the first time). The Interceptor
* is intended to be run in parallel and async with the index metadata upload.
*
* @opensearch.internal
*/
public interface IndexMetadataUploadInterceptor {

/**
* Intercepts the index metadata upload flow with input containing index metadata of new indexes (or first time upload).
* The caller is expected to trigger onSuccess or onFailure of the {@code ActionListener}.
*
* @param indexMetadataList list of index metadata of new indexes (or first time index metadata upload).
* @param actionListener listener to be invoked on success or failure.
*/
void interceptIndexCreation(List<IndexMetadata> indexMetadataList, ActionListener<Void> actionListener) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand Down Expand Up @@ -160,7 +161,7 @@ public class RemoteClusterStateService implements Closeable {
private final Settings settings;
private final LongSupplier relativeTimeNanosSupplier;
private final ThreadPool threadpool;
private final IndexCreationPreIndexMetadataUploadListener indexCreationListener;
private final List<IndexMetadataUploadInterceptor> indexMetadataUploadInterceptors;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;
Expand Down Expand Up @@ -191,7 +192,7 @@ public RemoteClusterStateService(
ClusterSettings clusterSettings,
LongSupplier relativeTimeNanosSupplier,
ThreadPool threadPool,
IndexCreationPreIndexMetadataUploadListener indexCreationListener
List<IndexMetadataUploadInterceptor> indexMetadataUploadInterceptors
) {
assert isRemoteStoreClusterStateEnabled(settings) : "Remote cluster state is not enabled";
this.nodeId = nodeId;
Expand All @@ -208,7 +209,7 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.indexCreationListener = indexCreationListener;
this.indexMetadataUploadInterceptors = indexMetadataUploadInterceptors;
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -236,11 +237,12 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState, Stri
// Write globalMetadata
String globalMetadataFile = writeGlobalMetadata(clusterState);

List<IndexMetadata> toUpload = new ArrayList<>(clusterState.metadata().indices().values());
// any validations before/after upload ?
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
previousClusterUUID
toUpload,
ClusterState.UNKNOWN_UUID.equals(previousClusterUUID) ? toUpload : Collections.emptyList()
);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
Expand Down Expand Up @@ -335,11 +337,11 @@ public ClusterMetadataManifest writeIncrementalMetadata(
previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName());
}

List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(
clusterState,
toUpload,
indexNamePreviousVersionMap
);
List<IndexMetadata> newIndexMetadataList = toUpload.stream()
/* If the previous state's index metadata version is null, then this is index creation */
.filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName())))
.collect(Collectors.toList());
List<UploadedIndexMetadata> uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
uploadedIndexMetadataList.forEach(
uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata)
);
Expand Down Expand Up @@ -452,13 +454,50 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
List<IndexMetadata> toUpload,
List<IndexMetadata> newIndexMetadataList
) throws IOException {
assert Objects.nonNull(indexCreationListener) : "indexCreationListener can not be null";
int latchCount = toUpload.size() + indexCreationListener.latchCount(newIndexMetadataList);
assert CollectionUtils.isEmpty(indexMetadataUploadInterceptors) == false : "indexMetadataUploadInterceptors can not be empty";
int latchCount = toUpload.size() + indexMetadataUploadInterceptors.size();
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(latchCount));
final CountDownLatch latch = new CountDownLatch(latchCount);
List<UploadedIndexMetadata> result = new ArrayList<>(toUpload.size());
uploadIndexMetadataAsync(clusterState, result, toUpload, latch, exceptionList);
indexCreationListener.run(newIndexMetadataList, latch, exceptionList);

LatchedActionListener<UploadedIndexMetadata> latchedActionListener = new LatchedActionListener<>(
ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
logger.trace(
String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof RemoteStateTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
);
exceptionList.add(ex);
}),
latch
);

for (IndexMetadata indexMetadata : toUpload) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener);
}

for (IndexMetadataUploadInterceptor interceptor : indexMetadataUploadInterceptors) {
// We are submitting the task for async execution to ensure that we are not blocking the cluster state upload
String interceptorName = interceptor.getClass().getSimpleName();
threadpool.executor(ThreadPool.Names.GENERIC).execute(() -> {
try {
interceptor.interceptIndexCreation(
newIndexMetadataList,
getIndexMetadataUploadInterceptorListener(newIndexMetadataList, latch, exceptionList, interceptorName)
);
} catch (IOException e) {
exceptionList.add(

Check warning on line 495 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L494-L495

Added lines #L494 - L495 were not covered by tests
new RemoteStateTransferException("Exception occurred while running interceptIndexCreation in " + interceptorName, e)
);
}
});
}

try {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
Expand Down Expand Up @@ -499,58 +538,27 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(
return result;
}

private void uploadIndexMetadataAsync(
ClusterState clusterState,
List<UploadedIndexMetadata> result,
List<IndexMetadata> toUpload,
private ActionListener<Void> getIndexMetadataUploadInterceptorListener(
List<IndexMetadata> newIndexMetadataList,
CountDownLatch latch,
List<Exception> exceptionList
) throws IOException {
LatchedActionListener<UploadedIndexMetadata> indexMetadataLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> {
logger.trace(
String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName())
);
result.add(uploadedIndexMetadata);
}, ex -> {
assert ex instanceof RemoteStateTransferException;
logger.error(
() -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()),
ex
);
exceptionList.add(ex);
}),
List<Exception> exceptionList,
String interceptorName
) {
return new LatchedActionListener<>(
ActionListener.wrap(
ignored -> logger.trace(
new ParameterizedMessage("{} : Intercepted {} successfully", interceptorName, newIndexMetadataList)
),
ex -> {
logger.error(

Check warning on line 553 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L553

Added line #L553 was not covered by tests
new ParameterizedMessage("{} : Exception during interception of {}", interceptorName, newIndexMetadataList),
ex
);
exceptionList.add(ex);
}

Check warning on line 558 in server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java#L557-L558

Added lines #L557 - L558 were not covered by tests
),
latch
);

for (IndexMetadata indexMetadata : toUpload) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200
writeIndexMetadataAsync(clusterState, indexMetadata, indexMetadataLatchedActionListener);
}
}

private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
String previousClusterUUID
) throws IOException {
List<IndexMetadata> newIndexMetadaList = Collections.emptyList();
if (ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) {
newIndexMetadaList = toUpload;
}
return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadaList);
}

private List<UploadedIndexMetadata> writeIndexMetadataParallel(
ClusterState clusterState,
List<IndexMetadata> toUpload,
Map<String, Long> indexNamePreviousVersionMap
) throws IOException {
List<IndexMetadata> newIndexMetadataList = toUpload.stream()
/* If the previous state's index metadata version is null, then this is index creation */
.filter(indexMetadata -> Objects.isNull(indexNamePreviousVersionMap.get(indexMetadata.getIndex().getName())))
.collect(Collectors.toList());
return writeIndexMetadataParallel(clusterState, toUpload, newIndexMetadataList);
}

/**
Expand Down Expand Up @@ -1177,7 +1185,7 @@ public void writeMetadataFailed() {
/**
* Exception for Remote state transfer.
*/
static class RemoteStateTransferException extends RuntimeException {
public static class RemoteStateTransferException extends RuntimeException {

public RemoteStateTransferException(String errorDesc) {
super(errorDesc);
Expand Down
Loading

0 comments on commit ddb0e60

Please sign in to comment.