Skip to content

Commit

Permalink
drop index.shard.check_on_startup: fix (#32279)
Browse files Browse the repository at this point in the history
drop `index.shard.check_on_startup: fix`

Relates #31389
  • Loading branch information
vladimirdolzhenko authored Aug 31, 2018
1 parent d4f2b5b commit 3d82a30
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 39 deletions.
4 changes: 1 addition & 3 deletions docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ corruption is detected, it will prevent the shard from being opened. Accepts:

`fix`::

Check for both physical and logical corruption. Segments that were reported
as corrupted will be automatically removed. This option *may result in data loss*.
Use with extreme caution!
The same as `false`. This option is deprecated and will be completely removed in 7.0.

WARNING: Expert only. Checking shards may take a lot of time on large indices.
--
Expand Down
22 changes: 9 additions & 13 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
if ("fix".equals(checkIndexOnStartup)) {
deprecationLogger.deprecated("Setting [index.shard.check_on_startup] is set to deprecated value [fix], "
+ "which has no effect and will not be accepted in future");
}
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
final String aId = shardRouting.allocationId().getId();
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
Expand Down Expand Up @@ -1325,7 +1329,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
}
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.isTrue(checkIndexOnStartup)) {
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
Expand Down Expand Up @@ -1933,6 +1937,9 @@ void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
doCheckIndex();
} catch (IOException e) {
store.markStoreCorrupted(e);
throw e;
} finally {
store.decRef();
}
Expand Down Expand Up @@ -1976,18 +1983,7 @@ private void doCheckIndex() throws IOException {
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
if ("fix".equals(checkIndexOnStartup)) {
if (logger.isDebugEnabled()) {
logger.debug("fixing index, writing new segments file ...");
}
store.exorciseIndex(status);
if (logger.isDebugEnabled()) {
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
}
} else {
// only throw a failure if we are not going to fix the index
throw new IllegalStateException("index check failure but can't fix it");
}
throw new IOException("index check failure");
}
}

Expand Down
15 changes: 2 additions & 13 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
static final int VERSION_START = 0;
static final int VERSION = VERSION_WRITE_THROWABLE;
static final String CORRUPTED = "corrupted_";
// public is for test purposes
public static final String CORRUPTED = "corrupted_";
public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);

Expand Down Expand Up @@ -360,18 +361,6 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException {
}
}

/**
* Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
*/
public void exorciseIndex(CheckIndex.Status status) throws IOException {
metadataLock.writeLock().lock();
try (CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.exorciseIndex(status);
} finally {
metadataLock.writeLock().unlock();
}
}

public StoreStats stats() throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
Expand Down Expand Up @@ -118,6 +119,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
Expand All @@ -126,7 +128,11 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1239,7 +1245,7 @@ public String[] listAll() throws IOException {
};

try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store,
null, new InternalEngineFactory(), () -> {
}, EMPTY_EVENT_LISTENER);
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
Expand Down Expand Up @@ -2590,6 +2596,143 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

public void testIndexCheckOnStartup() throws Exception {
final IndexShard indexShard = newStartedShard(true);

final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);

final ShardPath shardPath = indexShard.shardPath();

final Path indexPath = corruptIndexFile(shardPath);

final AtomicInteger corruptedMarkerCount = new AtomicInteger();
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
}
};
Files.walkFileTree(indexPath, corruptedVisitor);

assertThat("corruption marker should not be there", corruptedMarkerCount.get(), equalTo(0));

final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE
);
// start shard and perform index check on startup. It enforce shard to fail due to corrupted index files
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum")))
.build();

IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

final IndexShardRecoveryException indexShardRecoveryException =
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
assertThat(indexShardRecoveryException.getMessage(), equalTo("failed recovery"));

// check that corrupt marker is there
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));

try {
closeShards(corruptedShard);
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("CheckIndex failed"));
}
}

public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
final IndexShard indexShard = newStartedShard(true);

final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);

final ShardPath shardPath = indexShard.shardPath();

final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE
);
final IndexMetaData indexMetaData = indexShard.indexSettings().getIndexMetaData();

final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);

// create corrupted marker
final String corruptionMessage = "fake ioexception";
try(Store store = createStore(indexShard.indexSettings(), shardPath)) {
store.markStoreCorrupted(new IOException(corruptionMessage));
}

// try to start shard on corrupted files
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class,
() -> newStartedShard(p -> corruptedShard, true));
assertThat(exception1.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)"));
closeShards(corruptedShard);

final AtomicInteger corruptedMarkerCount = new AtomicInteger();
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
}
};
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));

// try to start another time shard on corrupted files
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class,
() -> newStartedShard(p -> corruptedShard2, true));
assertThat(exception2.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)"));
closeShards(corruptedShard2);

// check that corrupt marker is there
corruptedMarkerCount.set(0);
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1));
}

private Path corruptIndexFile(ShardPath shardPath) throws IOException {
final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
final Path[] filesToCorrupt =
Files.walk(indexPath)
.filter(p -> {
final String name = p.getFileName().toString();
return Files.isRegularFile(p)
&& name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS
&& IndexWriter.WRITE_LOCK_NAME.equals(name) == false
&& name.startsWith("segments_") == false && name.endsWith(".si") == false;
})
.toArray(Path[]::new);
CorruptionUtils.corruptFile(random(), filesToCorrupt);
return indexPath;
}

/**
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
* and checking index concurrently. This should always be possible without any exception.
Expand All @@ -2613,7 +2756,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
Expand Down Expand Up @@ -2655,6 +2798,16 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
closeShards(newShard);
}

public void testCheckOnStartupDeprecatedValue() throws Exception {
final Settings settings = Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "fix").build();

final IndexShard newShard = newShard(true, settings);
closeShards(newShard);

assertWarnings("Setting [index.shard.check_on_startup] is set to deprecated value [fix], "
+ "which has no effect and will not be accepted in future");
}

class Result {
private final int localCheckpoint;
private final int maxSeqNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -156,7 +157,6 @@ public Settings threadPoolSettings() {
return Settings.EMPTY;
}


protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
}
Expand All @@ -169,7 +169,6 @@ public Directory newDirectory() throws IOException {
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));

}

/**
Expand All @@ -179,7 +178,17 @@ public Directory newDirectory() throws IOException {
* another shard)
*/
protected IndexShard newShard(boolean primary) throws IOException {
return newShard(primary, Settings.EMPTY, new InternalEngineFactory());
return newShard(primary, Settings.EMPTY);
}

/**
* Creates a new initializing shard. The shard will have its own unique data path.
*
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from
* another shard)
*/
protected IndexShard newShard(final boolean primary, final Settings settings) throws IOException {
return newShard(primary, settings, new InternalEngineFactory());
}

/**
Expand Down Expand Up @@ -318,23 +327,25 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param store an optional custom store to use. If null a default file based store will be created
* @param storeProvider an optional custom store provider to use. If null a default file based store will be created
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
final IndexShard indexShard;
if (store == null) {
store = createStore(indexSettings, shardPath);
if (storeProvider == null) {
storeProvider = is -> createStore(is, shardPath);
}
final Store store = storeProvider.apply(indexSettings);
boolean success = false;
try {
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
Expand Down Expand Up @@ -424,7 +435,18 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException {
*/
protected IndexShard newStartedShard(
final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException {
IndexShard shard = newShard(primary, settings, engineFactory);
return newStartedShard(p -> newShard(p, settings, engineFactory), primary);
}

/**
* creates a new empty shard and starts it.
*
* @param shardFunction shard factory function
* @param primary controls whether the shard will be a primary or a replica.
*/
protected IndexShard newStartedShard(CheckedFunction<Boolean, IndexShard, IOException> shardFunction,
boolean primary) throws IOException {
IndexShard shard = shardFunction.apply(primary);
if (primary) {
recoverShardFromStore(shard);
} else {
Expand Down

0 comments on commit 3d82a30

Please sign in to comment.