From f85d113c86ce6f54dbb8de0942b6b74c53b20d88 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Wed, 9 Mar 2022 16:58:46 -0800 Subject: [PATCH] Fix InternalEngine to correctly set up read only shards. (#2422) This change fixes a bug that flipped primary and replica shard InternalEngine implementations. This also updates replicas to set a historyUUID so that assertSequenceNumbersInCommit succeeds when initializing the shard. Signed-off-by: Marc Handalian --- .../index/engine/EngineConfigFactory.java | 4 +-- .../index/engine/InternalEngine.java | 29 +++++++++---------- .../opensearch/index/shard/IndexShard.java | 4 +-- 3 files changed, 16 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 869c274ba4841..527252f20cb97 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -113,7 +113,7 @@ public EngineConfig newEngineConfig( Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, - Boolean isPrimary + Boolean isReadOnly ) { return new EngineConfig( @@ -138,7 +138,7 @@ public EngineConfig newEngineConfig( circuitBreakerService, globalCheckpointSupplier, retentionLeasesSupplier, - isPrimary, + isReadOnly, primaryTermSupplier, tombstoneDocSupplier ); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index a5ec9b437e00d..aa1be50499b24 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -221,10 +221,6 @@ public class InternalEngine extends Engine { @Nullable private volatile String forceMergeUUID; - private boolean isReadOnlyReplica() { - return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly(); - } - public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } @@ -281,18 +277,16 @@ public InternalEngine(EngineConfig engineConfig) { ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); // TODO: Segrep - should have a separate read only engine rather than all this conditional logic. - if (isReadOnlyReplica()) { + if (engineConfig.isReadOnly()) { // Segrep - hack to make this engine read only and not use writer = null; - historyUUID = null; - forceMergeUUID = null; } else { writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); - final Map commitData = commitDataAsMap(writer); - historyUUID = loadHistoryUUID(commitData); - forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); } + final Map commitData = commitDataAsMap(writer); + historyUUID = loadHistoryUUID(commitData); + forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); indexWriter = writer; } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); @@ -586,7 +580,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.currentFileGeneration() ) ); - if (isReadOnlyReplica() == false) { + if (engineConfig.isReadOnly() == false) { flush(false, true); } translog.trimUnreferencedReaders(); @@ -656,7 +650,7 @@ public Translog.Location getTranslogLastWriteLocation() { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { - if (isReadOnlyReplica() == false) { + if (engineConfig.isReadOnly() == false) { indexWriter.deleteUnusedFiles(); } } @@ -720,7 +714,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. - if (isReadOnlyReplica()) { + if (engineConfig.isReadOnly()) { return DirectoryReader.open(store.directory()); } return DirectoryReader.open(indexWriter); @@ -1984,7 +1978,7 @@ public boolean shouldPeriodicallyFlush() { @Override public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - if (isReadOnlyReplica()) { + if (engineConfig.isReadOnly()) { return; } ensureOpen(); @@ -2444,7 +2438,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed logger.trace("rollback indexWriter"); try { - if (isReadOnlyReplica() == false) { + if (engineConfig.isReadOnly() == false) { indexWriter.rollback(); } } catch (AlreadyClosedException ex) { @@ -2929,7 +2923,10 @@ public Closeable acquireHistoryRetentionLock() { /** * Gets the commit data from {@link IndexWriter} as a map. */ - private static Map commitDataAsMap(final IndexWriter indexWriter) { + private Map commitDataAsMap(final IndexWriter indexWriter) throws IOException { + if (engineConfig.isReadOnly()) { + return SegmentInfos.readLatestCommit(store.directory()).getUserData(); + } final Map commitData = new HashMap<>(8); for (Map.Entry entry : indexWriter.getLiveCommitData()) { commitData.put(entry.getKey(), entry.getValue()); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index be13aa8273dae..2099df55bee3b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2067,7 +2067,6 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); - // TODO: Segrep - Fix assert assertSequenceNumbersInCommit(); recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } @@ -2715,7 +2714,6 @@ public boolean assertRetentionLeasesPersisted() throws IOException { public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); - // TODO: Segrep - Fix retention leases replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { @@ -3328,7 +3326,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), - shardRouting.primary() + indexSettings.isSegrepEnabled() && shardRouting.primary() == false ); }