Commit

temp

ankitkala committed May 14, 2024
1 parent 2854383 commit 20c892b
Showing 12 changed files with 126 additions and 46 deletions.
@@ -8,17 +8,20 @@

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;

@@ -28,15 +31,25 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public class CompositeDirectoryIT extends RemoteStoreBaseIntegTestCase {
@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.WRITEABLE_REMOTE_INDEX, "true").build();
}
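// Editor's note: the WRITEABLE_REMOTE_INDEX feature flag is applied suite-wide by this
// override, which is presumably why the equivalent puts are commented out in the index
// settings inside the test below.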

public void testCompositeDirectory() throws Exception {
int totalDocs = 0;
Settings settings = Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, "true")
//.put(super.featureFlagSettings())
//.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, "true")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), "partial")
.put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "fs")
.build();
logger.info("ankitkala: settings {}", settings);
logger.info("ankitkala: Creating index");
assertAcked(client().admin().indices().prepareCreate("test-idx-1").setSettings(settings).get());
GetIndexResponse getIndexResponse = client().admin()
.indices()
@@ -65,5 +78,28 @@ public void testCompositeDirectory() throws Exception {
ensureGreen("test-idx-1");
indexData(10, false, "test-idx-1");
ensureGreen("test-idx-1");

settings = Settings.builder()
.put("logger.org.opensearch.indices.recovery", "DEBUG")
.put("logger.org.opensearch.index.store.CompositeDirectory", "TRACE")
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get());
// Add Replicas.
logger.info("ankitkala: Adding replica now");
settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
assertAcked(client().admin().indices().prepareUpdateSettings("test-idx-1").setSettings(settings).get());
logger.info("ankitkala: Adding replica Done");
//Thread.sleep(10000);

ensureGreen(TimeValue.timeValueSeconds(30), "test-idx-1");
//
indexData(10, false, "test-idx-1");
logger.info("ankitkala: replica green");
//internalCluster().getDataNodeNames().stream().forEach();
settings = Settings.builder()
.put("logger.org.opensearch.indices.recovery", (String) null)
.put("logger.org.opensearch.index.store.CompositeDirectory", (String) null)
.build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings).get());
}
}
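
A side note on the logger-settings reset at the end of the test above: Settings.Builder#putNull is the more idiomatic way to clear a persistent setting than put(key, (String) null). A minimal equivalent sketch:

    // Clears the two logger overrides set earlier in the test.
    Settings reset = Settings.builder()
        .putNull("logger.org.opensearch.indices.recovery")
        .putNull("logger.org.opensearch.index.store.CompositeDirectory")
        .build();
    assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(reset).get());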
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
@@ -534,8 +534,9 @@ public synchronized IndexShard createShard(
* Composite Directory currently uses FSDirectory's getDirectory() method to fetch and use the Path for operating on FileCache
* TODO: Refactor FileCache to have key in form of String instead of Path. Once that is done we can remove this assertion
*/
assert directoryFactory instanceof FsDirectoryFactory
: "For Composite Directory, local directory must be of type FSDirectory";
logger.info("ankitkala: {}", directoryFactory);
// assert directoryFactory instanceof FsDirectoryFactory
// : "For Composite Directory, local directory must be of type FSDirectory";
Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
directory = new CompositeDirectory(
(FSDirectory) localDirectory,
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -1250,6 +1250,9 @@ public boolean isRemoteStoreEnabled() {
return isRemoteStoreEnabled;
}

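/**
 * A warm index keeps only part of its store local (IndexModule.INDEX_STORE_LOCALITY_SETTING
 * set to "partial"); segment files that are not resident locally are fetched from the
 * remote store on demand.
 */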
public boolean isWarmIndex() {
return isStoreLocalityPartial;
}

public boolean isRemoteNode() {
return RemoteStoreNodeAttribute.isRemoteDataAttributePresent(this.getNodeSettings());
}
30 changes: 21 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1654,6 +1654,7 @@ public Optional<NRTReplicationEngine> getReplicationEngine() {
}

public void finalizeReplication(SegmentInfos infos) throws IOException {
logger.info("ankitkala: finalize replication");
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos);
}
@@ -4988,6 +4989,12 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
// A warm index uses a composite directory, so the directory will list the remote files as well.
// We shouldn't be overriding any files in this case.
boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isWarmIndex() == false;
if (indexSettings.isWarmIndex()) {
logger.info("ankitkala: warm sync segments");
}
long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
logger.trace("Downloading segments from remote segment store");
@@ -5010,7 +5017,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
for (String file : uploadedSegments.keySet()) {
long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
} else {
recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5019,23 +5026,28 @@ } else {
} else {
storeDirectory = store.directory();
}
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
// Do not copy the files for a warm index, as these are fetched on demand
if (indexSettings.isWarmIndex() == false) {
copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, shouldOverrideLocalFiles, onFileSync);
}

if (remoteSegmentMetadata != null) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
if (indexSettings.isWarmIndex() == false) {
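// For a warm index the composite directory also lists the remote files (see the comment
// near the top of this method), so the commit files are left in place.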
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
@@ -539,7 +539,7 @@ private boolean isReadyForUpload() {
boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery() || indexShard.shouldSeedRemoteStore();

if (isReady == false) {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
StringBuilder sb = new StringBuilder("MAX_REPLICATION_BACKPRESSURE_TIME_SETTING");
if (indexShard.getReplicationTracker() != null) {
sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode());
}
@@ -234,14 +234,15 @@ public void afterSyncToRemote(Collection<String> files) throws IOException {
logger.trace("afterSyncToRemote called even though remote directory is not set");
return;
}
for (String fileName : files) {
writeLock.lock();
try {
localDirectory.deleteFile(fileName);
} finally {
writeLock.unlock();
}
}

// for (String fileName : files) {
// writeLock.lock();
// try {
// localDirectory.deleteFile(fileName);
// } finally {
// writeLock.unlock();
// }
// }
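// Note: with the deletion loop disabled, files remain on local disk after the sync to
// remote instead of being evicted from the local directory; presumably a temporary
// debugging change.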
}

private String[] getRemoteFiles() {
@@ -157,6 +157,7 @@ public void clusterChanged(ClusterChangedEvent event) {
}

private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {
logger.info("ankitkala: start source service recovery");
final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
final IndexShard shard = indexService.getShard(request.shardId().id());

@@ -232,6 +232,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final StartRecoveryRequest startRequest;
final ReplicationTimer timer;
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.get(recoveryId)) {
logger.info("ankitkala: starting peer recovery");
if (recoveryRef == null) {
logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);
return;
@@ -252,7 +253,9 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
// plan to revamp this flow so that node-node segment copy will not happen.
// GitHub Issue to track the revamp: https://github.com/opensearch-project/OpenSearch/issues/11331
try {
logger.info("ankitkala: triggering sync segments");
indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime);
logger.info("ankitkala: triggering sync segments done");
} catch (Exception e) {
logger.error(
"Exception while downloading segment files from remote store, will continue with peer to peer segment copy",
@@ -267,6 +270,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
logger.info("ankitkala: start recovery request");
startRequest = getStartRecoveryRequest(
logger,
clusterService.localNode(),
@@ -276,6 +280,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
);
requestToSend = startRequest;
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
logger.info("ankitkala: start recovery request done");
} catch (final Exception e) {
// this will be logged as warning later on...
logger.debug("unexpected error while preparing shard for peer recovery, failing recovery", e);
@@ -294,6 +299,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
}
}
logger.info("ankitkala: sending request to the source node");
transportService.sendRequest(
startRequest.sourceNode(),
actionName,
@@ -52,6 +52,7 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
final StepListener<SendFileResult> sendFileStep = new StepListener<>();
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
logger.info("ankitkala: peer recovery handler innerRecoveryToTarget");

// It is always file based recovery while recovering replicas which are not relocating primary where the
// underlying indices are backed by remote store for storing segments and translog
@@ -27,6 +27,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -63,6 +64,7 @@ public void getCheckpointMetadata(
ActionListener<CheckpointInfoResponse> listener
) {
Map<String, StoreFileMetadata> metadataMap;
logger.info("ankitkala: get checkpoint metadata");
// TODO: Need to figure out a way to pass this information for segment metadata via remote store.
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = indexShard.getSegmentInfosSnapshot()) {
final Version version = segmentInfosSnapshot.get().getCommitLuceneVersion();
@@ -74,6 +76,7 @@ public void getCheckpointMetadata(
return;
}
assert mdFile != null : "Remote metadata file can't be null if shard is active " + indexShard.state();
logger.info("ankitkala: all files in metadata: {}", mdFile.getMetadata().keySet());
metadataMap = mdFile.getMetadata()
.entrySet()
.stream()
@@ -114,12 +117,17 @@ public void getSegmentFiles(
if (remoteMetadataExists()) {
final Directory storeDirectory = indexShard.store().directory();
final Collection<String> directoryFiles = List.of(storeDirectory.listAll());
final List<String> toDownloadSegmentNames = new ArrayList<>();
List<String> toDownloadSegmentNames = new ArrayList<>();
for (StoreFileMetadata fileMetadata : filesToFetch) {
String file = fileMetadata.name();
assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
assert directoryFiles.contains(file) == false || indexShard.indexSettings().isWarmIndex() : "Local store already contains the file " + file;
toDownloadSegmentNames.add(file);
}
logger.info("ankitkala: toDownloadSegmentNames is {}", toDownloadSegmentNames);
if (indexShard.indexSettings().isWarmIndex()) {
logger.info("Skipping download for warm index");
listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
return; // without this return, the download below would still be triggered
}
indexShard.getFileDownloader()
.downloadAsync(
cancellableThreads,