-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
drop index.shard.check_on_startup: fix
#32279
Changes from 7 commits
843f977
4f01609
153e4f2
2964fef
c71e306
a7668d6
97fa399
c155b36
85b7eef
3231803
c2b5b8a
14e6175
fee8a5b
5cee2b9
ad62da0
6f6ca5a
6763cf9
5083e83
2a9dbeb
aa16487
d26fbfb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ | |
import org.apache.lucene.index.CorruptIndexException; | ||
import org.apache.lucene.index.DirectoryReader; | ||
import org.apache.lucene.index.IndexCommit; | ||
import org.apache.lucene.index.IndexWriter; | ||
import org.apache.lucene.index.Term; | ||
import org.apache.lucene.search.IndexSearcher; | ||
import org.apache.lucene.search.TermQuery; | ||
|
@@ -111,6 +112,7 @@ | |
import org.elasticsearch.snapshots.SnapshotId; | ||
import org.elasticsearch.snapshots.SnapshotInfo; | ||
import org.elasticsearch.snapshots.SnapshotShardFailure; | ||
import org.elasticsearch.test.CorruptionUtils; | ||
import org.elasticsearch.test.DummyShardLock; | ||
import org.elasticsearch.test.FieldMaskingReader; | ||
import org.elasticsearch.test.VersionUtils; | ||
|
@@ -119,7 +121,11 @@ | |
|
||
import java.io.IOException; | ||
import java.nio.charset.Charset; | ||
import java.nio.file.FileVisitResult; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.SimpleFileVisitor; | ||
import java.nio.file.attribute.BasicFileAttributes; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
|
@@ -1230,7 +1236,7 @@ public String[] listAll() throws IOException { | |
}; | ||
|
||
try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) { | ||
IndexShard shard = newShard(shardRouting, shardPath, metaData, store, | ||
IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store, | ||
null, new InternalEngineFactory(), () -> { | ||
}, EMPTY_EVENT_LISTENER); | ||
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); | ||
|
@@ -2571,6 +2577,142 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept | |
closeShards(newShard); | ||
} | ||
|
||
public void testIndexCheckOnStartup() throws Exception { | ||
final IndexShard indexShard = newStartedShard(true); | ||
|
||
final long numDocs = between(10, 100); | ||
for (long i = 0; i < numDocs; i++) { | ||
indexDoc(indexShard, "_doc", Long.toString(i), "{}"); | ||
} | ||
indexShard.flush(new FlushRequest()); | ||
closeShards(indexShard); | ||
|
||
// start shard with checksum - it has to pass successfully | ||
final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), | ||
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE | ||
); | ||
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData()) | ||
.settings(Settings.builder() | ||
.put(indexShard.indexSettings.getSettings()) | ||
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum"))) | ||
.build(); | ||
|
||
final ShardPath shardPath = indexShard.shardPath(); | ||
|
||
final Path indexPath = corruptIndexFile(shardPath); | ||
|
||
// check that corrupt marker is *NOT* there | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think this comment is necessary here. The check is the assertion a few lines further down, and it should be clear what that assertion is doing. I think the message in the assertion could use the phrase "corruption marker" rather than "clean" for consistency. |
||
final AtomicInteger corruptedMarkerCount = new AtomicInteger(); | ||
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() { | ||
@Override | ||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { | ||
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) { | ||
corruptedMarkerCount.incrementAndGet(); | ||
} | ||
return FileVisitResult.CONTINUE; | ||
} | ||
}; | ||
Files.walkFileTree(indexPath, corruptedVisitor); | ||
|
||
assertThat("store is clean", corruptedMarkerCount.get(), equalTo(0)); | ||
|
||
IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, | ||
null, null, indexShard.engineFactory, | ||
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
|
||
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please could you assert something about the thrown exception - ideally that its message describes the affected shard and contains an appropriate message? |
||
|
||
// check that corrupt marker is there | ||
Files.walkFileTree(indexPath, corruptedVisitor); | ||
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1)); | ||
|
||
try { | ||
closeShards(corruptedShard); | ||
} catch (RuntimeException e) { | ||
assertThat(e.getMessage(), equalTo("CheckIndex failed")); | ||
} | ||
} | ||
|
||
public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception { | ||
final IndexShard indexShard = newStartedShard(true); | ||
|
||
final long numDocs = between(10, 100); | ||
for (long i = 0; i < numDocs; i++) { | ||
indexDoc(indexShard, "_doc", Long.toString(i), "{}"); | ||
} | ||
indexShard.flush(new FlushRequest()); | ||
closeShards(indexShard); | ||
|
||
final ShardPath shardPath = indexShard.shardPath(); | ||
|
||
final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), | ||
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE | ||
); | ||
final IndexMetaData indexMetaData = indexShard.indexSettings().getIndexMetaData(); | ||
|
||
final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); | ||
|
||
// create corrupted marker | ||
final String corruptionMessage = "fake ioexception"; | ||
try(Store store = createStore(indexShard.indexSettings(), shardPath)){ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: whitespace |
||
store.markStoreCorrupted(new IOException(corruptionMessage)); | ||
} | ||
|
||
// try to start shard on corrupted files | ||
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, | ||
null, null, indexShard.engineFactory, | ||
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
|
||
final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class, | ||
() -> newStartedShard(p -> corruptedShard, true)); | ||
assertThat(exception1.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)")); | ||
closeShards(corruptedShard); | ||
|
||
// check that corrupt marker is there | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similarly - this comment isn't needed here. |
||
final AtomicInteger corruptedMarkerCount = new AtomicInteger(); | ||
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() { | ||
@Override | ||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { | ||
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) { | ||
corruptedMarkerCount.incrementAndGet(); | ||
} | ||
return FileVisitResult.CONTINUE; | ||
} | ||
}; | ||
Files.walkFileTree(indexPath, corruptedVisitor); | ||
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1)); | ||
|
||
// try to start another time shard on corrupted files | ||
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData, | ||
null, null, indexShard.engineFactory, | ||
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
|
||
final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class, | ||
() -> newStartedShard(p -> corruptedShard2, true)); | ||
assertThat(exception2.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)")); | ||
closeShards(corruptedShard2); | ||
|
||
// check that corrupt marker is there | ||
corruptedMarkerCount.set(0); | ||
Files.walkFileTree(indexPath, corruptedVisitor); | ||
assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1)); | ||
} | ||
|
||
private Path corruptIndexFile(ShardPath shardPath) throws IOException { | ||
final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); | ||
final Path[] filesToCorrupt = | ||
Files.walk(indexPath) | ||
.filter(p -> { | ||
final String name = p.getFileName().toString(); | ||
return Files.isRegularFile(p) | ||
&& IndexWriter.WRITE_LOCK_NAME.equals(name) == false | ||
&& name.startsWith("segments_") == false && name.endsWith(".si") == false; | ||
}) | ||
.toArray(Path[]::new); | ||
CorruptionUtils.corruptFile(random(), filesToCorrupt); | ||
return indexPath; | ||
} | ||
|
||
/** | ||
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService | ||
* and checking index concurrently. This should always be possible without any exception. | ||
|
@@ -2594,7 +2736,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { | |
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData()) | ||
.settings(Settings.builder() | ||
.put(indexShard.indexSettings.getSettings()) | ||
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix"))) | ||
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum"))) | ||
.build(); | ||
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, | ||
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment seems misleading: we don't start the shard here, and we expect it not to pass successfully when we do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, addressed your comments