Skip to content

Commit

Permalink
[Remote Cluster State] Remote state interfaces (opensearch-project#13785
Browse files Browse the repository at this point in the history
)

* Remote Writable Entity interfaces

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha authored and wangdongyu.danny committed Aug 22, 2024
1 parent 7738703 commit de3fcac
Show file tree
Hide file tree
Showing 8 changed files with 340 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER;

/**
* An extension of {@link RemoteWriteableEntity} class which caters to the use case of writing to and reading from a blob storage
*
* @param <T> The class type which can be uploaded to or downloaded from a blob storage.
*/
public abstract class AbstractRemoteWritableBlobEntity<T> implements RemoteWriteableEntity<T> {

protected String blobFileName;

protected String blobName;
private final String clusterUUID;
private final Compressor compressor;
private final NamedXContentRegistry namedXContentRegistry;
private String[] pathTokens;

public AbstractRemoteWritableBlobEntity(
final String clusterUUID,
final Compressor compressor,
final NamedXContentRegistry namedXContentRegistry
) {
this.clusterUUID = clusterUUID;
this.compressor = compressor;
this.namedXContentRegistry = namedXContentRegistry;
}

public abstract BlobPathParameters getBlobPathParameters();

public String getFullBlobName() {
return blobName;
}

public String getBlobFileName() {
if (blobFileName == null) {
String[] pathTokens = getBlobPathTokens();
if (pathTokens == null || pathTokens.length < 1) {
return null;
}
blobFileName = pathTokens[pathTokens.length - 1];
}
return blobFileName;
}

public String[] getBlobPathTokens() {
if (pathTokens != null) {
return pathTokens;
}
if (blobName == null) {
return null;
}
pathTokens = blobName.split(PATH_DELIMITER);
return pathTokens;
}

public abstract String generateBlobFileName();

public String clusterUUID() {
return clusterUUID;
}

public abstract UploadedMetadata getUploadedMetadata();

public void setFullBlobName(BlobPath blobPath) {
this.blobName = blobPath.buildAsString() + blobFileName;
}

public NamedXContentRegistry getNamedXContentRegistry() {
return namedXContentRegistry;
}

protected Compressor getCompressor() {
return compressor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import java.util.List;

/**
* Parameters which can be used to construct a blob path
*
*/
public class BlobPathParameters {

private final List<String> pathTokens;
private final String filePrefix;

public BlobPathParameters(final List<String> pathTokens, final String filePrefix) {
this.pathTokens = pathTokens;
this.filePrefix = filePrefix;
}

public List<String> getPathTokens() {
return pathTokens;
}

public String getFilePrefix() {
return filePrefix;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import org.opensearch.core.action.ActionListener;

import java.io.IOException;

/**
* An interface to read/write an object from/to a remote storage. This interface is agnostic of the remote storage type.
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
* @param <U> The wrapper entity which provides methods for serializing/deserializing entity T.
*/
public interface RemoteWritableEntityStore<T, U extends RemoteWriteableEntity<T>> {

public void writeAsync(U entity, ActionListener<Void> listener);

public T read(U entity) throws IOException;

public void readAsync(U entity, ActionListener<T> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.remote;

import java.io.IOException;
import java.io.InputStream;

/**
* An interface to which provides defines the serialization/deserialization methods for objects to be uploaded to or downloaded from remote store.
* This interface is agnostic of the remote storage type.
*
* @param <T> The object type which can be uploaded to or downloaded from remote storage.
*/
public interface RemoteWriteableEntity<T> {
/**
* @return An InputStream created by serializing the entity T
* @throws IOException Exception encountered while serialization
*/
public InputStream serialize() throws IOException;

/**
* @param inputStream The InputStream which is used to read the serialized entity
* @return The entity T after deserialization
* @throws IOException Exception encountered while deserialization
*/
public T deserialize(InputStream inputStream) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/**
* Common remote store package
*/
package org.opensearch.common.remote;
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
* Utility class for Remote Cluster State
*/
public class RemoteClusterStateUtils {
public static final String PATH_DELIMITER = "/";

public static String encodeString(String content) {
return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote.model;

import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.common.remote.RemoteWriteableEntity;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
* Abstract class for a blob type storage
*
* @param <T> The entity which can be uploaded to / downloaded from blob store
* @param <U> The concrete class implementing {@link RemoteWriteableEntity} which is used as a wrapper for T entity.
*/
public class RemoteClusterStateBlobStore<T, U extends AbstractRemoteWritableBlobEntity<T>> implements RemoteWritableEntityStore<T, U> {

private final BlobStoreTransferService transferService;
private final BlobStoreRepository blobStoreRepository;
private final String clusterName;
private final ExecutorService executorService;

public RemoteClusterStateBlobStore(
final BlobStoreTransferService blobStoreTransferService,
final BlobStoreRepository blobStoreRepository,
final String clusterName,
final ThreadPool threadPool,
final String executor
) {
this.transferService = blobStoreTransferService;
this.blobStoreRepository = blobStoreRepository;
this.clusterName = clusterName;
this.executorService = threadPool.executor(executor);
}

@Override
public void writeAsync(final U entity, final ActionListener<Void> listener) {
try {
try (InputStream inputStream = entity.serialize()) {
BlobPath blobPath = getBlobPathForUpload(entity);
entity.setFullBlobName(blobPath);
// TODO uncomment below logic after merging PR https://github.com/opensearch-project/OpenSearch/pull/13836
// transferService.uploadBlob(inputStream, getBlobPathForUpload(entity), entity.getBlobFileName(), WritePriority.URGENT,
// listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
}

public T read(final U entity) throws IOException {
// TODO Add timing logs and tracing
assert entity.getFullBlobName() != null;
return entity.deserialize(transferService.downloadBlob(getBlobPathForDownload(entity), entity.getBlobFileName()));
}

@Override
public void readAsync(final U entity, final ActionListener<T> listener) {
executorService.execute(() -> {
try {
listener.onResponse(read(entity));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

private BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<T> obj) {
BlobPath blobPath = blobStoreRepository.basePath()
.add(RemoteClusterStateUtils.encodeString(clusterName))
.add("cluster-state")
.add(obj.clusterUUID());
for (String token : obj.getBlobPathParameters().getPathTokens()) {
blobPath = blobPath.add(token);
}
return blobPath;
}

private BlobPath getBlobPathForDownload(final AbstractRemoteWritableBlobEntity<T> obj) {
String[] pathTokens = obj.getBlobPathTokens();
BlobPath blobPath = new BlobPath();
if (pathTokens == null || pathTokens.length < 1) {
return blobPath;
}
// Iterate till second last path token to get the blob folder
for (int i = 0; i < pathTokens.length - 1; i++) {
blobPath = blobPath.add(pathTokens[i]);
}
return blobPath;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package containing models for remote cluster state
*/
package org.opensearch.gateway.remote.model;

0 comments on commit de3fcac

Please sign in to comment.