Skip to content

Commit

Permalink
Fix test files
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Jul 9, 2024
1 parent c30e574 commit 856fb51
Show file tree
Hide file tree
Showing 13 changed files with 442 additions and 348 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
Expand All @@ -25,7 +24,6 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
Expand Down Expand Up @@ -56,17 +54,13 @@
*/
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private final Compressor compressor;
private final RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteWritableEntityStore;
private final RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
private BlobStoreRepository blobStoreRepository;
private ThreadPool threadPool;
private final ThreadPool threadPool;

public InternalRemoteRoutingTableService(
Supplier<RepositoriesService> repositoriesService,
Expand All @@ -83,7 +77,7 @@ public InternalRemoteRoutingTableService(
this.settings = settings;
this.threadPool = threadpool;
this.compressor = compressor;
this.remoteWritableEntityStore = new RemoteRoutingTableBlobStore<>(
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
blobStoreTransferService,
blobStoreRepository,
clusterName,
Expand Down Expand Up @@ -117,27 +111,23 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting

/**
* Create async action for writing one {@code IndexRoutingTable} to remote store
* @param clusterState current cluster state
* @param term current term
* @param version current version
* @param clusterUUID current cluster UUID
* @param indexRouting indexRoutingTable to write to remote store
* @param latchedActionListener listener for handling async action response
* @return returns runnable async action
*/
@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
ClusterState clusterState,
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(
indexRouting,
clusterUUID,
compressor,
clusterState.term(),
clusterState.version()
);
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version);

ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()),
Expand All @@ -146,7 +136,7 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
)
);

return () -> remoteWritableEntityStore.writeAsync(remoteIndexRoutingTable, completionListener);
return () -> remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
}

/**
Expand Down Expand Up @@ -177,7 +167,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {

Expand All @@ -186,9 +175,9 @@ public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
latchedActionListener::onFailure
);

RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, index, clusterUUID, compressor);
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);

return () -> remoteWritableEntityStore.readAsync(remoteIndexRoutingTable, actionListener);
return () -> remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
package org.opensearch.cluster.routing.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand All @@ -42,8 +40,9 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting

@Override
public CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
ClusterState clusterState,
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
Expand All @@ -65,7 +64,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
public CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
) {
// noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@
package org.opensearch.cluster.routing.remote;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.lifecycle.LifecycleComponent;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.IOException;
Expand All @@ -30,8 +28,8 @@
* @opensearch.internal
*/
public interface RemoteRoutingTableService extends LifecycleComponent {
public static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER =
new DiffableUtils.NonDiffableValueSerializer<>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
Expand All @@ -48,7 +46,6 @@ public IndexRoutingTable read(StreamInput in, String key) throws IOException {
CheckedRunnable<IOException> getAsyncIndexRoutingReadAction(
String clusterUUID,
String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener
);

Expand All @@ -63,8 +60,9 @@ DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>>
);

CheckedRunnable<IOException> getAsyncIndexRoutingWriteAction(
ClusterState clusterState,
String clusterUUID,
long term,
long version,
IndexRoutingTable indexRouting,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public BlobPathParameters(final List<String> pathTokens, final String filePrefix
this.filePrefix = filePrefix;
}

public BlobPathParameters(final List<String> pathTokens) {
this(pathTokens, null);
}

public List<String> getPathTokens() {
return pathTokens;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.cluster.node.DiscoveryNodes.Builder;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -43,7 +42,6 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
Expand Down Expand Up @@ -106,6 +104,8 @@
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -656,10 +656,11 @@ private UploadedMetadataResults writeMetadataInParallel(
});
indicesRoutingToUpload.forEach(indexRoutingTable -> {
uploadTasks.put(
InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX + indexRoutingTable.getIndex().getName(),
String.join(CUSTOM_DELIMITER, INDEX_ROUTING_TABLE, indexRoutingTable.getIndex().getName()),
remoteRoutingTableService.getAsyncIndexRoutingWriteAction(
clusterState,
clusterState.metadata().clusterUUID(),
clusterState.term(),
clusterState.version(),
indexRoutingTable,
listener
)
Expand Down Expand Up @@ -712,7 +713,7 @@ private UploadedMetadataResults writeMetadataInParallel(
UploadedMetadataResults response = new UploadedMetadataResults();
results.forEach((name, uploadedMetadata) -> {
if (uploadedMetadata.getClass().equals(UploadedIndexMetadata.class)
&& uploadedMetadata.getComponent().contains(InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX)) {
&& uploadedMetadata.getComponent().contains(INDEX_ROUTING_TABLE_PREFIX)) {
response.uploadedIndicesRoutingMetadata.add((UploadedIndexMetadata) uploadedMetadata);
} else if (name.startsWith(CUSTOM_METADATA)) {
// component name for custom metadata will look like custom--<metadata-attribute>
Expand Down Expand Up @@ -1037,7 +1038,6 @@ private ClusterState readClusterStateInParallel(
remoteRoutingTableService.getAsyncIndexRoutingReadAction(
clusterUUID,
indexRouting.getUploadedFilename(),
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
routingTableLatchedActionListener
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_PATH_TOKEN;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;

/**
* Extends the RemoteClusterStateBlobStore to support {@link RemoteIndexRoutingTable}
Expand Down Expand Up @@ -60,7 +56,6 @@ public class RemoteRoutingTableBlobStore<IndexRoutingTable, U extends AbstractRe
Setting.Property.Dynamic
);

public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
private RemoteStoreEnums.PathType pathType;
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;

Expand All @@ -85,7 +80,7 @@ public BlobPath getBlobPathForUpload(final AbstractRemoteWritableBlobEntity<Inde
BlobPath indexRoutingPath = getBasePath().add(RemoteClusterStateUtils.encodeString(getClusterName()))
.add("cluster-state")
.add(obj.clusterUUID())
.add(INDEX_ROUTING_PATH_TOKEN);
.add(INDEX_ROUTING_TABLE);
BlobPath path = pathType.path(
RemoteStorePathStrategy.PathInput.builder()
.basePath(indexRoutingPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@

package org.opensearch.gateway.remote.routingtable;


import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
import org.opensearch.common.remote.AbstractRemoteWritableBlobEntity;
import org.opensearch.common.remote.BlobPathParameters;
import org.opensearch.core.compress.Compressor;
Expand All @@ -22,29 +18,27 @@
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat;


import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static org.opensearch.cluster.routing.remote.InternalRemoteRoutingTableService.INDEX_ROUTING_METADATA_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore.INDEX_ROUTING_FILE_PREFIX;

/**
* Remote store object for IndexRoutingTable
*/
public class RemoteIndexRoutingTable extends AbstractRemoteWritableBlobEntity<IndexRoutingTable> {

public static final String INDEX_ROUTING_TABLE = "index_routing_table";
public static final String INDEX_ROUTING_TABLE = "index-routing";
public static final String INDEX_ROUTING_TABLE_PREFIX = "index-routing--";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
public static final String INDEX_ROUTING_FILE = "index_routing";
private IndexRoutingTable indexRoutingTable;
private final Index index;
private long term;
private long version;
public static final ChecksumWritableBlobStoreFormat<IndexRoutingTable> INDEX_ROUTING_TABLE_FORMAT = new ChecksumWritableBlobStoreFormat<>(
"index-routing-table",
IndexRoutingTable::readFrom
);
public static final ChecksumWritableBlobStoreFormat<IndexRoutingTable> INDEX_ROUTING_TABLE_FORMAT =
new ChecksumWritableBlobStoreFormat<>("index-routing-table", IndexRoutingTable::readFrom);

public RemoteIndexRoutingTable(
IndexRoutingTable indexRoutingTable,
Expand All @@ -58,17 +52,17 @@ public RemoteIndexRoutingTable(
this.indexRoutingTable = indexRoutingTable;
this.term = term;
this.version = version;
this.blobFileName = generateBlobFileName();
}

/**
* Reads data from inputStream and creates RemoteIndexRoutingTable object with the {@link IndexRoutingTable}
* @param blobName name of the blob, which contains the index routing data
* @param index index for the current routing data
* @param clusterUUID UUID of the cluster
* @param compressor Compressor object
*/
public RemoteIndexRoutingTable(String blobName, Index index, String clusterUUID, Compressor compressor) {
public RemoteIndexRoutingTable(String blobName, String clusterUUID, Compressor compressor) {
super(clusterUUID, compressor, null);
this.index = index;
this.index = null;
this.term = -1;
this.version = -1;
this.blobName = blobName;
Expand All @@ -84,7 +78,7 @@ public Index getIndex() {

@Override
public BlobPathParameters getBlobPathParameters() {
return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()), "");
return new BlobPathParameters(List.of(indexRoutingTable.getIndex().getUUID()));
}

@Override
Expand All @@ -94,17 +88,22 @@ public String getType() {

@Override
public String generateBlobFileName() {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
if (blobFileName == null) {
blobFileName = String.join(
DELIMITER,
INDEX_ROUTING_FILE,
RemoteStoreUtils.invertLong(term),
RemoteStoreUtils.invertLong(version),
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);
}
return blobFileName;
}

@Override
public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
assert blobName != null;
assert index != null;
return new ClusterMetadataManifest.UploadedIndexMetadata(index.getName(), index.getUUID(), blobName, INDEX_ROUTING_METADATA_PREFIX);
}

Expand Down
Loading

0 comments on commit 856fb51

Please sign in to comment.