Skip to content

Commit

Permalink
Ensure that max seq # is equal to the global checkpoint when creating…
Browse files Browse the repository at this point in the history
… ReadOnlyEngines (elastic#37426)

Since version 6.7.0 the Close Index API guarantees that all translog 
operations have been correctly flushed before the index is closed. If 
the index is reopened as a Frozen index (which uses a ReadOnlyEngine) 
we can verify that the maximum sequence number from the last Lucene 
commit is indeed equal to the last known global checkpoint and refuses 
to open the read only engine if it's not the case. In this PR the check is 
only done for indices created on or after 6.7.0 as they are guaranteed 
to be closed using the new Close Index API.

Related elastic#33888
  • Loading branch information
tlrx committed Feb 11, 2019
1 parent 5cea933 commit 54eca8d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -98,7 +100,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
// During a peer-recovery the global checkpoint is not known and up to date when the engine
// is created, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
+ "] from last commit does not match global checkpoint [" + globalCheckpoint + "]");
}
}
}
this.seqNoStats = seqNoStats;
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
reader = open(indexCommit);
reader = wrapReader(reader, readerWrapperFunction);
Expand All @@ -116,6 +136,12 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
}

protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
if (Assertions.ENABLED) {
assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
}
}

protected final DirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testReadOnlyEngine() throws Exception {
lastDocIds = getDocIds(engine, true);
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
for (int i = 0; i < numDocs; i++) {
if (randomBoolean()) {
String delId = Integer.toString(i);
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testFlushes() throws IOException {
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
globalCheckpoint.set(i);
}
engine.syncTranslog();
engine.flushAndClose();
Expand All @@ -139,6 +139,40 @@ public void testFlushes() throws IOException {
}
}

public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
IOUtils.close(engine, store);
Engine readOnlyEngine = null;
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
final int numDocs = scaledRandomIntBetween(10, 100);
try (InternalEngine engine = createEngine(config)) {
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
maxSeqNo = engine.getLocalCheckpoint();
}
globalCheckpoint.set(engine.getLocalCheckpoint() - 1);
engine.syncTranslog();
engine.flushAndClose();

IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) {
@Override
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
// we don't want the assertion to trip in this test
}
});
assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo
+ "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]"));
} finally {
IOUtils.close(readOnlyEngine);
}
}
}

public void testReadOnly() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down

0 comments on commit 54eca8d

Please sign in to comment.