drop index.shard.check_on_startup: fix
Relates #31389
Vladimir Dolzhenko committed Jul 23, 2018
1 parent ff87b7a commit 5828119
Showing 7 changed files with 186 additions and 41 deletions.
6 changes: 0 additions & 6 deletions docs/reference/index-modules.asciidoc
@@ -63,12 +63,6 @@ corruption is detected, it will prevent the shard from being opened. Accepts:
Check for both physical and logical corruption. This is much more
expensive in terms of CPU and memory usage.

`fix`::

Check for both physical and logical corruption. Segments that were reported
as corrupted will be automatically removed. This option *may result in data loss*.
Use with extreme caution!

WARNING: Expert only. Checking shards may take a lot of time on large indices.
--
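For reference, a minimal sketch of selecting one of the remaining accepted values when building index settings, mirroring the Settings.builder calls used by the new test further down; the surrounding builder lines are illustrative:

// Hedged sketch: "fix" is no longer accepted, the remaining values are "true", "false" and "checksum".
Settings indexSettings = Settings.builder()
    .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
    .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum") // or "true" / "false"
    .build();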

server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -74,11 +74,10 @@ public final class IndexSettings {
switch(s) {
case "false":
case "true":
case "fix":
case "checksum":
return s;
default:
throw new IllegalArgumentException("unknown value for [index.shard.check_on_startup] must be one of [true, false, fix, checksum] but was: " + s);
throw new IllegalArgumentException("unknown value for [index.shard.check_on_startup] must be one of [true, false, checksum] but was: " + s);
}
}, Property.IndexScope);
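A minimal sketch of the resulting validation behaviour; the setting name and error message come from the code above, and resolving the value through Setting#get is assumed to run the parser shown here:

// Hedged sketch: the parser above accepts "checksum" but now rejects "fix".
Settings ok = Settings.builder().put("index.shard.check_on_startup", "checksum").build();
String value = IndexSettings.INDEX_CHECK_ON_STARTUP.get(ok); // "checksum"

Settings bad = Settings.builder().put("index.shard.check_on_startup", "fix").build();
// expected to throw IllegalArgumentException:
// "unknown value for [index.shard.check_on_startup] must be one of [true, false, checksum] but was: fix"
IndexSettings.INDEX_CHECK_ON_STARTUP.get(bad);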

18 changes: 5 additions & 13 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1297,7 +1297,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
}
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.isTrue(checkIndexOnStartup)) {
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
@@ -1889,6 +1889,9 @@ void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
doCheckIndex();
} catch (IOException e) {
store.markStoreCorrupted(e);
throw e;
} finally {
store.decRef();
}
@@ -1932,18 +1935,7 @@ private void doCheckIndex() throws IOException {
return;
}
logger.warn("check index [failure]\n{}", os.bytes().utf8ToString());
if ("fix".equals(checkIndexOnStartup)) {
if (logger.isDebugEnabled()) {
logger.debug("fixing index, writing new segments file ...");
}
store.exorciseIndex(status);
if (logger.isDebugEnabled()) {
logger.debug("index fixed, wrote new segments file \"{}\"", status.segmentsFileName);
}
} else {
// only throw a failure if we are not going to fix the index
throw new IllegalStateException("index check failure but can't fix it");
}
throw new IllegalStateException("index check failure");
}
}
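With the fix branch gone, doCheckIndex throws on any check failure, and the IOException path in checkIndex above additionally records a corruption marker via markStoreCorrupted. A minimal sketch of spotting that marker on disk, mirroring the corruptedVisitor in testIndexCheckChecksum below; the helper name and the direct file-system access are assumptions:

// Hedged sketch: a failed check leaves a "corrupted_*" marker file in the shard's index directory.
// Assumes java.nio.file.{Files, Path}, java.util.stream.Stream and org.elasticsearch.index.store.Store.
static boolean hasCorruptionMarker(Path shardIndexPath) throws IOException {
    try (Stream<Path> files = Files.list(shardIndexPath)) {
        return files.anyMatch(f -> f.getFileName().toString().startsWith(Store.CORRUPTED));
    }
}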

15 changes: 2 additions & 13 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -134,7 +134,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
static final int VERSION_START = 0;
static final int VERSION = VERSION_WRITE_THROWABLE;
static final String CORRUPTED = "corrupted_";
// public is for test purposes
public static final String CORRUPTED = "corrupted_";
public static final Setting<TimeValue> INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING =
Setting.timeSetting("index.store.stats_refresh_interval", TimeValue.timeValueSeconds(10), Property.IndexScope);

@@ -360,18 +361,6 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException {
}
}

/**
* Repairs the index using the previous returned status from {@link #checkIndex(PrintStream)}.
*/
public void exorciseIndex(CheckIndex.Status status) throws IOException {
metadataLock.writeLock().lock();
try (CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.exorciseIndex(status);
} finally {
metadataLock.writeLock().unlock();
}
}

public StoreStats stats() throws IOException {
ensureOpen();
return new StoreStats(directory.estimateSize());
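Although the in-place repair path is gone, read-only verification through Store#checkIndex (shown in the hunk header above) is still available. A minimal sketch of calling it, assuming an already open Store instance named store and a context that handles IOException; on a non-clean status the remaining options are failing the shard and recovering from a replica or a snapshot:

// Hedged sketch: verify only, never repair. CheckIndex.Status is Lucene's status object.
ByteArrayOutputStream os = new ByteArrayOutputStream();
CheckIndex.Status status = store.checkIndex(new PrintStream(os, false, "UTF-8"));
if (status.clean == false) {
    // no "fix" any more: fail the shard and recover from a replica or a snapshot
}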
server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexTemplateServiceTests.java
@@ -69,7 +69,7 @@ public void testIndexTemplateInvalidNumberOfShards() {
containsString("Failed to parse value [0] for setting [index.number_of_shards] must be >= 1"));
assertThat(throwables.get(0).getMessage(),
containsString("unknown value for [index.shard.check_on_startup] " +
"must be one of [true, false, fix, checksum] but was: blargh"));
"must be one of [true, false, checksum] but was: blargh"));
}

public void testIndexTemplateValidationAccumulatesValidationErrors() {
server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -22,11 +22,14 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Version;
@@ -51,6 +54,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -91,6 +95,7 @@
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.translog.TestTranslog;
@@ -108,14 +113,19 @@
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotShardFailure;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -2480,6 +2490,106 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept
closeShards(newShard);
}

public void testIndexCheckChecksum() throws Exception {
final boolean primary = true;

IndexShard indexShard = newStartedShard(primary);

final long numDocs = between(10, 100);
for (long i = 0; i < numDocs; i++) {
indexDoc(indexShard, "_doc", Long.toString(i), "{}");
}
indexShard.flush(new FlushRequest());
closeShards(indexShard);

// start shard with checksum - it has to pass successfully

final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(),
primary ? RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
);
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum"))
.build();

final ShardPath shardPath = indexShard.shardPath();

final IndexShard newShard = newShard(shardRouting, shardPath, indexMetaData,
null, indexShard.engineFactory,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

closeShards(newStartedShard(p -> newShard, primary));

// corrupt files
final Path indexPath = shardPath.getDataPath().resolve("index");
final Path[] filesToCorrupt =
Files.walk(indexPath)
.filter(p -> Files.isRegularFile(p) && IndexWriter.WRITE_LOCK_NAME.equals(p.getFileName().toString()) == false)
.toArray(Path[]::new);
CorruptionUtils.corruptFile(random(), filesToCorrupt);

// check that corrupt marker is *NOT* there
final AtomicInteger corruptedMarkerCount = new AtomicInteger();
final SimpleFileVisitor<Path> corruptedVisitor = new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Files.isRegularFile(file) && file.getFileName().toString().startsWith(Store.CORRUPTED)) {
corruptedMarkerCount.incrementAndGet();
}
return FileVisitResult.CONTINUE;
}
};
Files.walkFileTree(indexPath, corruptedVisitor);

assertThat("store is clean",
corruptedMarkerCount.get(), equalTo(0));

// storeProvider that does not perform check index on close - it is corrupted
final CheckedFunction<IndexSettings, Store, IOException> storeProvider = indexSettings -> {
final ShardId shardId = shardPath.getShardId();
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
final BaseDirectoryWrapper baseDirectoryWrapper = newFSDirectory(shardPath.resolveIndex());
// index is corrupted - don't even try to check index on close - it fails
baseDirectoryWrapper.setCheckIndexOnClose(false);
return baseDirectoryWrapper;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
};

// try to start shard on corrupted files
final IndexShard corruptedShard = newShard(shardRouting, shardPath, indexMetaData,
null, indexShard.engineFactory,
storeProvider,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, primary));
closeShards(corruptedShard);

// check that corrupt marker is there
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store has to be marked as corrupted",
corruptedMarkerCount.get(), equalTo(1));

// try to start the shard on the corrupted files a second time
final IndexShard corruptedShard2 = newShard(shardRouting, shardPath, indexMetaData,
null, indexShard.engineFactory,
storeProvider,
indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);

expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard2, primary));
closeShards(corruptedShard2);

// check that corrupt marker is there
corruptedMarkerCount.set(0);
Files.walkFileTree(indexPath, corruptedVisitor);
assertThat("store still has a single corrupt marker",
corruptedMarkerCount.get(), equalTo(1));
}

/**
* Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService
* and checking index concurrently. This should always be possible without any exception.
@@ -2503,7 +2613,7 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
final IndexMetaData indexMetaData = IndexMetaData.builder(indexShard.indexSettings().getIndexMetaData())
.settings(Settings.builder()
.put(indexShard.indexSettings.getSettings())
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum", "fix")))
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), randomFrom("false", "true", "checksum")))
.build();
final IndexShard newShard = newShard(shardRouting, indexShard.shardPath(), indexMetaData,
null, indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER);
test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -37,6 +37,7 @@
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
@@ -179,12 +180,23 @@ public Directory newDirectory() throws IOException {
*
* @param primary indicates whether to create a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
* @param indexSettings the {@link Settings} to use for this index
*/
protected IndexShard newShard(boolean primary) throws IOException {
protected IndexShard newShard(boolean primary, Settings indexSettings) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary,
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting);
return newShard(shardRouting, indexSettings);
}

/**
* creates a new initializing shard. The shard will have its own unique data path.
*
* @param primary indicates whether to create a primary shard (ready to recover from an empty store) or a replica
* (ready to recover from another shard)
*/
protected IndexShard newShard(boolean primary) throws IOException {
return newShard(primary, Settings.EMPTY);
}

/**
@@ -193,13 +205,29 @@ protected IndexShard newShard(boolean primary) throws IOException {
* @param shardRouting the {@link ShardRouting} to use for this shard
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(
final ShardRouting shardRouting,
final IndexingOperationListener... listeners) throws IOException {
return newShard(shardRouting, Settings.EMPTY, listeners);
}

/**
* creates a new initializing shard. The shard will have its own unique data path.
*
* @param shardRouting the {@link ShardRouting} to use for this shard
* @param indexSettings the {@link Settings} to use for this index
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(
final ShardRouting shardRouting,
final Settings indexSettings,
final IndexingOperationListener... listeners) throws IOException {
assert shardRouting.initializing() : shardRouting;
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(indexSettings)
.build();
IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings)
@@ -303,10 +331,32 @@ protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMe
@Nullable EngineFactory engineFactory,
Runnable globalCheckpointSyncer,
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
return newShard(routing, shardPath, indexMetaData, indexSearcherWrapper, engineFactory,
indexSettings -> createStore(indexSettings, shardPath),
globalCheckpointSyncer,
indexEventListener, listeners);
}

/**
* creates a new initializing shard.
* @param routing shard routing to use
* @param shardPath path to use for shard data
* @param indexMetaData indexMetaData for the shard, including any mapping
* @param indexSearcherWrapper an optional wrapper to be applied to searchers
* @param globalCheckpointSyncer callback for syncing global checkpoints
* @param indexEventListener index event listener
* @param listeners an optional set of listeners to add to the shard
*/
protected IndexShard newShard(ShardRouting routing, ShardPath shardPath, IndexMetaData indexMetaData,
@Nullable IndexSearcherWrapper indexSearcherWrapper,
@Nullable EngineFactory engineFactory,
CheckedFunction<IndexSettings, Store, IOException> storeProvider,
Runnable globalCheckpointSyncer,
IndexEventListener indexEventListener, IndexingOperationListener... listeners) throws IOException {
final Settings nodeSettings = Settings.builder().put("node.name", routing.currentNodeId()).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, nodeSettings);
final IndexShard indexShard;
final Store store = createStore(indexSettings, shardPath);
final Store store = storeProvider.apply(indexSettings);
boolean success = false;
try {
IndexCache indexCache = new IndexCache(indexSettings, new DisabledQueryCache(indexSettings), null);
@@ -375,7 +425,18 @@ protected IndexShard newStartedShard() throws IOException {
* @param primary controls whether the shard will be a primary or a replica.
*/
protected IndexShard newStartedShard(boolean primary) throws IOException {
IndexShard shard = newShard(primary);
return newStartedShard(p -> newShard(p), primary);
}

/**
* creates a new empty shard and starts it.
*
* @param shardFunction shard factory function
* @param primary controls whether the shard will be a primary or a replica.
*/
protected IndexShard newStartedShard(CheckedFunction<Boolean, IndexShard, IOException> shardFunction,
boolean primary) throws IOException {
IndexShard shard = shardFunction.apply(primary);
if (primary) {
recoverShardFromStore(shard);
} else {
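Taken together, the new overloads let a test supply both a custom Store (via storeProvider) and a custom shard factory (via the function passed to newStartedShard). A minimal usage sketch, assuming the same shardRouting, shardPath, indexMetaData and storeProvider variables as testIndexCheckChecksum above, inside a test method that declares throws Exception:

// Hedged usage sketch of the new test-framework overloads.
final IndexShard shard = newShard(shardRouting, shardPath, indexMetaData,
    null, null,        // no searcher wrapper, default engine factory
    storeProvider,     // CheckedFunction<IndexSettings, Store, IOException>
    () -> {},          // global checkpoint syncer
    EMPTY_EVENT_LISTENER);
closeShards(newStartedShard(p -> shard, true));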
