Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drop index.shard.check_on_startup: fix #32279

Merged
merged 21 commits into from
Aug 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
843f977
drop `index.shard.check_on_startup: fix`
Jul 23, 2018
4f01609
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Jul 31, 2018
153e4f2
create corrupted marker on `check_on_startup: true`; split testIndexC…
Aug 21, 2018
2964fef
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 21, 2018
c71e306
create manually corruption marker (but don't corrupt index files) to …
Aug 21, 2018
a7668d6
checkstyle fix
Aug 21, 2018
97fa399
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 24, 2018
c155b36
addressed unit test comments
Aug 27, 2018
85b7eef
keep `fix` for 6.x branch
Aug 27, 2018
3231803
added `fix` deprecation log message + test
Aug 28, 2018
c2b5b8a
added `fix` deprecation log message + test
Aug 28, 2018
14e6175
adjusted `fix` deprecation log message
Aug 28, 2018
fee8a5b
dropped `fix` to avoid deprecation warnings
Aug 28, 2018
5cee2b9
skip files added by Lucene's ExtrasFS
Aug 28, 2018
ad62da0
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 28, 2018
6f6ca5a
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 29, 2018
6763cf9
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 29, 2018
5083e83
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 31, 2018
2a9dbeb
resolved conflicts on Merge remote-tracking branch 'remotes/origin/ma…
Aug 31, 2018
aa16487
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 31, 2018
d26fbfb
Merge remote-tracking branch 'remotes/origin/master' into fix/31389_1
Aug 31, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docs/reference/index-modules.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ corruption is detected, it will prevent the shard from being opened. Accepts:

`fix`::

Check for both physical and logical corruption. Segments that were reported
as corrupted will be automatically removed. This option *may result in data loss*.
Use with extreme caution!
The same as `false`. This option is deprecated and will be completely removed in 7.0.

WARNING: Expert only. Checking shards may take a lot of time on large indices.
--
Expand Down
22 changes: 9 additions & 13 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
if ("fix".equals(checkIndexOnStartup)) {
deprecationLogger.deprecated("Setting [index.shard.check_on_startup] is set to deprecated value [fix], "
+ "which has no effect and will not be accepted in future");
}
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
final String aId = shardRouting.allocationId().getId();
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
Expand Down Expand Up @@ -1325,7 +1329,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
}
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.isTrue(checkIndexOnStartup)) {
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
Expand Down Expand Up @@ -1933,6 +1937,9 @@ void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
doCheckIndex();
} catch (IOException e) {
store.markStoreCorrupted(e);
throw e;
} finally {
store.decRef();
}
Expand Down Expand Up @@ -1976,18 +1983,7 @@ private void doCheckIndex() throws IOException {
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
if ("fix".equals(checkIndexOnStartup)) {
if (logger.isDebugEnabled()) {
logger.debug("fixing index, writing new segments file ...");
}
store.exorciseIndex(status);
if (logger.isDebugEnabled()) {
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
}
} else {
// only throw a failure if we are not going to fix the index
throw new IllegalStateException("index check failure but can't fix it");
}
throw new IOException("index check failure");
}
}

Expand Down
15 changes: 2 additions & 13 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
static final int VERSION_START = 0;
static final int VERSION = VERSION_WRITE_THROWABLE;
static final String CORRUPTED = "corrupted_";
// public is for test purposes
public static final String CORRUPTED = "corrupted_";
public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);

Expand Down Expand Up @@ -360,18 +361,6 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException {
}
}

/**
* Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
*/
public void exorciseIndex(CheckIndex.Status status) throws IOException {
metadataLock.writeLock().lock();
try (CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.exorciseIndex(status);
} finally {
metadataLock.writeLock().unlock();
}
}

public StoreStats stats() throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
Expand Down Expand Up @@ -118,6 +119,7 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
Expand All @@ -126,7 +128,11 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1239,7 +1245,7 @@ public String[] listAll() throws IOException {
};

try (Store store = createStore(shardId, new IndexSettings(metaData, Settings.EMPTY), directory)) {
IndexShard shard = newShard(shardRouting, shardPath, metaData, store,
IndexShard shard = newShard(shardRouting, shardPath, metaData, i -> store,
null, new InternalEngineFactory(), () -> {
}, EMPTY_EVENT_LISTENER);
AtomicBoolean failureCallbackTriggered = new AtomicBoolean(false);
Expand Down Expand Up @@ -2590,6 +2596,143 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

public void testIndexCheckOnStartup() throws Exception {
final IndexShard indexShard = newStartedShard(true);

final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);

final ShardPath shardPath = indexShard.shardPath();

final Path indexPath = corruptIndexFile(shardPath);

final AtomicInteger corruptedMarkerCount = new AtomicInteger();
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
}
};
Files.walkFileTree(indexPath, corruptedVisitor);

assertThat("corruption marker should not be there", corruptedMarkerCount.get(), equalTo(0));

final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE
);
// start shard and perform index check on startup. It enforce shard to fail due to corrupted index files
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("true", "checksum")))
.build();

IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

final IndexShardRecoveryException indexShardRecoveryException =
expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true));
assertThat(indexShardRecoveryException.getMessage(), equalTo("failed recovery"));

// check that corrupt marker is there
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));

try {
closeShards(corruptedShard);
} catch (RuntimeException e) {
assertThat(e.getMessage(), equalTo("CheckIndex failed"));
}
}

public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
final IndexShard indexShard = newStartedShard(true);

final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);

final ShardPath shardPath = indexShard.shardPath();

final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE
);
final IndexMetaData indexMetaData = indexShard.indexSettings().getIndexMetaData();

final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);

// create corrupted marker
final String corruptionMessage = "fake ioexception";
try(Store store = createStore(indexShard.indexSettings(), shardPath)) {
store.markStoreCorrupted(new IOException(corruptionMessage));
}

// try to start shard on corrupted files
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

final IndexShardRecoveryException exception1 = expectThrows(IndexShardRecoveryException.class,
() -> newStartedShard(p -> corruptedShard, true));
assertThat(exception1.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)"));
closeShards(corruptedShard);

final AtomicInteger corruptedMarkerCount = new AtomicInteger();
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
}
};
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store has to be marked as corrupted", corruptedMarkerCount.get(), equalTo(1));

// try to start another time shard on corrupted files
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData,
null, null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

final IndexShardRecoveryException exception2 = expectThrows(IndexShardRecoveryException.class,
() -> newStartedShard(p -> corruptedShard2, true));
assertThat(exception2.getCause().getMessage(), equalTo(corruptionMessage + " (resource=preexisting_corruption)"));
closeShards(corruptedShard2);

// check that corrupt marker is there
corruptedMarkerCount.set(0);
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1));
}

private Path corruptIndexFile(ShardPath shardPath) throws IOException {
final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME);
final Path[] filesToCorrupt =
Files.walk(indexPath)
.filter(p -> {
final String name = p.getFileName().toString();
return Files.isRegularFile(p)
&& name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS
&& IndexWriter.WRITE_LOCK_NAME.equals(name) == false
&& name.startsWith("segments_") == false && name.endsWith(".si") == false;
})
.toArray(Path[]::new);
CorruptionUtils.corruptFile(random(), filesToCorrupt);
return indexPath;
}

/**
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
* and checking index concurrently. This should always be possible without any exception.
Expand All @@ -2613,7 +2756,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
Expand Down Expand Up @@ -2655,6 +2798,16 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
closeShards(newShard);
}

public void testCheckOnStartupDeprecatedValue() throws Exception {
final Settings settings = Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "fix").build();

final IndexShard newShard = newShard(true, settings);
closeShards(newShard);

assertWarnings("Setting [index.shard.check_on_startup] is set to deprecated value [fix], "
+ "which has no effect and will not be accepted in future");
}

class Result {
private final int localCheckpoint;
private final int maxSeqNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -156,7 +157,6 @@ public Settings threadPoolSettings() {
return Settings.EMPTY;
}


protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException {
return createStore(shardPath.getShardId(), indexSettings, newFSDirectory(shardPath.resolveIndex()));
}
Expand All @@ -169,7 +169,6 @@ public Directory newDirectory() throws IOException {
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));

}

/**
Expand All @@ -179,7 +178,17 @@ public Directory newDirectory() throws IOException {
* another shard)
*/
protected IndexShard newShard(boolean primary) throws IOException {
return newShard(primary, Settings.EMPTY, new InternalEngineFactory());
return newShard(primary, Settings.EMPTY);
}

/**
* Creates a new initializing shard. The shard will have its own unique data path.
*
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from
* another shard)
*/
protected IndexShard newShard(final boolean primary, final Settings settings) throws IOException {
return newShard(primary, settings, new InternalEngineFactory());
}

/**
Expand Down Expand Up @@ -318,23 +327,25 @@ protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData,
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param store an optional custom store to use. If null a default file based store will be created
* @param storeProvider an optional custom store provider to use. If null a default file based store will be created
* @param indexSearcherWrapper an optional wrapper to be used during searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable Store store, @Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
final IndexShard indexShard;
if (store == null) {
store = createStore(indexSettings, shardPath);
if (storeProvider == null) {
storeProvider = is -> createStore(is, shardPath);
}
final Store store = storeProvider.apply(indexSettings);
boolean success = false;
try {
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
Expand Down Expand Up @@ -424,7 +435,18 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException {
*/
protected IndexShard newStartedShard(
final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException {
IndexShard shard = newShard(primary, settings, engineFactory);
return newStartedShard(p -> newShard(p, settings, engineFactory), primary);
}

/**
* creates a new empty shard and starts it.
*
* @param shardFunction shard factory function
* @param primary controls whether the shard will be a primary or a replica.
*/
protected IndexShard newStartedShard(CheckedFunction<Boolean, IndexShard, IOException> shardFunction,
boolean primary) throws IOException {
IndexShard shard = shardFunction.apply(primary);
if (primary) {
recoverShardFromStore(shard);
} else {
Expand Down