Skip to content
This repository has been archived by the owner on Apr 11, 2024. It is now read-only.

Commit

Permalink
Fix InternalEngine to correctly set up read only shards. (opensearch-…
Browse files Browse the repository at this point in the history
…project#2422)

This change fixes a bug that flipped primary and replica shard InternalEngine implementations.
This also updates replicas to set a historyUUID so that assertSequenceNumbersInCommit succeeds when initializing  the shard.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Mar 10, 2022
1 parent 9341099 commit f85d113
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public EngineConfig newEngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
Boolean isPrimary
Boolean isReadOnly
) {

return new EngineConfig(
Expand All @@ -138,7 +138,7 @@ public EngineConfig newEngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
isPrimary,
isReadOnly,
primaryTermSupplier,
tombstoneDocSupplier
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,6 @@ public class InternalEngine extends Engine {
@Nullable
private volatile String forceMergeUUID;

private boolean isReadOnlyReplica() {
return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly();
}

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -281,18 +277,16 @@ public InternalEngine(EngineConfig engineConfig) {
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
// TODO: Segrep - should have a separate read only engine rather than all this conditional logic.
if (isReadOnlyReplica()) {
if (engineConfig.isReadOnly()) {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
forceMergeUUID = null;
} else {
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
}
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
Expand Down Expand Up @@ -586,7 +580,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
translog.currentFileGeneration()
)
);
if (isReadOnlyReplica() == false) {
if (engineConfig.isReadOnly() == false) {
flush(false, true);
}
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -656,7 +650,7 @@ public Translog.Location getTranslogLastWriteLocation() {

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
if (isReadOnlyReplica() == false) {
if (engineConfig.isReadOnly() == false) {
indexWriter.deleteUnusedFiles();
}
}
Expand Down Expand Up @@ -720,7 +714,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external

private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
if (isReadOnlyReplica()) {
if (engineConfig.isReadOnly()) {
return DirectoryReader.open(store.directory());
}
return DirectoryReader.open(indexWriter);
Expand Down Expand Up @@ -1984,7 +1978,7 @@ public boolean shouldPeriodicallyFlush() {

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (isReadOnlyReplica()) {
if (engineConfig.isReadOnly()) {
return;
}
ensureOpen();
Expand Down Expand Up @@ -2444,7 +2438,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
if (isReadOnlyReplica() == false) {
if (engineConfig.isReadOnly() == false) {
indexWriter.rollback();
}
} catch (AlreadyClosedException ex) {
Expand Down Expand Up @@ -2929,7 +2923,10 @@ public Closeable acquireHistoryRetentionLock() {
/**
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
private Map<String, String> commitDataAsMap(final IndexWriter indexWriter) throws IOException {
if (engineConfig.isReadOnly()) {
return SegmentInfos.readLatestCommit(store.directory()).getUserData();
}
final Map<String, String> commitData = new HashMap<>(8);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2067,7 +2067,6 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
// TODO: Segrep - Fix
assert assertSequenceNumbersInCommit();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
Expand Down Expand Up @@ -2715,7 +2714,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException {
public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
// TODO: Segrep - Fix retention leases
replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
Expand Down Expand Up @@ -3328,7 +3326,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
shardRouting.primary()
indexSettings.isSegrepEnabled() && shardRouting.primary() == false
);
}

Expand Down

0 comments on commit f85d113

Please sign in to comment.