From 54eca8da2da5d14c7786da4d9eea90decd5f0007 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 22 Jan 2019 09:22:33 +0100 Subject: [PATCH] Ensure that max seq # is equal to the global checkpoint when creating ReadOnlyEngines (#37426) Since version 6.7.0 the Close Index API guarantees that all translog operations have been correctly flushed before the index is closed. If the index is reopened as a Frozen index (which uses a ReadOnlyEngine) we can verify that the maximum sequence number from the last Lucene commit is indeed equal to the last known global checkpoint and refuses to open the read only engine if it's not the case. In this PR the check is only done for indices created on or after 6.7.0 as they are guaranteed to be closed using the new Close Index API. Related #33888 --- .../index/engine/ReadOnlyEngine.java | 28 +++++++++++++- .../index/engine/ReadOnlyEngineTests.java | 38 ++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c926a5a47191e..5c09708b62cae 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -30,6 +30,8 @@ import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.elasticsearch.Assertions; +import org.elasticsearch.Version; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.core.internal.io.IOUtils; @@ -98,7 +100,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; - this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; + if (seqNoStats == null) { + seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos); + // During a peer-recovery the global checkpoint is not known and up to date when the engine + // is created, so we only check the max seq no / global checkpoint coherency when the global + // checkpoint is different from the unassigned sequence number value. + // In addition to that we only execute the check if the index the engine belongs to has been + // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction + // that guarantee that all operations have been flushed to Lucene. + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO + && engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) { + if (seqNoStats.getMaxSeqNo() != globalCheckpoint) { + assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint); + throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() + + "] from last commit does not match global checkpoint [" + globalCheckpoint + "]"); + } + } + } + this.seqNoStats = seqNoStats; this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); reader = open(indexCommit); reader = wrapReader(reader, readerWrapperFunction); @@ -116,6 +136,12 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats } } + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + if (Assertions.ENABLED) { + assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]"; + } + } + protected final DirectoryReader wrapReader(DirectoryReader reader, Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 89f89ae69a4ef..b77d6b9664acf 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -70,7 +70,7 @@ public void testReadOnlyEngine() throws Exception { lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); - assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); + assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { String delId = Integer.toString(i); @@ -126,7 +126,7 @@ public void testFlushes() throws IOException { if (rarely()) { engine.flush(); } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + globalCheckpoint.set(i); } engine.syncTranslog(); engine.flushAndClose(); @@ -139,6 +139,40 @@ public void testFlushes() throws IOException { } } + public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { + IOUtils.close(engine, store); + Engine readOnlyEngine = null; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final int numDocs = scaledRandomIntBetween(10, 100); + try (InternalEngine engine = createEngine(config)) { + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0)); + maxSeqNo = engine.getLocalCheckpoint(); + } + globalCheckpoint.set(engine.getLocalCheckpoint() - 1); + engine.syncTranslog(); + engine.flushAndClose(); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) { + @Override + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + // we don't want the assertion to trip in this test + } + }); + assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo + + "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]")); + } finally { + IOUtils.close(readOnlyEngine); + } + } + } + public void testReadOnly() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);