diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c926a5a47191e..5c09708b62cae 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -30,6 +30,8 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.elasticsearch.Assertions; +import org.elasticsearch.Version; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.core.internal.io.IOUtils; @@ -98,7 +100,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; - this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; + if (seqNoStats == null) { + seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos); + // During a peer-recovery the global checkpoint is not known and up to date when the engine + // is created, so we only check the max seq no / global checkpoint coherency when the global + // checkpoint is different from the unassigned sequence number value. + // In addition to that we only execute the check if the index the engine belongs to has been + // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction + // that guarantee that all operations have been flushed to Lucene. + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO + && engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) { + if (seqNoStats.getMaxSeqNo() != globalCheckpoint) { + assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint); + throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() + + "] from last commit does not match global checkpoint [" + globalCheckpoint + "]"); + } + } + } + this.seqNoStats = seqNoStats; this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); reader = open(indexCommit); reader = wrapReader(reader, readerWrapperFunction); @@ -116,6 +136,12 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } } + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + if (Assertions.ENABLED) { + assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]"; + } + } + protected final DirectoryReader wrapReader(DirectoryReader reader, Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 89f89ae69a4ef..b77d6b9664acf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -70,7 +70,7 @@ public void testReadOnlyEngine() throws Exception { lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); - assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); + assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { String delId = Integer.toString(i); @@ -126,7 +126,7 @@ public void testFlushes() throws IOException { if (rarely()) { engine.flush(); } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + globalCheckpoint.set(i); } engine.syncTranslog(); engine.flushAndClose(); @@ -139,6 +139,40 @@ public void testFlushes() throws IOException { } } + public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { + IOUtils.close(engine, store); + Engine readOnlyEngine = null; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final int numDocs = scaledRandomIntBetween(10, 100); + try (InternalEngine engine = createEngine(config)) { + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0)); + maxSeqNo = engine.getLocalCheckpoint(); + } + globalCheckpoint.set(engine.getLocalCheckpoint() - 1); + engine.syncTranslog(); + engine.flushAndClose(); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) { + @Override + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + // we don't want the assertion to trip in this test + } + }); + assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo + + "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]")); + } finally { + IOUtils.close(readOnlyEngine); + } + } + } + public void testReadOnly() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);