diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 54c0c1c1b157c..214d77541779e 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -65,9 +65,7 @@ corruption is detected, it will prevent the shard from being opened. Accepts: `fix`:: - Check for both physical and logical corruption. Segments that were reported - as corrupted will be automatically removed. This option *may result in data loss*. - Use with extreme caution! + The same as `false`. This option is deprecated and will be completely removed in 7.0. WARNING: Expert only. Checking shards may take a lot of time on large indices. -- diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index ef5f9ab0ef3ec..24c78c72033f3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -301,6 +301,10 @@ public IndexShard( logger.debug("state: [CREATED]"); this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); + if ("fix".equals(checkIndexOnStartup)) { + deprecationLogger.deprecated("Setting [index.shard.check_on_startup] is set to deprecated value [fix], " + + "which has no effect and will not be accepted in future"); + } this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); final String aId = shardRouting.allocationId().getId(); this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger); @@ -1325,7 +1329,7 @@ private void innerOpenEngineAndTranslog() throws IOException { } recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX); // also check here, before we apply the translog - if (Booleans.isTrue(checkIndexOnStartup)) { + if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) { try { checkIndex(); } catch (IOException ex) { @@ -1933,6 +1937,9 @@ void checkIndex() throws IOException { if (store.tryIncRef()) { try { doCheckIndex(); + } catch (IOException e) { + store.markStoreCorrupted(e); + throw e; } finally { store.decRef(); } @@ -1976,18 +1983,7 @@ private void doCheckIndex() throws IOException { return; } logger.warn("check index [failure]\n{}", os.bytes().utf8ToString()); - if ("fix".equals(checkIndexOnStartup)) { - if (logger.isDebugEnabled()) { - logger.debug("fixing index, writing new segments file ..."); - } - store.exorciseIndex(status); - if (logger.isDebugEnabled()) { - logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName); - } - } else { - // only throw a failure if we are not going to fix the index - throw new IllegalStateException("index check failure but can't fix it"); - } + throw new IOException("index check failure"); } } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 85975bc68c85f..a00d779ca7712 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -134,7 +134,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0 static final int VERSION_START = 0; static final int VERSION = VERSION_WRITE_THROWABLE; - static final String CORRUPTED = "corrupted_"; + // public is for test purposes + public static final String CORRUPTED = "corrupted_"; public static final Setting INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING = Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope); @@ -360,18 +361,6 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException { } } - /** - * Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}. - */ - public void exorciseIndex(CheckIndex.Status status) throws IOException { - metadataLock.writeLock().lock(); - try (CheckIndex checkIndex = new CheckIndex(directory)) { - checkIndex.exorciseIndex(status); - } finally { - metadataLock.writeLock().unlock(); - } - } - public StoreStats stats() throws IOException { ensureOpen(); return new StoreStats(directory.estimateSize()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 50f95bf4d4738..fdf7f5438d05f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -118,6 +119,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.test.CorruptionUtils; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.VersionUtils; @@ -126,7 +128,11 @@ import java.io.IOException; import java.nio.charset.Charset; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1239,7 +1245,7 @@ public String[] listAll() throws IOException { }; try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) { - IndexShard shard = newShard(shardRouting, shardPath, metaData, store, + IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store, null, new InternalEngineFactory(), () -> { }, EMPTY_EVENT_LISTENER); AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false); @@ -2590,6 +2596,143 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept closeShards(newShard); } + public void testIndexCheckOnStartup() throws Exception { + final IndexShard indexShard = newStartedShard(true); + + final long numDocs = between(10, 100); + for (long i = 0; i < numDocs; i++) { + indexDoc(indexShard, "_doc", Long.toString(i), "{}"); + } + indexShard.flush(new FlushRequest()); + closeShards(indexShard); + + final ShardPath shardPath = indexShard.shardPath(); + + final Path indexPath = corruptIndexFile(shardPath); + + final AtomicInteger corruptedMarkerCount = new AtomicInteger(); + final SimpleFileVisitor corruptedVisitor = new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) { + corruptedMarkerCount.incrementAndGet(); + } + return FileVisitResult.CONTINUE; + } + }; + Files.walkFileTree(indexPath, corruptedVisitor); + + assertThat("corruption marker should not be there", corruptedMarkerCount.get(), equalTo(0)); + + final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE + ); + // start shard and perform index check on startup. It enforce shard to fail due to corrupted index files + final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData()) + .settings(Settings.builder() + .put(indexShard.indexSettings.getSettings()) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum"))) + .build(); + + IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, + null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + + final IndexShardRecoveryException indexShardRecoveryException = + expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); + assertThat(indexShardRecoveryException.getMessage(), equalTo("failed recovery")); + + // check that corrupt marker is there + Files.walkFileTree(indexPath, corruptedVisitor); + assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1)); + + try { + closeShards(corruptedShard); + } catch (RuntimeException e) { + assertThat(e.getMessage(), equalTo("CheckIndex failed")); + } + } + + public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception { + final IndexShard indexShard = newStartedShard(true); + + final long numDocs = between(10, 100); + for (long i = 0; i < numDocs; i++) { + indexDoc(indexShard, "_doc", Long.toString(i), "{}"); + } + indexShard.flush(new FlushRequest()); + closeShards(indexShard); + + final ShardPath shardPath = indexShard.shardPath(); + + final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), + RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE + ); + final IndexMetaData indexMetaData = indexShard.indexSettings().getIndexMetaData(); + + final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); + + // create corrupted marker + final String corruptionMessage = "fake ioexception"; + try(Store store = createStore(indexShard.indexSettings(), shardPath)) { + store.markStoreCorrupted(new IOException(corruptionMessage)); + } + + // try to start shard on corrupted files + final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData, + null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + + final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class, + () -> newStartedShard(p -> corruptedShard, true)); + assertThat(exception1.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)")); + closeShards(corruptedShard); + + final AtomicInteger corruptedMarkerCount = new AtomicInteger(); + final SimpleFileVisitor corruptedVisitor = new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) { + corruptedMarkerCount.incrementAndGet(); + } + return FileVisitResult.CONTINUE; + } + }; + Files.walkFileTree(indexPath, corruptedVisitor); + assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1)); + + // try to start another time shard on corrupted files + final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData, + null, null, indexShard.engineFactory, + indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + + final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class, + () -> newStartedShard(p -> corruptedShard2, true)); + assertThat(exception2.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)")); + closeShards(corruptedShard2); + + // check that corrupt marker is there + corruptedMarkerCount.set(0); + Files.walkFileTree(indexPath, corruptedVisitor); + assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1)); + } + + private Path corruptIndexFile(ShardPath shardPath) throws IOException { + final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); + final Path[] filesToCorrupt = + Files.walk(indexPath) + .filter(p -> { + final String name = p.getFileName().toString(); + return Files.isRegularFile(p) + && name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS + && IndexWriter.WRITE_LOCK_NAME.equals(name) == false + && name.startsWith("segments_") == false && name.endsWith(".si") == false; + }) + .toArray(Path[]::new); + CorruptionUtils.corruptFile(random(), filesToCorrupt); + return indexPath; + } + /** * Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService * and checking index concurrently. This should always be possible without any exception. @@ -2613,7 +2756,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData()) .settings(Settings.builder() .put(indexShard.indexSettings.getSettings()) - .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix"))) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum"))) .build(); final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData, null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); @@ -2655,6 +2798,16 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception { closeShards(newShard); } + public void testCheckOnStartupDeprecatedValue() throws Exception { + final Settings settings = Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "fix").build(); + + final IndexShard newShard = newShard(true, settings); + closeShards(newShard); + + assertWarnings("Setting [index.shard.check_on_startup] is set to deprecated value [fix], " + + "which has no effect and will not be accepted in future"); + } + class Result { private final int localCheckpoint; private final int maxSeqNo; diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 2f4a3dfd6c12c..375e74f6cca3e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.uid.Versions; @@ -156,7 +157,6 @@ public Settings threadPoolSettings() { return Settings.EMPTY; } - protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex())); } @@ -169,7 +169,6 @@ public Directory newDirectory() throws IOException { } }; return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId)); - } /** @@ -179,7 +178,17 @@ public Directory newDirectory() throws IOException { * another shard) */ protected IndexShard newShard(boolean primary) throws IOException { - return newShard(primary, Settings.EMPTY, new InternalEngineFactory()); + return newShard(primary, Settings.EMPTY); + } + + /** + * Creates a new initializing shard. The shard will have its own unique data path. + * + * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from + * another shard) + */ + protected IndexShard newShard(final boolean primary, final Settings settings) throws IOException { + return newShard(primary, settings, new InternalEngineFactory()); } /** @@ -318,23 +327,25 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, * @param routing shard routing to use * @param shardPath path to use for shard data * @param indexMetaData indexMetaData for the shard, including any mapping - * @param store an optional custom store to use. If null a default file based store will be created + * @param storeProvider an optional custom store provider to use. If null a default file based store will be created * @param indexSearcherWrapper an optional wrapper to be used during searchers * @param globalCheckpointSyncer callback for syncing global checkpoints * @param indexEventListener index event listener * @param listeners an optional set of listeners to add to the shard */ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData, - @Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper, + @Nullable CheckedFunction storeProvider, + @Nullable IndexSearcherWrapper indexSearcherWrapper, @Nullable EngineFactory engineFactory, Runnable globalCheckpointSyncer, IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException { final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings); final IndexShard indexShard; - if (store == null) { - store = createStore(indexSettings, shardPath); + if (storeProvider == null) { + storeProvider = is -> createStore(is, shardPath); } + final Store store = storeProvider.apply(indexSettings); boolean success = false; try { IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null); @@ -424,7 +435,18 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException { */ protected IndexShard newStartedShard( final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException { - IndexShard shard = newShard(primary, settings, engineFactory); + return newStartedShard(p -> newShard(p, settings, engineFactory), primary); + } + + /** + * creates a new empty shard and starts it. + * + * @param shardFunction shard factory function + * @param primary controls whether the shard will be a primary or a replica. + */ + protected IndexShard newStartedShard(CheckedFunction shardFunction, + boolean primary) throws IOException { + IndexShard shard = shardFunction.apply(primary); if (primary) { recoverShardFromStore(shard); } else {