Skip to content

Commit

Permalink
Refactor TransferManager interface to RemoteStoreFileTrackerAdapter
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Mar 20, 2024
1 parent eaf9bda commit f1cd4e4
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 28 deletions.
9 changes: 8 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.ComposableIndexTemplate;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -90,6 +91,7 @@
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogFactory;
Expand All @@ -104,6 +106,7 @@
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.threadpool.ThreadPool;

import java.awt.*;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -486,12 +489,16 @@ public synchronized IndexShard createShard(
};

Store remoteStore = null;
Directory remoteDirectory = null;
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path);
remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY, path);
}

Directory directory = directoryFactory.newDirectory(this.indexSettings, path);
if (directory instanceof CompositeDirectory) {
((CompositeDirectory) directory).setRemoteDirectory(remoteDirectory);
}
store = new Store(
shardId,
this.indexSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.filetracker.FileState;
Expand All @@ -27,14 +27,18 @@
public class CompositeDirectory extends FilterDirectory {

private final FSDirectory localDirectory;
private final TransferManager transferManager;
private final RemoteStoreFileTrackerAdapter remoteStoreFileTrackerAdapter;
private final FileCache fileCache;

public CompositeDirectory(FSDirectory localDirectory, BlobContainer blobContainer, FileCache fileCache) {
public CompositeDirectory(FSDirectory localDirectory, FileCache fileCache) {
super(localDirectory);
this.localDirectory = localDirectory;
this.fileCache = fileCache;
this.transferManager = new CompositeDirectoryTransferManager(fileCache, blobContainer);
this.remoteStoreFileTrackerAdapter = new CompositeDirectoryRemoteStoreFileTrackerAdapter(fileCache);
}

public void setRemoteDirectory(Directory remoteDirectory) {
((CompositeDirectoryRemoteStoreFileTrackerAdapter)remoteStoreFileTrackerAdapter).setRemoteDirectory(remoteDirectory);
}

@Override
Expand All @@ -45,7 +49,7 @@ public String[] listAll() throws IOException {
@Override
public void deleteFile(String name) throws IOException {
super.deleteFile(name);
transferManager.removeFileFromTracker(name);
remoteStoreFileTrackerAdapter.removeFileFromTracker(name);
fileCache.remove(localDirectory.getDirectory().resolve(name));
}

Expand All @@ -56,7 +60,7 @@ public long fileLength(String name) throws IOException {

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
transferManager.trackFile(name, FileState.DISK, FileType.NON_BLOCK);
remoteStoreFileTrackerAdapter.trackFile(name, FileState.DISK, FileType.NON_BLOCK);
return localDirectory.createOutput(name, context);
}

Expand All @@ -78,17 +82,17 @@ public void syncMetaData() throws IOException {
@Override
public void rename(String source, String dest) throws IOException {
localDirectory.rename(source, dest);
transferManager.trackFile(dest, transferManager.getFileState(source), transferManager.getFileType(source));
transferManager.removeFileFromTracker(source);
remoteStoreFileTrackerAdapter.trackFile(dest, remoteStoreFileTrackerAdapter.getFileState(source), remoteStoreFileTrackerAdapter.getFileType(source));
remoteStoreFileTrackerAdapter.removeFileFromTracker(source);
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
if (!transferManager.isFilePresent(name)) {
if (!remoteStoreFileTrackerAdapter.isFilePresent(name)) {
return localDirectory.openInput(name, context);
}
IndexInput indexInput = null;
switch (transferManager.getFileState(name)) {
switch (remoteStoreFileTrackerAdapter.getFileState(name)) {
case DISK:
indexInput = localDirectory.openInput(name, context);
break;
Expand Down Expand Up @@ -121,8 +125,8 @@ public Set<String> getPendingDeletions() throws IOException {

public void afterSyncToRemote(Collection<String> files) throws IOException {
for (String fileName : files) {
if (transferManager.isFilePresent(fileName) && !transferManager.getFileState(fileName).equals(FileState.CACHE)) {
transferManager.updateFileState(fileName, FileState.CACHE);
if (remoteStoreFileTrackerAdapter.isFilePresent(fileName) && !remoteStoreFileTrackerAdapter.getFileState(fileName).equals(FileState.CACHE)) {
remoteStoreFileTrackerAdapter.updateFileState(fileName, FileState.CACHE);
}
fileCache.put(localDirectory.getDirectory().resolve(fileName), new CachedIndexInput() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,8 @@ public CompositeDirectoryFactory(Supplier<RepositoriesService> repositoriesServi

@Override
public Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
String repositoryName = indexSettings.getRemoteStoreRepository();
Repository repository = repositoriesService.get().repository(repositoryName);
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
String shardId = String.valueOf(shardPath.getShardId().getId());
String indexUUID = indexSettings.getIndex().getUUID();
BlobPath blobPath = blobStoreRepository.basePath().add(indexUUID).add(shardId).add("segments").add("data");
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);

final Path location = shardPath.resolveIndex();
final FSDirectory primaryDirectory = FSDirectory.open(location);

return new CompositeDirectory(primaryDirectory, blobContainer, remoteStoreFileCache);
return new CompositeDirectory(primaryDirectory, remoteStoreFileCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.store;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.index.store.remote.filecache.FileCache;
Expand All @@ -19,18 +20,22 @@
import java.util.HashMap;
import java.util.Map;

public class CompositeDirectoryTransferManager implements TransferManager {
public class CompositeDirectoryRemoteStoreFileTrackerAdapter implements RemoteStoreFileTrackerAdapter {

private FileCache fileCache;
private Map<String, FileTrackingInfo> fileTracker;
private BlobContainer blobContainer;
private RemoteSegmentStoreDirectory remoteDirectory;

public CompositeDirectoryTransferManager(FileCache fileCache, BlobContainer blobContainer) {
public CompositeDirectoryRemoteStoreFileTrackerAdapter(FileCache fileCache) {
this.fileCache = fileCache;
this.blobContainer = blobContainer;
remoteDirectory = null;
this.fileTracker = new HashMap<>();
}

public void setRemoteDirectory(Directory remoteDirectory) {
this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory;
}

@Override
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) {
// TODO - This function will fetch the requested data from blobContainer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.index.store.remote.utils.filetracker.FileState;
import org.opensearch.index.store.remote.utils.filetracker.FileType;

public interface TransferManager {
public interface RemoteStoreFileTrackerAdapter {
IndexInput fetchBlob(BlobFetchRequest blobFetchRequest);

void trackFile(String name, FileState fileState, FileType fileType);
Expand Down

0 comments on commit f1cd4e4

Please sign in to comment.