diff --git a/distribution/src/bin/elasticsearch-shard b/distribution/src/bin/elasticsearch-shard new file mode 100755 index 0000000000000..4c14a0434175b --- /dev/null +++ b/distribution/src/bin/elasticsearch-shard @@ -0,0 +1,5 @@ +#!/bin/bash + +ES_MAIN_CLASS=org.elasticsearch.index.shard.ShardToolCli \ + "`dirname "$0"`"/elasticsearch-cli \ + "$@" diff --git a/distribution/src/bin/elasticsearch-shard.bat b/distribution/src/bin/elasticsearch-shard.bat new file mode 100644 index 0000000000000..e861b197e873d --- /dev/null +++ b/distribution/src/bin/elasticsearch-shard.bat @@ -0,0 +1,12 @@ +@echo off + +setlocal enabledelayedexpansion +setlocal enableextensions + +set ES_MAIN_CLASS=org.elasticsearch.index.shard.ShardToolCli +call "%~dp0elasticsearch-cli.bat" ^ + %%* ^ + || exit /b 1 + +endlocal +endlocal diff --git a/docs/reference/commands/index.asciidoc b/docs/reference/commands/index.asciidoc index 134ac1edbd017..8f4d178a99296 100644 --- a/docs/reference/commands/index.asciidoc +++ b/docs/reference/commands/index.asciidoc @@ -12,6 +12,7 @@ tasks from the command line: * <> * <> * <> +* <> * <> * <> @@ -22,5 +23,6 @@ include::certutil.asciidoc[] include::migrate-tool.asciidoc[] include::saml-metadata.asciidoc[] include::setup-passwords.asciidoc[] +include::shard-tool.asciidoc[] include::syskeygen.asciidoc[] include::users-command.asciidoc[] diff --git a/docs/reference/commands/shard-tool.asciidoc b/docs/reference/commands/shard-tool.asciidoc new file mode 100644 index 0000000000000..6fca1355a27be --- /dev/null +++ b/docs/reference/commands/shard-tool.asciidoc @@ -0,0 +1,107 @@ +[[shard-tool]] +== elasticsearch-shard + +In some cases the Lucene index or translog of a shard copy can become +corrupted. The `elasticsearch-shard` command enables you to remove corrupted +parts of the shard if a good copy of the shard cannot be recovered +automatically or restored from backup. + +[WARNING] +You will lose the corrupted data when you run `elasticsearch-shard`. This tool +should only be used as a last resort if there is no way to recover from another +copy of the shard or restore a snapshot. + +When Elasticsearch detects that a shard's data is corrupted, it fails that +shard copy and refuses to use it. Under normal conditions, the shard is +automatically recovered from another copy. If no good copy of the shard is +available and you cannot restore from backup, you can use `elasticsearch-shard` +to remove the corrupted data and restore access to any remaining data in +unaffected segments. + +[WARNING] +Stop Elasticsearch before running `elasticsearch-shard`. + +To remove corrupted shard data use the `remove-corrupted-data` subcommand. + +There are two ways to specify the path: + +* Specify the index name and shard name with the `--index` and `--shard-id` + options. +* Use the `--dir` option to specify the full path to the corrupted index or + translog files. + +[float] +=== Removing corrupted data + +`elasticsearch-shard` analyses the shard copy and provides an overview of the +corruption found. To proceed you must then confirm that you want to remove the +corrupted data. + +[WARNING] +Back up your data before running `elasticsearch-shard`. This is a destructive +operation that removes corrupted data from the shard. + +[source,txt] +-------------------------------------------------- +$ bin/elasticsearch-shard remove-corrupted-data --index twitter --shard-id 0 + + + WARNING: Elasticsearch MUST be stopped before running this tool. + + Please make a complete backup of your index before using this tool. 
+ + +Opening Lucene index at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index/ + + >> Lucene index is corrupted at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index/ + +Opening translog at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/ + + + >> Translog is clean at /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/translog/ + + + Corrupted Lucene index segments found - 32 documents will be lost. + + WARNING: YOU WILL LOSE DATA. + +Continue and remove docs from the index ? Y + +WARNING: 1 broken segments (containing 32 documents) detected +Took 0.056 sec total. +Writing... +OK +Wrote new segments file "segments_c" +Marking index with the new history uuid : 0pIBd9VTSOeMfzYT6p0AsA +Changing allocation id V8QXk-QXSZinZMT-NvEq4w to tjm9Ve6uTBewVFAlfUMWjA + +You should run the following command to allocate this shard: + +POST /_cluster/reroute +{ + "commands" : [ + { + "allocate_stale_primary" : { + "index" : "index42", + "shard" : 0, + "node" : "II47uXW2QvqzHBnMcl2o_Q", + "accept_data_loss" : false + } + } + ] +} + +You must accept the possibility of data loss by changing parameter `accept_data_loss` to `true`. + +Deleted corrupt marker corrupted_FzTSBSuxT7i3Tls_TgwEag from /var/lib/elasticsearchdata/nodes/0/indices/P45vf_YQRhqjfwLMUvSqDw/0/index/ + +-------------------------------------------------- + +When you use `elasticsearch-shard` to drop the corrupted data, the shard's +allocation ID changes. After restarting the node, you must use the +<> to tell Elasticsearch to use the new +ID. The `elasticsearch-shard` command shows the request that +you need to submit. + +You can also use the `-h` option to get a list of all options and parameters +that the `elasticsearch-shard` tool supports. diff --git a/docs/reference/index-modules/translog.asciidoc b/docs/reference/index-modules/translog.asciidoc index bed19bd5be1df..713a352210054 100644 --- a/docs/reference/index-modules/translog.asciidoc +++ b/docs/reference/index-modules/translog.asciidoc @@ -92,6 +92,10 @@ The maximum duration for which translog files will be kept. Defaults to `12h`. [[corrupt-translog-truncation]] === What to do if the translog becomes corrupted? +[WARNING] +This tool is deprecated and will be completely removed in 7.0. +Use the <> instead of this one. + In some cases (a bad drive, user error) the translog on a shard copy can become corrupted. When this corruption is detected by Elasticsearch due to mismatching checksums, Elasticsearch will fail that shard copy and refuse to use that copy diff --git a/libs/cli/src/main/java/org/elasticsearch/cli/Terminal.java b/libs/cli/src/main/java/org/elasticsearch/cli/Terminal.java index d9923def6ca0a..a0ebff5d67041 100644 --- a/libs/cli/src/main/java/org/elasticsearch/cli/Terminal.java +++ b/libs/cli/src/main/java/org/elasticsearch/cli/Terminal.java @@ -85,12 +85,17 @@ public final void println(Verbosity verbosity, String msg) { /** Prints message to the terminal at {@code verbosity} level, without a newline. */ public final void print(Verbosity verbosity, String msg) { - if (this.verbosity.ordinal() >= verbosity.ordinal()) { + if (isPrintable(verbosity)) { getWriter().print(msg); getWriter().flush(); } } + /** Checks if is enough {@code verbosity} level to be printed */ + public final boolean isPrintable(Verbosity verbosity) { + return this.verbosity.ordinal() >= verbosity.ordinal(); + } + /** * Prompt for a yes or no answer from the user. 
This method will loop until 'y' or 'n' * (or the default empty value) is entered. diff --git a/qa/vagrant/src/main/java/org/elasticsearch/packaging/test/ArchiveTestCase.java b/qa/vagrant/src/main/java/org/elasticsearch/packaging/test/ArchiveTestCase.java index 83edc8a0a9390..0108f88ecd166 100644 --- a/qa/vagrant/src/main/java/org/elasticsearch/packaging/test/ArchiveTestCase.java +++ b/qa/vagrant/src/main/java/org/elasticsearch/packaging/test/ArchiveTestCase.java @@ -325,4 +325,21 @@ public void test90SecurityCliPackaging() { } } + public void test100RepairIndexCliPackaging() { + assumeThat(installation, is(notNullValue())); + + final Installation.Executables bin = installation.executables(); + final Shell sh = new Shell(); + + Platforms.PlatformAction action = () -> { + final Result result = sh.run(bin.elasticsearchShard + " help"); + assertThat(result.stdout, containsString("A CLI tool to remove corrupted parts of unrecoverable shards")); + }; + + if (distribution().equals(Distribution.DEFAULT_TAR) || distribution().equals(Distribution.DEFAULT_ZIP)) { + Platforms.onLinux(action); + Platforms.onWindows(action); + } + } + } diff --git a/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Archives.java b/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Archives.java index 9e9a453ca8422..45629f286fcc4 100644 --- a/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Archives.java +++ b/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Archives.java @@ -186,6 +186,7 @@ private static void verifyOssInstallation(Installation es, Distribution distribu "elasticsearch-env", "elasticsearch-keystore", "elasticsearch-plugin", + "elasticsearch-shard", "elasticsearch-translog" ).forEach(executable -> { diff --git a/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Installation.java b/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Installation.java index 8bc3fc6e14d3b..620ccd5e442d7 100644 --- a/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Installation.java +++ b/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Installation.java @@ -100,8 +100,9 @@ public class Executables { public final Path elasticsearch = platformExecutable("elasticsearch"); public final Path elasticsearchPlugin = platformExecutable("elasticsearch-plugin"); public final Path elasticsearchKeystore = platformExecutable("elasticsearch-keystore"); - public final Path elasticsearchTranslog = platformExecutable("elasticsearch-translog"); public final Path elasticsearchCertutil = platformExecutable("elasticsearch-certutil"); + public final Path elasticsearchShard = platformExecutable("elasticsearch-shard"); + public final Path elasticsearchTranslog = platformExecutable("elasticsearch-translog"); private Path platformExecutable(String name) { final String platformExecutableName = Platforms.WINDOWS diff --git a/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Packages.java b/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Packages.java index be7edc5e8f9e4..56de822316634 100644 --- a/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Packages.java +++ b/qa/vagrant/src/main/java/org/elasticsearch/packaging/util/Packages.java @@ -187,6 +187,7 @@ private static void verifyOssInstallation(Installation es, Distribution distribu "elasticsearch", "elasticsearch-plugin", "elasticsearch-keystore", + "elasticsearch-shard", "elasticsearch-translog" ).forEach(executable -> assertThat(es.bin(executable), file(File, "root", "root", p755))); diff --git 
a/qa/vagrant/src/test/resources/packaging/utils/packages.bash b/qa/vagrant/src/test/resources/packaging/utils/packages.bash index 57f1ebd1c6106..f6ba68d84d483 100644 --- a/qa/vagrant/src/test/resources/packaging/utils/packages.bash +++ b/qa/vagrant/src/test/resources/packaging/utils/packages.bash @@ -95,6 +95,7 @@ verify_package_installation() { assert_file "$ESHOME/bin" d root root 755 assert_file "$ESHOME/bin/elasticsearch" f root root 755 assert_file "$ESHOME/bin/elasticsearch-plugin" f root root 755 + assert_file "$ESHOME/bin/elasticsearch-shard" f root root 755 assert_file "$ESHOME/bin/elasticsearch-translog" f root root 755 assert_file "$ESHOME/lib" d root root 755 assert_file "$ESCONFIG" d root elasticsearch 2750 diff --git a/qa/vagrant/src/test/resources/packaging/utils/tar.bash b/qa/vagrant/src/test/resources/packaging/utils/tar.bash index 4ded1f73514b2..23901cbae99b7 100644 --- a/qa/vagrant/src/test/resources/packaging/utils/tar.bash +++ b/qa/vagrant/src/test/resources/packaging/utils/tar.bash @@ -94,6 +94,7 @@ verify_archive_installation() { assert_file "$ESHOME/bin/elasticsearch-env" f elasticsearch elasticsearch 755 assert_file "$ESHOME/bin/elasticsearch-keystore" f elasticsearch elasticsearch 755 assert_file "$ESHOME/bin/elasticsearch-plugin" f elasticsearch elasticsearch 755 + assert_file "$ESHOME/bin/elasticsearch-shard" f elasticsearch elasticsearch 755 assert_file "$ESHOME/bin/elasticsearch-translog" f elasticsearch elasticsearch 755 assert_file "$ESCONFIG" d elasticsearch elasticsearch 755 assert_file "$ESCONFIG/elasticsearch.yml" f elasticsearch elasticsearch 660 diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 29d3207c73ac2..538a8edd995ae 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -30,6 +30,8 @@ import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -75,6 +77,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static java.util.Collections.unmodifiableSet; @@ -171,6 +174,63 @@ public String toString() { public static final String INDICES_FOLDER = "indices"; public static final String NODE_LOCK_FILENAME = "node.lock"; + public static class NodeLock implements Releasable { + + private final int nodeId; + private final Lock[] locks; + private final NodePath[] nodePaths; + + /** + * Tries to acquire a node lock for a node id, throws {@code IOException} if it is unable to acquire it + * @param pathFunction function to check node path before attempt of acquiring a node lock + */ + public NodeLock(final int nodeId, final Logger logger, + final Environment environment, + final CheckedFunction pathFunction) throws IOException { + this.nodeId = nodeId; + nodePaths = new NodePath[environment.dataWithClusterFiles().length]; + locks = new Lock[nodePaths.length]; + try { + final Path[] dataPaths = environment.dataFiles(); + for (int dirIndex = 0; 
dirIndex < dataPaths.length; dirIndex++) { + Path dataDir = dataPaths[dirIndex]; + Path dir = resolveNodePath(dataDir, nodeId); + if (pathFunction.apply(dir) == false) { + continue; + } + try (Directory luceneDir = FSDirectory.open(dir, NativeFSLockFactory.INSTANCE)) { + logger.trace("obtaining node lock on {} ...", dir.toAbsolutePath()); + locks[dirIndex] = luceneDir.obtainLock(NODE_LOCK_FILENAME); + nodePaths[dirIndex] = new NodePath(dir); + } catch (IOException e) { + logger.trace(() -> new ParameterizedMessage( + "failed to obtain node lock on {}", dir.toAbsolutePath()), e); + // release all the ones that were obtained up until now + throw (e instanceof LockObtainFailedException ? e + : new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e)); + } + } + } catch (IOException e) { + close(); + throw e; + } + } + + public NodePath[] getNodePaths() { + return nodePaths; + } + + @Override + public void close() { + for (int i = 0; i < locks.length; i++) { + if (locks[i] != null) { + IOUtils.closeWhileHandlingException(locks[i]); + } + locks[i] = null; + } + } + } + /** * Setup the environment. * @param settings settings from elasticsearch.yml @@ -188,51 +248,39 @@ public NodeEnvironment(Settings settings, Environment environment, Consumer new ParameterizedMessage( - "failed to obtain node lock on {}", dir.toAbsolutePath()), e); - lastException = new IOException("failed to obtain lock on " + dir.toAbsolutePath(), e); - // release all the ones that were obtained up until now - releaseAndNullLocks(locks); - break; - } - } - if (locks[0] != null) { - // we found a lock, break + final AtomicReference onCreateDirectoriesException = new AtomicReference<>(); + for (int possibleLockId = 0; possibleLockId < maxLocalStorageNodes; possibleLockId++) { + try { + nodeLock = new NodeLock(possibleLockId, logger, environment, + dir -> { + try { + Files.createDirectories(dir); + } catch (IOException e) { + onCreateDirectoriesException.set(e); + throw e; + } + return true; + }); break; + } catch (LockObtainFailedException e) { + // ignore any LockObtainFailedException + } catch (IOException e) { + if (onCreateDirectoriesException.get() != null) { + throw onCreateDirectoriesException.get(); + } + lastException = e; } } - if (locks[0] == null) { + if (nodeLock == null) { final String message = String.format( Locale.ROOT, "failed to obtain node locks, tried [%s] with lock id%s;" + @@ -243,13 +291,12 @@ public NodeEnvironment(Settings settings, Environment environment, Consumer getCleanStatus(ShardPath shardPath, + Directory indexDirectory, + Lock writeLock, + PrintStream printStream, + boolean verbose) throws IOException { + if (RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(indexDirectory) == false) { + return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null); + } + + final CheckIndex.Status status; + try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) { + checker.setChecksumsOnly(true); + checker.setInfoStream(printStream, verbose); + + status = checker.checkIndex(null); + + if (status.missingSegments) { + return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.UNRECOVERABLE, + "Index is unrecoverable - there are missing segments"); + } + + return status.clean + ? 
Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN_WITH_CORRUPTED_MARKER, null) + : Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CORRUPTED, + "Corrupted Lucene index segments found - " + status.totLoseDocCount + " documents will be lost."); + } + } + + public void execute(Terminal terminal, + ShardPath shardPath, + Directory indexDirectory, + Lock writeLock, + PrintStream printStream, + boolean verbose) throws IOException { + checkCorruptMarkerFileIsPresent(indexDirectory); + + final CheckIndex.Status status; + try (CheckIndex checker = new CheckIndex(indexDirectory, writeLock)) { + + checker.setChecksumsOnly(true); + checker.setInfoStream(printStream, verbose); + + status = checker.checkIndex(null); + + if (status.missingSegments == false) { + if (status.clean == false) { + terminal.println("Writing..."); + checker.exorciseIndex(status); + + terminal.println("OK"); + terminal.println("Wrote new segments file \"" + status.segmentsFileName + "\""); + } + } else { + throw new ElasticsearchException("Index is unrecoverable - there are missing segments"); + } + } + } + + protected void checkCorruptMarkerFileIsPresent(Directory directory) throws IOException { + if (RemoveCorruptedShardDataCommand.isCorruptMarkerFileIsPresent(directory) == false) { + throw new ElasticsearchException("There is no corruption file marker"); + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java new file mode 100644 index 0000000000000..de22903efb334 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -0,0 +1,545 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.shard; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.NativeFSLockFactory; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cli.EnvironmentAwareCommand; +import org.elasticsearch.cli.Terminal; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.NodeMetaData; +import org.elasticsearch.gateway.MetaDataStateFormat; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.TruncateTranslogAction; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class RemoveCorruptedShardDataCommand extends EnvironmentAwareCommand { + + private static final Logger logger = Loggers.getLogger(RemoveCorruptedShardDataCommand.class); + + private final OptionSpec folderOption; + private final OptionSpec indexNameOption; + private final OptionSpec shardIdOption; + + private final RemoveCorruptedLuceneSegmentsAction removeCorruptedLuceneSegmentsAction; + private final TruncateTranslogAction truncateTranslogAction; + private final NamedXContentRegistry namedXContentRegistry; + + public RemoveCorruptedShardDataCommand() { + this(false); + } + + public RemoveCorruptedShardDataCommand(boolean translogOnly) { + super("Removes corrupted shard files"); + + folderOption = parser.acceptsAll(Arrays.asList("d", "dir"), + "Index directory location on disk") + .withRequiredArg(); + + indexNameOption = parser.accepts("index", "Index name") + .withRequiredArg(); + + shardIdOption = parser.accepts("shard-id", "Shard id") + .withRequiredArg() + .ofType(Integer.class); + + namedXContentRegistry = new NamedXContentRegistry(ClusterModule.getNamedXWriteables()); 
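+        // the xcontent registry above is required to read index metadata and shard/node state files from disk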
+ + removeCorruptedLuceneSegmentsAction = translogOnly ? null : new RemoveCorruptedLuceneSegmentsAction(); + truncateTranslogAction = new TruncateTranslogAction(namedXContentRegistry); + } + + @Override + protected void printAdditionalHelp(Terminal terminal) { + if (removeCorruptedLuceneSegmentsAction == null) { + // that's only for 6.x branch for bwc with elasticsearch-translog + terminal.println("This tool truncates the translog and translog checkpoint files to create a new translog"); + } else { + terminal.println("This tool attempts to detect and remove unrecoverable corrupted data in a shard."); + } + } + + // Visible for testing + public OptionParser getParser() { + return this.parser; + } + + @SuppressForbidden(reason = "Necessary to use the path passed in") + protected Path getPath(String dirValue) { + return PathUtils.get(dirValue, "", ""); + } + + protected void findAndProcessShardPath(OptionSet options, Environment environment, CheckedConsumer consumer) + throws IOException { + final Settings settings = environment.settings(); + + final String indexName; + final int shardId; + final int fromNodeId; + final int toNodeId; + + if (options.has(folderOption)) { + final Path path = getPath(folderOption.value(options)).getParent(); + final Path shardParent = path.getParent(); + final Path shardParentParent = shardParent.getParent(); + final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME); + if (Files.exists(indexPath) == false || Files.isDirectory(indexPath) == false) { + throw new ElasticsearchException("index directory [" + indexPath + "], must exist and be a directory"); + } + + final IndexMetaData indexMetaData = + IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardParent); + + final String shardIdFileName = path.getFileName().toString(); + final String nodeIdFileName = shardParentParent.getParent().getFileName().toString(); + if (Files.isDirectory(path) && shardIdFileName.chars().allMatch(Character::isDigit) // SHARD-ID path element check + && NodeEnvironment.INDICES_FOLDER.equals(shardParentParent.getFileName().toString()) // `indices` check + && nodeIdFileName.chars().allMatch(Character::isDigit) // NODE-ID check + && NodeEnvironment.NODES_FOLDER.equals(shardParentParent.getParent().getParent().getFileName().toString()) // `nodes` check + ) { + shardId = Integer.parseInt(shardIdFileName); + indexName = indexMetaData.getIndex().getName(); + fromNodeId = Integer.parseInt(nodeIdFileName); + toNodeId = fromNodeId + 1; + } else { + throw new ElasticsearchException("Unable to resolve shard id. 
Wrong folder structure at [ " + path.toString() + + " ], expected .../nodes/[NODE-ID]/indices/[INDEX-UUID]/[SHARD-ID]"); + } + } else { + // otherwise resolve shardPath based on the index name and shard id + indexName = Objects.requireNonNull(indexNameOption.value(options), "Index name is required"); + shardId = Objects.requireNonNull(shardIdOption.value(options), "Shard ID is required"); + + // resolve shard path in case of multi-node layout per environment + fromNodeId = 0; + toNodeId = NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.get(settings); + } + + // have to iterate over possibleLockId as NodeEnvironment; on a contrast to it - we have to fail if node is busy + for (int possibleLockId = fromNodeId; possibleLockId < toNodeId; possibleLockId++) { + try { + try (NodeEnvironment.NodeLock nodeLock = new NodeEnvironment.NodeLock(possibleLockId, logger, environment, Files::exists)) { + final NodeEnvironment.NodePath[] nodePaths = nodeLock.getNodePaths(); + for (NodeEnvironment.NodePath nodePath : nodePaths) { + if (Files.exists(nodePath.indicesPath)) { + // have to scan all index uuid folders to resolve from index name + try (DirectoryStream stream = Files.newDirectoryStream(nodePath.indicesPath)) { + for (Path file : stream) { + if (Files.exists(file.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) { + continue; + } + + final IndexMetaData indexMetaData = + IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, file); + if (indexMetaData == null) { + continue; + } + final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + final Index index = indexMetaData.getIndex(); + if (indexName.equals(index.getName()) == false) { + continue; + } + final ShardId shId = new ShardId(index, shardId); + + final Path shardPathLocation = nodePath.resolve(shId); + if (Files.exists(shardPathLocation) == false) { + continue; + } + final ShardPath shardPath = ShardPath.loadShardPath(logger, shId, indexSettings, + new Path[]{shardPathLocation}, possibleLockId, nodePath.path); + if (shardPath != null) { + consumer.accept(shardPath); + return; + } + } + } + } + } + } + } catch (LockObtainFailedException lofe) { + throw new ElasticsearchException("Failed to lock node's directory [" + lofe.getMessage() + + "], is Elasticsearch still running ?"); + } + } + throw new ElasticsearchException("Unable to resolve shard path for index [" + indexName + "] and shard id [" + shardId + "]"); + } + + public static boolean isCorruptMarkerFileIsPresent(final Directory directory) throws IOException { + boolean found = false; + + final String[] files = directory.listAll(); + for (String file : files) { + if (file.startsWith(Store.CORRUPTED)) { + found = true; + break; + } + } + + return found; + } + + protected void dropCorruptMarkerFiles(Terminal terminal, Path path, Directory directory, boolean clean) throws IOException { + if (clean) { + confirm("This shard has been marked as corrupted but no corruption can now be detected.\n" + + "This may indicate an intermittent hardware problem. 
The corruption marker can be \n" + + "removed, but there is a risk that data has been undetectably lost.\n\n" + + "Are you taking a risk of losing documents and proceed with removing a corrupted marker ?", + terminal); + } + String[] files = directory.listAll(); + boolean found = false; + for (String file : files) { + if (file.startsWith(Store.CORRUPTED)) { + directory.deleteFile(file); + + terminal.println("Deleted corrupt marker " + file + " from " + path); + } + } + } + + private static void loseDataDetailsBanner(Terminal terminal, Tuple cleanStatus) { + if (cleanStatus.v2() != null) { + terminal.println(""); + terminal.println(" " + cleanStatus.v2()); + terminal.println(""); + } + } + + private static void confirm(String msg, Terminal terminal) { + terminal.println(msg); + String text = terminal.readText("Confirm [y/N] "); + if (text.equalsIgnoreCase("y") == false) { + throw new ElasticsearchException("aborted by user"); + } + } + + private void warnAboutESShouldBeStopped(Terminal terminal) { + terminal.println("-----------------------------------------------------------------------"); + terminal.println(""); + terminal.println(" WARNING: Elasticsearch MUST be stopped before running this tool."); + terminal.println(""); + // that's only for 6.x branch for bwc with elasticsearch-translog + if (removeCorruptedLuceneSegmentsAction == null) { + terminal.println(" This tool is deprecated and will be completely removed in 7.0."); + terminal.println(" It is replaced by the elasticsearch-shard tool. "); + terminal.println(""); + } + terminal.println(" Please make a complete backup of your index before using this tool."); + terminal.println(""); + terminal.println("-----------------------------------------------------------------------"); + } + + // Visible for testing + @Override + public void execute(Terminal terminal, OptionSet options, Environment environment) throws Exception { + warnAboutESShouldBeStopped(terminal); + + findAndProcessShardPath(options, environment, shardPath -> { + final Path indexPath = shardPath.resolveIndex(); + final Path translogPath = shardPath.resolveTranslog(); + final Path nodePath = getNodePath(shardPath); + if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) { + throw new ElasticsearchException("translog directory [" + translogPath + "], must exist and be a directory"); + } + + final PrintWriter writer = terminal.getWriter(); + final PrintStream printStream = new PrintStream(new OutputStream() { + @Override + public void write(int b) { + writer.write(b); + } + }, false, "UTF-8"); + final boolean verbose = terminal.isPrintable(Terminal.Verbosity.VERBOSE); + + final Directory indexDirectory = getDirectory(indexPath); + + final Tuple indexCleanStatus; + final Tuple translogCleanStatus; + try (Directory indexDir = indexDirectory) { + // keep the index lock to block any runs of older versions of this tool + try (Lock writeIndexLock = indexDir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + ////////// Index + // that's only for 6.x branch for bwc with elasticsearch-translog + if (removeCorruptedLuceneSegmentsAction != null) { + terminal.println(""); + terminal.println("Opening Lucene index at " + indexPath); + terminal.println(""); + try { + indexCleanStatus = removeCorruptedLuceneSegmentsAction.getCleanStatus(shardPath, indexDir, + writeIndexLock, printStream, verbose); + } catch (Exception e) { + terminal.println(e.getMessage()); + throw e; + } + + terminal.println(""); + terminal.println(" >> Lucene index is " + 
indexCleanStatus.v1().getMessage() + " at " + indexPath); + terminal.println(""); + } else { + indexCleanStatus = Tuple.tuple(CleanStatus.CLEAN, null); + } + + ////////// Translog + // as translog relies on data stored in an index commit - we have to have non unrecoverable index to truncate translog + if (indexCleanStatus.v1() != CleanStatus.UNRECOVERABLE) { + terminal.println(""); + terminal.println("Opening translog at " + translogPath); + terminal.println(""); + try { + translogCleanStatus = truncateTranslogAction.getCleanStatus(shardPath, indexDir); + } catch (Exception e) { + terminal.println(e.getMessage()); + throw e; + } + + terminal.println(""); + terminal.println(" >> Translog is " + translogCleanStatus.v1().getMessage() + " at " + translogPath); + terminal.println(""); + } else { + translogCleanStatus = Tuple.tuple(CleanStatus.UNRECOVERABLE, null); + } + + ////////// Drop corrupted data + final CleanStatus indexStatus = indexCleanStatus.v1(); + final CleanStatus translogStatus = translogCleanStatus.v1(); + + if (indexStatus == CleanStatus.CLEAN && translogStatus == CleanStatus.CLEAN) { + throw new ElasticsearchException("Shard does not seem to be corrupted at " + shardPath.getDataPath()); + } + + if (indexStatus == CleanStatus.UNRECOVERABLE) { + if (indexCleanStatus.v2() != null) { + terminal.println("Details: " + indexCleanStatus.v2()); + } + + terminal.println("You can allocate a new, empty, primary shard with the following command:"); + + printRerouteCommand(shardPath, terminal, false); + + throw new ElasticsearchException("Index is unrecoverable"); + } + + + terminal.println("-----------------------------------------------------------------------"); + if (indexStatus != CleanStatus.CLEAN) { + loseDataDetailsBanner(terminal, indexCleanStatus); + } + if (translogStatus != CleanStatus.CLEAN) { + loseDataDetailsBanner(terminal, translogCleanStatus); + } + terminal.println(" WARNING: YOU MAY LOSE DATA."); + terminal.println("-----------------------------------------------------------------------"); + + + confirm("Continue and remove corrupted data from the shard ?", terminal); + + if (indexStatus != CleanStatus.CLEAN) { + removeCorruptedLuceneSegmentsAction.execute(terminal, shardPath, indexDir, + writeIndexLock, printStream, verbose); + } + + if (translogStatus != CleanStatus.CLEAN) { + truncateTranslogAction.execute(terminal, shardPath, indexDir); + } + } catch (LockObtainFailedException lofe) { + final String msg = "Failed to lock shard's directory at [" + indexPath + "], is Elasticsearch still running?"; + terminal.println(msg); + throw new ElasticsearchException(msg); + } + + final CleanStatus indexStatus = indexCleanStatus.v1(); + final CleanStatus translogStatus = translogCleanStatus.v1(); + + // newHistoryCommit obtains its own lock + addNewHistoryCommit(indexDir, terminal, translogStatus != CleanStatus.CLEAN); + newAllocationId(environment, shardPath, terminal); + if (indexStatus != CleanStatus.CLEAN) { + dropCorruptMarkerFiles(terminal, indexPath, indexDir, indexStatus == CleanStatus.CLEAN_WITH_CORRUPTED_MARKER); + } + } + }); + } + + private Directory getDirectory(Path indexPath) { + Directory directory; + try { + directory = FSDirectory.open(indexPath, NativeFSLockFactory.INSTANCE); + } catch (Throwable t) { + throw new ElasticsearchException("ERROR: could not open directory \"" + indexPath + "\"; exiting"); + } + return directory; + } + + protected void addNewHistoryCommit(Directory indexDirectory, Terminal terminal, boolean updateLocalCheckpoint) throws IOException 
{ + final String historyUUID = UUIDs.randomBase64UUID(); + + terminal.println("Marking index with the new history uuid : " + historyUUID); + // commit the new history id + final IndexWriterConfig iwc = new IndexWriterConfig(null) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setCommitOnClose(false) + .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND); + // IndexWriter acquires directory lock by its own + try (IndexWriter indexWriter = new IndexWriter(indexDirectory, iwc)) { + final Map userData = new HashMap<>(); + indexWriter.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + + if (updateLocalCheckpoint) { + // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit. + // We can only safely do it because we will generate a new history uuid this shard. + final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet()); + // Also advances the local checkpoint of the last commit to its max_seqno. + userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(commitInfo.maxSeqNo)); + } + + // commit the new history id + userData.put(Engine.HISTORY_UUID_KEY, historyUUID); + + indexWriter.setLiveCommitData(userData.entrySet()); + indexWriter.commit(); + } + } + + protected void newAllocationId(Environment environment, ShardPath shardPath, Terminal terminal) throws IOException { + final Path shardStatePath = shardPath.getShardStatePath(); + final ShardStateMetaData shardStateMetaData = + ShardStateMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardStatePath); + + if (shardStateMetaData == null) { + throw new ElasticsearchException("No shard state meta data at " + shardStatePath); + } + + final AllocationId newAllocationId = AllocationId.newInitializing(); + + terminal.println("Changing allocation id " + shardStateMetaData.allocationId.getId() + + " to " + newAllocationId.getId()); + + final ShardStateMetaData newShardStateMetaData = + new ShardStateMetaData(shardStateMetaData.primary, shardStateMetaData.indexUUID, newAllocationId); + + ShardStateMetaData.FORMAT.write(newShardStateMetaData, shardStatePath); + + terminal.println(""); + terminal.println("You should run the following command to allocate this shard:"); + + printRerouteCommand(shardPath, terminal, true); + } + + private void printRerouteCommand(ShardPath shardPath, Terminal terminal, boolean allocateStale) throws IOException { + final IndexMetaData indexMetaData = + IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, + shardPath.getDataPath().getParent()); + + final Path nodePath = getNodePath(shardPath); + final NodeMetaData nodeMetaData = + NodeMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodePath); + + if (nodeMetaData == null) { + throw new ElasticsearchException("No node meta data at " + nodePath); + } + + final String nodeId = nodeMetaData.nodeId(); + final String index = indexMetaData.getIndex().getName(); + final int id = shardPath.getShardId().id(); + final AllocationCommands commands = new AllocationCommands( + allocateStale + ? 
new AllocateStalePrimaryAllocationCommand(index, id, nodeId, false) + : new AllocateEmptyPrimaryAllocationCommand(index, id, nodeId, false)); + + terminal.println(""); + terminal.println("POST /_cluster/reroute'\n" + + Strings.toString(commands, true, true) + "'"); + terminal.println(""); + terminal.println("You must accept the possibility of data loss by changing parameter `accept_data_loss` to `true`."); + terminal.println(""); + } + + private Path getNodePath(ShardPath shardPath) { + final Path nodePath = shardPath.getDataPath().getParent().getParent().getParent(); + if (Files.exists(nodePath) == false || Files.exists(nodePath.resolve(MetaDataStateFormat.STATE_DIR_NAME)) == false) { + throw new ElasticsearchException("Unable to resolve node path for " + shardPath); + } + return nodePath; + } + + public enum CleanStatus { + CLEAN("clean"), + CLEAN_WITH_CORRUPTED_MARKER("marked corrupted, but no corruption detected"), + CORRUPTED("corrupted"), + UNRECOVERABLE("corrupted and unrecoverable"); + + private final String msg; + + CleanStatus(String msg) { + this.msg = msg; + } + + public String getMessage() { + return msg; + } + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java index 7cb719e41f433..ffce215646443 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java @@ -112,16 +112,31 @@ public boolean isCustomDataPath() { * Note: this method resolves custom data locations for the shard. */ public static ShardPath loadShardPath(Logger logger, NodeEnvironment env, ShardId shardId, IndexSettings indexSettings) throws IOException { - final String indexUUID = indexSettings.getUUID(); final Path[] paths = env.availableShardPaths(shardId); + final int nodeLockId = env.getNodeLockId(); + final Path sharedDataPath = env.sharedDataPath(); + return loadShardPath(logger, shardId, indexSettings, paths, nodeLockId, sharedDataPath); + } + + /** + * This method walks through the nodes shard paths to find the data and state path for the given shard. If multiple + * directories with a valid shard state exist the one with the highest version will be used. + * Note: this method resolves custom data locations for the shard. + */ + public static ShardPath loadShardPath(Logger logger, ShardId shardId, IndexSettings indexSettings, Path[] availableShardPaths, + int nodeLockId, Path sharedDataPath) throws IOException { + final String indexUUID = indexSettings.getUUID(); Path loadedPath = null; - for (Path path : paths) { + for (Path path : availableShardPaths) { // EMPTY is safe here because we never call namedObject ShardStateMetaData load = ShardStateMetaData.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path); if (load != null) { if (load.indexUUID.equals(indexUUID) == false && IndexMetaData.INDEX_UUID_NA_VALUE.equals(load.indexUUID) == false) { - logger.warn("{} found shard on path: [{}] with a different index UUID - this shard seems to be leftover from a different index with the same name. Remove the leftover shard in order to reuse the path with the current index", shardId, path); - throw new IllegalStateException(shardId + " index UUID in shard state was: " + load.indexUUID + " expected: " + indexUUID + " on shard path: " + path); + logger.warn("{} found shard on path: [{}] with a different index UUID - this " + + "shard seems to be leftover from a different index with the same name. 
" + + "Remove the leftover shard in order to reuse the path with the current index", shardId, path); + throw new IllegalStateException(shardId + " index UUID in shard state was: " + load.indexUUID + + " expected: " + indexUUID + " on shard path: " + path); } if (loadedPath == null) { loadedPath = path; @@ -137,7 +152,7 @@ public static ShardPath loadShardPath(Logger logger, NodeEnvironment env, ShardI final Path dataPath; final Path statePath = loadedPath; if (indexSettings.hasCustomDataPath()) { - dataPath = env.resolveCustomLocation(indexSettings, shardId); + dataPath = NodeEnvironment.resolveCustomLocation(indexSettings, shardId, sharedDataPath, nodeLockId); } else { dataPath = statePath; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardToolCli.java b/server/src/main/java/org/elasticsearch/index/shard/ShardToolCli.java new file mode 100644 index 0000000000000..62693d2b60b78 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardToolCli.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.shard; + +import org.elasticsearch.cli.LoggingAwareMultiCommand; +import org.elasticsearch.cli.Terminal; + +/** + * Class encapsulating and dispatching commands from the {@code elasticsearch-shard} command line tool + */ +public class ShardToolCli extends LoggingAwareMultiCommand { + + private ShardToolCli() { + super("A CLI tool to remove corrupted parts of unrecoverable shards"); + subcommands.put("remove-corrupted-data", new RemoveCorruptedShardDataCommand()); + } + + public static void main(String[] args) throws Exception { + exit(new ShardToolCli().main(args, Terminal.DEFAULT)); + } + +} + diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java index 20aadf21bcb48..d80a6729d30bc 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogHeader.java @@ -144,7 +144,6 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil final long primaryTerm; if (version == VERSION_PRIMARY_TERM) { primaryTerm = in.readLong(); - assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]"; } else { assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; primaryTerm = UNKNOWN_PRIMARY_TERM; @@ -153,6 +152,8 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil if (version >= VERSION_PRIMARY_TERM) { Translog.verifyChecksum(in); } + assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]"; + final int headerSizeInBytes = headerSizeInBytes(version, uuid.length); assert channel.position() == headerSizeInBytes : "Header is not fully read; header size [" + headerSizeInBytes + "], position [" + channel.position() + "]"; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java index f7d830a32ec1b..a8a8d735f9a0c 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java @@ -21,15 +21,18 @@ import org.elasticsearch.cli.LoggingAwareMultiCommand; import org.elasticsearch.cli.Terminal; +import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand; /** * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool */ +@Deprecated public class TranslogToolCli extends LoggingAwareMultiCommand { private TranslogToolCli() { + // that's only for 6.x branch for bwc with elasticsearch-translog super("A CLI tool for various Elasticsearch translog actions"); - subcommands.put("truncate", new TruncateTranslogCommand()); + subcommands.put("truncate", new RemoveCorruptedShardDataCommand(true)); } public static void main(String[] args) throws Exception { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java new file mode 100644 index 0000000000000..0b9c365509685 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -0,0 +1,245 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.translog; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.store.Directory; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cli.Terminal; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.RemoveCorruptedShardDataCommand; +import org.elasticsearch.index.shard.ShardPath; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +public class TruncateTranslogAction { + + protected static final Logger logger = Loggers.getLogger(TruncateTranslogAction.class); + private final NamedXContentRegistry namedXContentRegistry; + + public TruncateTranslogAction(NamedXContentRegistry namedXContentRegistry) { + this.namedXContentRegistry = namedXContentRegistry; + } + + public Tuple getCleanStatus(ShardPath shardPath, + Directory indexDirectory) throws IOException { + final Path indexPath = shardPath.resolveIndex(); + final Path translogPath = shardPath.resolveTranslog(); + final List commits; + try { + commits = DirectoryReader.listCommits(indexDirectory); + } catch (IndexNotFoundException infe) { + throw new ElasticsearchException("unable to find a valid shard at [" + indexPath + "]", infe); + } + + // Retrieve the generation and UUID from the existing data + final Map commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData()); + final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); + + if (translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog UUID but got: [null]"); + } + + final boolean clean = isTranslogClean(shardPath, translogUUID); + + if (clean) { + return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CLEAN, null); + } + + // Hold the lock open for the duration of the tool running + Set translogFiles; + try { + translogFiles = 
filesInDirectory(translogPath); + } catch (IOException e) { + throw new ElasticsearchException("failed to find existing translog files", e); + } + final String details = deletingFilesDetails(translogPath, translogFiles); + + return Tuple.tuple(RemoveCorruptedShardDataCommand.CleanStatus.CORRUPTED, details); + } + + public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirectory) throws IOException { + final Path indexPath = shardPath.resolveIndex(); + final Path translogPath = shardPath.resolveTranslog(); + + final String historyUUID = UUIDs.randomBase64UUID(); + final Map commitData; + // Hold the lock open for the duration of the tool running + Set translogFiles; + try { + terminal.println("Checking existing translog files"); + translogFiles = filesInDirectory(translogPath); + } catch (IOException e) { + terminal.println("encountered IOException while listing directory, aborting..."); + throw new ElasticsearchException("failed to find existing translog files", e); + } + + List commits; + try { + terminal.println("Reading translog UUID information from Lucene commit from shard at [" + indexPath + "]"); + commits = DirectoryReader.listCommits(indexDirectory); + } catch (IndexNotFoundException infe) { + throw new ElasticsearchException("unable to find a valid shard at [" + indexPath + "]", infe); + } + + // Retrieve the generation and UUID from the existing data + commitData = commits.get(commits.size() - 1).getUserData(); + final String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); + final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); + if (translogGeneration == null || translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", + translogGeneration, translogUUID); + } + + final long globalCheckpoint = commitData.containsKey(SequenceNumbers.MAX_SEQ_NO) + ? 
Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)) + : SequenceNumbers.UNASSIGNED_SEQ_NO; + + terminal.println("Translog Generation: " + translogGeneration); + terminal.println("Translog UUID : " + translogUUID); + terminal.println("History UUID : " + historyUUID); + + Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); + Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); + Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + + translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + + translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + + // Write empty checkpoint and translog to empty files + long gen = Long.parseLong(translogGeneration); + int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); + writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); + + terminal.println("Removing existing translog files"); + IOUtils.rm(translogFiles.toArray(new Path[]{})); + + terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]"); + Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE); + terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]"); + Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE); + + // Fsync the translog directory after rename + IOUtils.fsync(translogPath, true); + } + + private boolean isTranslogClean(ShardPath shardPath, String translogUUID) throws IOException { + // perform clean check of translog instead of corrupted marker file + boolean clean = true; + try { + final Path translogPath = shardPath.resolveTranslog(); + final long translogGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); + final IndexMetaData indexMetaData = + IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, shardPath.getDataPath().getParent()); + final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY); + final TranslogConfig translogConfig = new TranslogConfig(shardPath.getShardId(), translogPath, + indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardPath.getShardId().id()); + final TranslogDeletionPolicy translogDeletionPolicy = + new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(), + indexSettings.getTranslogRetentionAge().getMillis()); + try (Translog translog = new Translog(translogConfig, translogUUID, + translogDeletionPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm); + Translog.Snapshot snapshot = translog.newSnapshot()) { + while (snapshot.next() != null) { + // just iterate over snapshot + } + } + } catch (TranslogCorruptedException e) { + clean = false; + } + return clean; + } + + /** Write a checkpoint file to the given location with the given generation */ + static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException { + Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, + globalCheckpoint, translogGeneration); + Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, + StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + // fsync with metadata here to make sure. 
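+        // the boolean argument is false because a regular file, not a directory, is being fsynced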
+ IOUtils.fsync(filename, false); + } + + /** + * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written. + */ + private static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException { + try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) { + TranslogHeader header = new TranslogHeader(translogUUID, TranslogHeader.UNKNOWN_PRIMARY_TERM); + header.write(fc); + return header.sizeInBytes(); + } + } + + /** Build a listing of the translog files that will be deleted under the given translog path */ + private String deletingFilesDetails(Path translogPath, Set<Path> files) { + StringBuilder builder = new StringBuilder(); + + builder + .append("Documents inside of translog files will be lost.\n") + .append(" The following files will be DELETED at ") + .append(translogPath) + .append("\n\n"); + for (Iterator<Path> it = files.iterator(); it.hasNext(); ) { + builder.append(" --> ").append(it.next().getFileName()); + if (it.hasNext()) { + builder.append("\n"); + } + } + return builder.toString(); + } + + /** Return a Set of all files in a given directory */ + public static Set<Path> filesInDirectory(Path directory) throws IOException { + Set<Path> files = new TreeSet<>(); + try (DirectoryStream<Path> stream = Files.newDirectoryStream(directory)) { + for (Path file : stream) { + files.add(file); + } + } + return files; + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java deleted file mode 100644 index a90f8af0af42c..0000000000000 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package org.elasticsearch.index.translog; - -import joptsimple.OptionParser; -import joptsimple.OptionSet; -import joptsimple.OptionSpec; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.Lock; -import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.store.NativeFSLockFactory; -import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cli.EnvironmentAwareCommand; -import org.elasticsearch.cli.Terminal; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.env.Environment; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.seqno.SequenceNumbers; - -import java.io.IOException; -import java.nio.channels.FileChannel; -import java.nio.file.DirectoryStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class TruncateTranslogCommand extends EnvironmentAwareCommand { - - private final OptionSpec translogFolder; - private final OptionSpec batchMode; - - public TruncateTranslogCommand() { - super("Truncates a translog to create a new, empty translog"); - this.translogFolder = parser.acceptsAll(Arrays.asList("d", "dir"), - "Translog Directory location on disk") - .withRequiredArg() - .required(); - this.batchMode = parser.acceptsAll(Arrays.asList("b", "batch"), - "Enable batch mode explicitly, automatic confirmation of warnings"); - } - - // Visible for testing - public OptionParser getParser() { - return this.parser; - } - - @Override - protected void printAdditionalHelp(Terminal terminal) { - terminal.println("This tool truncates the translog and translog"); - terminal.println("checkpoint files to create a new translog"); - } - - @SuppressForbidden(reason = "Necessary to use the path passed in") - private Path getTranslogPath(OptionSet options) { - return PathUtils.get(translogFolder.value(options), "", ""); - } - - @Override - protected void execute(Terminal terminal, OptionSet options, Environment env) throws Exception { - boolean batch = options.has(batchMode); - - Path translogPath = getTranslogPath(options); - Path idxLocation = translogPath.getParent().resolve("index"); - - if (Files.exists(translogPath) == false || Files.isDirectory(translogPath) == false) { - throw new ElasticsearchException("translog directory [" + translogPath + "], must exist and be a directory"); - } - - if (Files.exists(idxLocation) == false || Files.isDirectory(idxLocation) == false) { - throw new ElasticsearchException("unable to find a shard at [" + idxLocation + "], which must exist and be a directory"); - } - try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE)) { - final String historyUUID = UUIDs.randomBase64UUID(); - final Map commitData; - // Hold the lock open for the duration of the tool 
running - try (Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - Set translogFiles; - try { - terminal.println("Checking existing translog files"); - translogFiles = filesInDirectory(translogPath); - } catch (IOException e) { - terminal.println("encountered IOException while listing directory, aborting..."); - throw new ElasticsearchException("failed to find existing translog files", e); - } - - // Warn about ES being stopped and files being deleted - warnAboutDeletingFiles(terminal, translogFiles, batch); - - List commits; - try { - terminal.println("Reading translog UUID information from Lucene commit from shard at [" + idxLocation + "]"); - commits = DirectoryReader.listCommits(dir); - } catch (IndexNotFoundException infe) { - throw new ElasticsearchException("unable to find a valid shard at [" + idxLocation + "]", infe); - } - - // Retrieve the generation and UUID from the existing data - commitData = new HashMap<>(commits.get(commits.size() - 1).getUserData()); - String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); - String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); - final long globalCheckpoint; - // In order to have a safe commit invariant, we have to assign the global checkpoint to the max_seqno of the last commit. - // We can only safely do it because we will generate a new history uuid this shard. - if (commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)) { - globalCheckpoint = Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)); - // Also advances the local checkpoint of the last commit to its max_seqno. - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(globalCheckpoint)); - } else { - globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; - } - if (translogGeneration == null || translogUUID == null) { - throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", - translogGeneration, translogUUID); - } - terminal.println("Translog Generation: " + translogGeneration); - terminal.println("Translog UUID : " + translogUUID); - terminal.println("History UUID : " + historyUUID); - - Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); - Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); - Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - - // Write empty checkpoint and translog to empty files - long gen = Long.parseLong(translogGeneration); - int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); - writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); - - terminal.println("Removing existing translog files"); - IOUtils.rm(translogFiles.toArray(new Path[]{})); - - terminal.println("Creating new empty checkpoint at [" + realEmptyCheckpoint + "]"); - Files.move(tempEmptyCheckpoint, realEmptyCheckpoint, StandardCopyOption.ATOMIC_MOVE); - terminal.println("Creating new empty translog at [" + realEmptyTranslog + "]"); - Files.move(tempEmptyTranslog, realEmptyTranslog, StandardCopyOption.ATOMIC_MOVE); - - // Fsync the translog directory after rename - IOUtils.fsync(translogPath, true); - } - - terminal.println("Marking index with the new history uuid"); - // commit the new histroy id - IndexWriterConfig iwc = new 
IndexWriterConfig(null) - .setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) - .setCommitOnClose(false) - // we don't want merges to happen here - we call maybe merge on the engine - // later once we stared it up otherwise we would need to wait for it here - // we also don't specify a codec here and merges should use the engines for this index - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(IndexWriterConfig.OpenMode.APPEND); - try (IndexWriter writer = new IndexWriter(dir, iwc)) { - Map newCommitData = new HashMap<>(commitData); - newCommitData.put(Engine.HISTORY_UUID_KEY, historyUUID); - writer.setLiveCommitData(newCommitData.entrySet()); - writer.commit(); - } - } catch (LockObtainFailedException lofe) { - throw new ElasticsearchException("Failed to lock shard's directory at [" + idxLocation + "], is Elasticsearch still running?"); - } - - terminal.println("Done."); - } - - /** Write a checkpoint file to the given location with the given generation */ - static void writeEmptyCheckpoint(Path filename, int translogLength, long translogGeneration, long globalCheckpoint) throws IOException { - Checkpoint emptyCheckpoint = Checkpoint.emptyTranslogCheckpoint(translogLength, translogGeneration, - globalCheckpoint, translogGeneration); - Checkpoint.write(FileChannel::open, filename, emptyCheckpoint, - StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); - // fsync with metadata here to make sure. - IOUtils.fsync(filename, false); - } - - /** - * Write a translog containing the given translog UUID to the given location. Returns the number of bytes written. - */ - public static int writeEmptyTranslog(Path filename, String translogUUID) throws IOException { - try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) { - TranslogHeader header = new TranslogHeader(translogUUID, TranslogHeader.UNKNOWN_PRIMARY_TERM); - header.write(fc); - return header.sizeInBytes(); - } - } - - /** Show a warning about deleting files, asking for a confirmation if {@code batchMode} is false */ - public static void warnAboutDeletingFiles(Terminal terminal, Set files, boolean batchMode) { - terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); - terminal.println("! WARNING: Elasticsearch MUST be stopped before running this tool !"); - terminal.println("! !"); - terminal.println("! WARNING: Documents inside of translog files will be lost !"); - terminal.println("! !"); - terminal.println("! WARNING: The following files will be DELETED! !"); - terminal.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); - for (Path file : files) { - terminal.println("--> " + file); - } - terminal.println(""); - if (batchMode == false) { - String text = terminal.readText("Continue and DELETE files? 
[y/N] "); - if (!text.equalsIgnoreCase("y")) { - throw new ElasticsearchException("aborted by user"); - } - } - } - - /** Return a Set of all files in a given directory */ - public static Set filesInDirectory(Path directory) throws IOException { - Set files = new HashSet<>(); - try (DirectoryStream stream = Files.newDirectoryStream(directory)) { - for (Path file : stream) { - files.add(file); - } - } - return files; - } - -} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9a5df39a970a9..b74b5343a82a1 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -23,7 +23,6 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexableField; -import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -2641,7 +2640,8 @@ public void testIndexCheckOnStartup() throws Exception { final ShardPath shardPath = indexShard.shardPath(); - final Path indexPath = corruptIndexFile(shardPath); + final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); + CorruptionUtils.corruptIndex(random(), indexPath, false); final AtomicInteger corruptedMarkerCount = new AtomicInteger(); final SimpleFileVisitor corruptedVisitor = new SimpleFileVisitor() { @@ -2750,22 +2750,6 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO assertThat("store still has a single corrupt marker", corruptedMarkerCount.get(), equalTo(1)); } - private Path corruptIndexFile(ShardPath shardPath) throws IOException { - final Path indexPath = shardPath.getDataPath().resolve(ShardPath.INDEX_FOLDER_NAME); - final Path[] filesToCorrupt = - Files.walk(indexPath) - .filter(p -> { - final String name = p.getFileName().toString(); - return Files.isRegularFile(p) - && name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS - && IndexWriter.WRITE_LOCK_NAME.equals(name) == false - && name.startsWith("segments_") == false && name.endsWith(".si") == false; - }) - .toArray(Path[]::new); - CorruptionUtils.corruptFile(random(), filesToCorrupt); - return indexPath; - } - /** * Simulates a scenario that happens when we are async fetching snapshot metadata from GatewayService * and checking index concurrently. This should always be possible without any exception. diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java new file mode 100644 index 0000000000000..dc3be31734d5c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -0,0 +1,652 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.shard; + +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.Lock; +import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.NativeFSLockFactory; +import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplanation; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.cli.MockTerminal; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.allocation.AllocationDecision; +import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision; +import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.MockEngineFactoryPlugin; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.translog.TestTranslog; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.monitor.fs.FsInfo; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.CorruptionUtils; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.engine.MockEngineSupport; +import org.elasticsearch.test.transport.MockTransportService; + +import java.io.IOException; +import java.nio.file.Files; +import 
java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) +public class RemoveCorruptedShardDataCommandIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class, InternalSettingsPlugin.class); + } + + public void testCorruptIndex() throws Exception { + final String node = internalCluster().startNode(); + + final String indexName = "index42"; + assertAcked(prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum") + )); + + // index some docs in several segments + int numDocs = 0; + for (int k = 0, attempts = randomIntBetween(5, 10); k < attempts; k++) { + final int numExtraDocs = between(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numExtraDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + } + + numDocs += numExtraDocs; + + indexRandom(false, false, false, Arrays.asList(builders)); + flush(indexName); + } + + logger.info("--> indexed {} docs", numDocs); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final MockTerminal terminal = new MockTerminal(); + final OptionParser parser = command.getParser(); + + final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings()); + final OptionSet options = parser.parse("-index", indexName, "-shard-id", "0"); + + // Try running it before the node is stopped (and shard is closed) + try { + command.execute(terminal, options, environment); + fail("expected the command to fail as node is locked"); + } catch (Exception e) { + assertThat(e.getMessage(), + allOf(containsString("Failed to lock node's directory"), + containsString("is Elasticsearch still running ?"))); + } + + final Set indexDirs = getDirs(indexName, ShardPath.INDEX_FOLDER_NAME); + assertThat(indexDirs, hasSize(1)); + + internalCluster().restartNode(node, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + // Try running it before the shard is corrupted, it should flip out because there is no corruption file marker + try { + 
command.execute(terminal, options, environment); + fail("expected the command to fail as there is no corruption file marker"); + } catch (Exception e) { + assertThat(e.getMessage(), startsWith("Shard does not seem to be corrupted at")); + } + + CorruptionUtils.corruptIndex(random(), indexDirs.iterator().next(), false); + return super.onNodeStopped(nodeName); + } + }); + + // shard should be failed due to a corrupted index + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(0).setPrimary(true) + .get().getExplanation(); + + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + }); + + internalCluster().restartNode(node, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + terminal.addTextInput("y"); + command.execute(terminal, options, environment); + + return super.onNodeStopped(nodeName); + } + }); + + waitNoPendingTasksOnAll(); + + String nodeId = null; + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + final DiscoveryNodes nodes = state.nodes(); + for (ObjectObjectCursor cursor : nodes.getNodes()) { + final String name = cursor.value.getName(); + if (name.equals(node)) { + nodeId = cursor.key; + break; + } + } + assertThat(nodeId, notNullValue()); + + logger.info("--> output:\n{}", terminal.getOutput()); + + assertThat(terminal.getOutput(), containsString("allocate_stale_primary")); + assertThat(terminal.getOutput(), containsString("\"node\" : \"" + nodeId + "\"")); + + // there is only _stale_ primary (due to new allocation id) + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(0).setPrimary(true) + .get().getExplanation(); + + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + }); + + client().admin().cluster().prepareReroute() + .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, nodeId, true)) + .get(); + + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(0).setPrimary(true) + .get().getExplanation(); + + assertThat(explanation.getCurrentNode(), notNullValue()); + assertThat(explanation.getShardState(), equalTo(ShardRoutingState.STARTED)); + }); + + final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?\\d+) documents will be lost."); + final Matcher matcher = pattern.matcher(terminal.getOutput()); + assertThat(matcher.find(), equalTo(true)); + final int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs")); + + ensureGreen(indexName); + + assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs); + } + + public void testCorruptTranslogTruncation() throws Exception { + internalCluster().startNodes(2, Settings.EMPTY); + + final String node1 = 
internalCluster().getNodeNames()[0]; + final String node2 = internalCluster().getNodeNames()[1]; + + final String indexName = "test"; + assertAcked(prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog + .put("index.routing.allocation.exclude._name", node2) + )); + ensureYellow(); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() + .put("index.routing.allocation.exclude._name", (String)null) + )); + ensureGreen(); + + // Index some documents + int numDocsToKeep = randomIntBetween(10, 100); + logger.info("--> indexing [{}] docs to be kept", numDocsToKeep); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + } + indexRandom(false, false, false, Arrays.asList(builders)); + flush(indexName); + + disableTranslogFlush(indexName); + // having no extra docs is an interesting case for seq no based recoveries - test it more often + int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100); + logger.info("--> indexing [{}] more doc to be truncated", numDocsToTruncate); + builders = new IndexRequestBuilder[numDocsToTruncate]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + } + indexRandom(false, false, false, Arrays.asList(builders)); + Set translogDirs = getDirs(indexName, ShardPath.TRANSLOG_FOLDER_NAME); + + // that's only for 6.x branch for bwc with elasticsearch-translog + final boolean translogOnly = randomBoolean(); + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(translogOnly); + final MockTerminal terminal = new MockTerminal(); + final OptionParser parser = command.getParser(); + + if (randomBoolean() && numDocsToTruncate > 0) { + // flush the replica, so it will have more docs than what the primary will have + Index index = resolveIndex(indexName); + IndexShard replica = internalCluster().getInstance(IndicesService.class, node2).getShardOrNull(new ShardId(index, 0)); + replica.flush(new FlushRequest()); + logger.info("--> performed extra flushing on replica"); + } + + // shut down the replica node to be tested later + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2)); + + // Corrupt the translog file(s) + logger.info("--> corrupting translog"); + corruptRandomTranslogFiles(indexName); + + // Restart the single node + logger.info("--> restarting node"); + internalCluster().restartRandomDataNode(); + + // all shards should be failed due to a corrupted translog + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(0).setPrimary(true) + .get().getExplanation(); + + final UnassignedInfo unassignedInfo = explanation.getUnassignedInfo(); + assertThat(unassignedInfo.getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED)); + }); + + // have to shut down primary node - otherwise node lock is present + final InternalTestCluster.RestartCallback callback = + new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) 
throws Exception { + // and we can actually truncate the translog + for (Path translogDir : translogDirs) { + final Path idxLocation = translogDir.getParent().resolve(ShardPath.INDEX_FOLDER_NAME); + assertBusy(() -> { + logger.info("--> checking that lock has been released for {}", idxLocation); + try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); + Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + // Great, do nothing, we just wanted to obtain the lock + } catch (LockObtainFailedException lofe) { + logger.info("--> failed acquiring lock for {}", idxLocation); + fail("still waiting for lock release at [" + idxLocation + "]"); + } catch (IOException ioe) { + fail("Got an IOException: " + ioe); + } + }); + + final Settings defaultSettings = internalCluster().getDefaultSettings(); + final Environment environment = TestEnvironment.newEnvironment(defaultSettings); + + terminal.addTextInput("y"); + OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString()); + logger.info("--> running command for [{}]", translogDir.toAbsolutePath()); + command.execute(terminal, options, environment); + logger.info("--> output:\n{}", terminal.getOutput()); + } + + return super.onNodeStopped(nodeName); + } + }; + internalCluster().restartNode(node1, callback); + + String primaryNodeId = null; + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + final DiscoveryNodes nodes = state.nodes(); + for (ObjectObjectCursor cursor : nodes.getNodes()) { + final String name = cursor.value.getName(); + if (name.equals(node1)) { + primaryNodeId = cursor.key; + break; + } + } + assertThat(primaryNodeId, notNullValue()); + + assertThat(terminal.getOutput(), containsString("allocate_stale_primary")); + assertThat(terminal.getOutput(), containsString("\"node\" : \"" + primaryNodeId + "\"")); + + // there is only _stale_ primary (due to new allocation id) + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(0).setPrimary(true) + .get().getExplanation(); + + final ShardAllocationDecision shardAllocationDecision = explanation.getShardAllocationDecision(); + assertThat(shardAllocationDecision.isDecisionTaken(), equalTo(true)); + assertThat(shardAllocationDecision.getAllocateDecision().getAllocationDecision(), + equalTo(AllocationDecision.NO_VALID_SHARD_COPY)); + }); + + client().admin().cluster().prepareReroute() + .add(new AllocateStalePrimaryAllocationCommand(indexName, 0, primaryNodeId, true)) + .get(); + + assertBusy(() -> { + final ClusterAllocationExplanation explanation = + client().admin().cluster().prepareAllocationExplain() + .setIndex(indexName).setShard(0).setPrimary(true) + .get().getExplanation(); + + assertThat(explanation.getCurrentNode(), notNullValue()); + assertThat(explanation.getShardState(), equalTo(ShardRoutingState.STARTED)); + }); + + ensureYellow(indexName); + + // Run a search and make sure it succeeds + assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), numDocsToKeep); + + logger.info("--> starting the replica node to test recovery"); + internalCluster().startNode(); + ensureGreen(indexName); + for (String node : internalCluster().nodesInclude(indexName)) { + SearchRequestBuilder q = client().prepareSearch(indexName).setPreference("_only_nodes:" + node).setQuery(matchAllQuery()); + assertHitCount(q.get(), numDocsToKeep); + } + final RecoveryResponse recoveryResponse = 
client().admin().indices().prepareRecoveries(indexName).setActiveOnly(false).get(); + final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get(indexName).stream() + .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); + assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); + // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. + final SeqNoStats seqNoStats = getSeqNoStats(indexName, 0); + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + } + + public void testCorruptTranslogTruncationOfReplica() throws Exception { + internalCluster().startNodes(2, Settings.EMPTY); + + final String node1 = internalCluster().getNodeNames()[0]; + final String node2 = internalCluster().getNodeNames()[1]; + logger.info("--> nodes name: {}, {}", node1, node2); + + final String indexName = "test"; + assertAcked(prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") + .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog + .put("index.routing.allocation.exclude._name", node2) + )); + ensureYellow(); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName).setSettings(Settings.builder() + .put("index.routing.allocation.exclude._name", (String)null) + )); + ensureGreen(); + + // Index some documents + int numDocsToKeep = randomIntBetween(0, 100); + logger.info("--> indexing [{}] docs to be kept", numDocsToKeep); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + } + indexRandom(false, false, false, Arrays.asList(builders)); + flush(indexName); + disableTranslogFlush(indexName); + // having no extra docs is an interesting case for seq no based recoveries - test it more often + int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100); + logger.info("--> indexing [{}] more docs to be truncated", numDocsToTruncate); + builders = new IndexRequestBuilder[numDocsToTruncate]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(indexName, "type").setSource("foo", "bar"); + } + indexRandom(false, false, false, Arrays.asList(builders)); + final int totalDocs = numDocsToKeep + numDocsToTruncate; + + // sample the replica node translog dirs + final ShardId shardId = new ShardId(resolveIndex(indexName), 0); + final Set translogDirs = getDirs(node2, shardId, ShardPath.TRANSLOG_FOLDER_NAME); + + // stop the cluster nodes. 
we don't use full restart so the node start up order will be the same + // and shard roles will be maintained + internalCluster().stopRandomDataNode(); + internalCluster().stopRandomDataNode(); + + // Corrupt the translog file(s) + logger.info("--> corrupting translog"); + TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs); + + // Restart the single node + logger.info("--> starting node"); + internalCluster().startNode(); + + ensureYellow(); + + // Run a search and make sure it succeeds + assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), totalDocs); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final MockTerminal terminal = new MockTerminal(); + final OptionParser parser = command.getParser(); + + final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings()); + + internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) throws Exception { + logger.info("--> node {} stopped", nodeName); + for (Path translogDir : translogDirs) { + final Path idxLocation = translogDir.getParent().resolve(ShardPath.INDEX_FOLDER_NAME); + assertBusy(() -> { + logger.info("--> checking that lock has been released for {}", idxLocation); + try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); + Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + // Great, do nothing, we just wanted to obtain the lock + } catch (LockObtainFailedException lofe) { + logger.info("--> failed acquiring lock for {}", idxLocation); + fail("still waiting for lock release at [" + idxLocation + "]"); + } catch (IOException ioe) { + fail("Got an IOException: " + ioe); + } + }); + + terminal.addTextInput("y"); + OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString()); + logger.info("--> running command for [{}]", translogDir.toAbsolutePath()); + command.execute(terminal, options, environment); + logger.info("--> output:\n{}", terminal.getOutput()); + } + + return super.onNodeStopped(nodeName); + } + }); + + logger.info("--> starting the replica node to test recovery"); + internalCluster().startNode(); + ensureGreen(indexName); + for (String node : internalCluster().nodesInclude(indexName)) { + assertHitCount(client().prepareSearch(indexName) + .setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(), totalDocs); + } + + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).setActiveOnly(false).get(); + final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get(indexName).stream() + .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); + // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery + assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); + // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. 
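+ // (the truncation tool writes the last commit's max_seq_no into the new empty checkpoint as the global checkpoint, so both checkpoints should line up with max_seq_no here)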
+ final SeqNoStats seqNoStats = getSeqNoStats(indexName, 0); + assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); + } + + public void testResolvePath() throws Exception { + final int numOfNodes = randomIntBetween(1, 5); + final List nodeNames = internalCluster().startNodes(numOfNodes, Settings.EMPTY); + + final String indexName = "test" + randomInt(100); + assertAcked(prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numOfNodes - 1) + )); + flush(indexName); + + ensureGreen(indexName); + + final Map nodeNameToNodeId = new HashMap<>(); + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + final DiscoveryNodes nodes = state.nodes(); + for (ObjectObjectCursor cursor : nodes.getNodes()) { + nodeNameToNodeId.put(cursor.value.getName(), cursor.key); + } + + final GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false); + final List iterators = iterableAsArrayList(shardIterators); + final ShardRouting shardRouting = iterators.iterator().next().nextOrNull(); + assertThat(shardRouting, notNullValue()); + final ShardId shardId = shardRouting.shardId(); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final OptionParser parser = command.getParser(); + + final Environment environment = TestEnvironment.newEnvironment(internalCluster().getDefaultSettings()); + + final Map indexPathByNodeName = new HashMap<>(); + for (String nodeName : nodeNames) { + final String nodeId = nodeNameToNodeId.get(nodeName); + final Set indexDirs = getDirs(nodeId, shardId, ShardPath.INDEX_FOLDER_NAME); + assertThat(indexDirs, hasSize(1)); + indexPathByNodeName.put(nodeName, indexDirs.iterator().next()); + + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeName)); + logger.info(" -- stopped {}", nodeName); + } + + for (String nodeName : nodeNames) { + final Path indexPath = indexPathByNodeName.get(nodeName); + final OptionSet options = parser.parse("--dir", indexPath.toAbsolutePath().toString()); + command.findAndProcessShardPath(options, environment, + shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); + } + } + + private Set getDirs(String indexName, String dirSuffix) { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false); + List iterators = iterableAsArrayList(shardIterators); + ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators); + ShardRouting shardRouting = shardIterator.nextOrNull(); + assertNotNull(shardRouting); + assertTrue(shardRouting.primary()); + assertTrue(shardRouting.assignedToNode()); + String nodeId = shardRouting.currentNodeId(); + ShardId shardId = shardRouting.shardId(); + return getDirs(nodeId, shardId, dirSuffix); + } + + private Set getDirs(String nodeId, ShardId shardId, String dirSuffix) { + final NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); + final Set translogDirs = new TreeSet<>(); + final NodeStats nodeStats = nodeStatses.getNodes().get(0); + for (FsInfo.Path fsPath : nodeStats.getFs()) { + final String path = fsPath.getPath(); + final Path p = PathUtils.get(path) + 
.resolve(NodeEnvironment.INDICES_FOLDER) + .resolve(shardId.getIndex().getUUID()) + .resolve(Integer.toString(shardId.getId())) + .resolve(dirSuffix); + if (Files.isDirectory(p)) { + translogDirs.add(p); + } + } + return translogDirs; + } + + private void corruptRandomTranslogFiles(String indexName) throws IOException { + Set translogDirs = getDirs(indexName, ShardPath.TRANSLOG_FOLDER_NAME); + TestTranslog.corruptRandomTranslogFile(logger, random(), translogDirs); + } + + /** Disables translog flushing for the specified index */ + private static void disableTranslogFlush(String index) { + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) + .build(); + client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); + } + + private SeqNoStats getSeqNoStats(String index, int shardId) { + final ShardStats[] shardStats = client().admin().indices() + .prepareStats(index).get() + .getIndices().get(index).getShards(); + return shardStats[shardId].getSeqNoStats(); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java new file mode 100644 index 0000000000000..7b43d42b2c216 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -0,0 +1,409 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.index.shard; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import org.apache.lucene.store.BaseDirectoryWrapper; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.cli.MockTerminal; +import org.elasticsearch.cli.Terminal; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.TestTranslog; +import org.elasticsearch.index.translog.TranslogCorruptedException; +import org.elasticsearch.test.CorruptionUtils; +import org.elasticsearch.test.DummyShardLock; +import org.junit.Before; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; + +public class RemoveCorruptedShardDataCommandTests extends IndexShardTestCase { + + private ShardId shardId; + private ShardRouting routing; + private Path dataDir; + private Environment environment; + private Settings settings; + private ShardPath shardPath; + private IndexMetaData indexMetaData; + private IndexShard indexShard; + private Path translogPath; + private Path indexPath; + + @Before + public void setup() throws IOException { + shardId = new ShardId("index0", "_na_", 0); + final String nodeId = randomAlphaOfLength(10); + routing = TestShardRouting.newShardRouting(shardId, nodeId, true, ShardRoutingState.INITIALIZING, + RecoverySource.EmptyStoreRecoverySource.INSTANCE); + + dataDir = createTempDir(); + + environment = + TestEnvironment.newEnvironment(Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), dataDir) + .putList(Environment.PATH_DATA_SETTING.getKey(), dataDir.toAbsolutePath().toString()).build()); + + // create same directory structure as prod does + final Path path = NodeEnvironment.resolveNodePath(dataDir, 0); + Files.createDirectories(path); + settings = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(path); + shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + final IndexMetaData.Builder metaData = IndexMetaData.builder(routing.getIndexName()) + .settings(settings) + .primaryTerm(0, 
randomIntBetween(1, 100)) + .putMapping("_doc", "{ \"properties\": {} }"); + indexMetaData = metaData.build(); + + indexShard = newStartedShard(p -> + newShard(routing, shardPath, indexMetaData, null, null, + new InternalEngineFactory(), () -> { + }, EMPTY_EVENT_LISTENER), + true); + + translogPath = shardPath.resolveTranslog(); + indexPath = shardPath.resolveIndex(); + } + + public void testShardLock() throws Exception { + indexDocs(indexShard, true); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final MockTerminal t = new MockTerminal(); + final OptionParser parser = command.getParser(); + + // Try running it before the shard is closed, it should flip out because it can't acquire the lock + try { + final OptionSet options = parser.parse("-d", indexPath.toString()); + command.execute(t, options, environment); + fail("expected the command to fail not being able to acquire the lock"); + } catch (Exception e) { + assertThat(e.getMessage(), containsString("Failed to lock shard's directory")); + } + + // close shard + closeShards(indexShard); + + // Try running it before the shard is corrupted + try { + final OptionSet options = parser.parse("-d", indexPath.toString()); + command.execute(t, options, environment); + fail("expected the command to fail not being able to find a corrupt file marker"); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), startsWith("Shard does not seem to be corrupted at")); + assertThat(t.getOutput(), containsString("Lucene index is clean at")); + } + } + + public void testCorruptedIndex() throws Exception { + final int numDocs = indexDocs(indexShard, true); + + // close shard + closeShards(indexShard); + + final boolean corruptSegments = randomBoolean(); + CorruptionUtils.corruptIndex(random(), indexPath, corruptSegments); + + // test corrupted shard + final IndexShard corruptedShard = reopenIndexShard(true); + allowShardFailures(); + expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); + closeShards(corruptedShard); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final MockTerminal t = new MockTerminal(); + final OptionParser parser = command.getParser(); + + // run command with dry-run + t.addTextInput("n"); // mean dry run + final OptionSet options = parser.parse("-d", indexPath.toString()); + t.setVerbosity(Terminal.Verbosity.VERBOSE); + try { + command.execute(t, options, environment); + fail(); + } catch (ElasticsearchException e) { + if (corruptSegments) { + assertThat(e.getMessage(), is("Index is unrecoverable")); + } else { + assertThat(e.getMessage(), containsString("aborted by user")); + } + } + + logger.info("--> output:\n{}", t.getOutput()); + + if (corruptSegments == false) { + + // run command without dry-run + t.addTextInput("y"); + command.execute(t, options, environment); + + final String output = t.getOutput(); + logger.info("--> output:\n{}", output); + + // reopen shard + failOnShardFailures(); + final IndexShard newShard = newStartedShard(p -> reopenIndexShard(false), true); + + final Set shardDocUIDs = getShardDocUIDs(newShard); + + final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?\\d+) documents will be lost."); + final Matcher matcher = pattern.matcher(output); + assertThat(matcher.find(), equalTo(true)); + final int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs")); + + assertThat(shardDocUIDs.size(), equalTo(expectedNumDocs)); + + 
closeShards(newShard); + } + } + + public void testCorruptedTranslog() throws Exception { + final int numDocsToKeep = indexDocs(indexShard, false); + + // close shard + closeShards(indexShard); + + TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath)); + + // test corrupted shard + final IndexShard corruptedShard = reopenIndexShard(true); + + allowShardFailures(); + // it has to fail on start up due to index.shard.check_on_startup = checksum + final Exception exception = expectThrows(Exception.class, () -> newStartedShard(p -> corruptedShard, true)); + final Throwable cause = exception.getCause() instanceof EngineException ? exception.getCause().getCause() : exception.getCause(); + assertThat(cause, instanceOf(TranslogCorruptedException.class)); + + closeShards(corruptedShard); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final MockTerminal t = new MockTerminal(); + final OptionParser parser = command.getParser(); + + final OptionSet options = parser.parse("-d", translogPath.toString()); + // run command with dry-run + t.addTextInput("n"); // mean dry run + t.setVerbosity(Terminal.Verbosity.VERBOSE); + try { + command.execute(t, options, environment); + fail(); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), containsString("aborted by user")); + assertThat(t.getOutput(), containsString("Continue and remove corrupted data from the shard ?")); + } + + logger.info("--> output:\n{}", t.getOutput()); + + // run command without dry-run + t.reset(); + t.addTextInput("y"); + command.execute(t, options, environment); + + final String output = t.getOutput(); + logger.info("--> output:\n{}", output); + + // reopen shard + failOnShardFailures(); + final IndexShard newShard = newStartedShard(p -> reopenIndexShard(false), true); + + final Set shardDocUIDs = getShardDocUIDs(newShard); + + assertThat(shardDocUIDs.size(), equalTo(numDocsToKeep)); + + closeShards(newShard); + } + + public void testCorruptedBothIndexAndTranslog() throws Exception { + // index some docs in several segments + final int numDocsToKeep = indexDocs(indexShard, false); + + // close shard + closeShards(indexShard); + + CorruptionUtils.corruptIndex(random(), indexPath, false); + + // test corrupted shard + final IndexShard corruptedShard = reopenIndexShard(true); + allowShardFailures(); + expectThrows(IndexShardRecoveryException.class, () -> newStartedShard(p -> corruptedShard, true)); + closeShards(corruptedShard); + + TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogPath)); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final MockTerminal t = new MockTerminal(); + final OptionParser parser = command.getParser(); + + final OptionSet options = parser.parse("-d", translogPath.toString()); + // run command with dry-run + t.addTextInput("n"); // mean dry run + t.addTextInput("n"); // mean dry run + t.setVerbosity(Terminal.Verbosity.VERBOSE); + try { + command.execute(t, options, environment); + fail(); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), containsString("aborted by user")); + assertThat(t.getOutput(), containsString("Continue and remove corrupted data from the shard ?")); + } + + logger.info("--> output:\n{}", t.getOutput()); + + // run command without dry-run + t.reset(); + t.addTextInput("y"); + command.execute(t, options, environment); + + final String output = t.getOutput(); + logger.info("--> output:\n{}", output); + + // reopen 
shard + failOnShardFailures(); + final IndexShard newShard = newStartedShard(p -> reopenIndexShard(false), true); + + final Set<String> shardDocUIDs = getShardDocUIDs(newShard); + + final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?<docs>\\d+) documents will be lost."); + final Matcher matcher = pattern.matcher(output); + assertThat(matcher.find(), equalTo(true)); + final int expectedNumDocs = numDocsToKeep - Integer.parseInt(matcher.group("docs")); + + assertThat(shardDocUIDs.size(), equalTo(expectedNumDocs)); + + closeShards(newShard); + } + + public void testResolveIndexDirectory() throws Exception { + // index a single doc to have files on a disk + indexDoc(indexShard, "_doc", "0", "{}"); + flushShard(indexShard, true); + writeIndexState(); + + // close shard + closeShards(indexShard); + + final RemoveCorruptedShardDataCommand command = new RemoveCorruptedShardDataCommand(); + final OptionParser parser = command.getParser(); + + // `--index index_name --shard-id 0` has to be resolved to indexPath + final OptionSet options = parser.parse("--index", shardId.getIndex().getName(), + "--shard-id", Integer.toString(shardId.id())); + + command.findAndProcessShardPath(options, environment, + shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); + + final OptionSet options2 = parser.parse("--dir", indexPath.toAbsolutePath().toString()); + command.findAndProcessShardPath(options2, environment, + shardPath -> assertThat(shardPath.resolveIndex(), equalTo(indexPath))); + } + + private IndexShard reopenIndexShard(boolean corrupted) throws IOException { + // open shard with the same location + final ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), + RecoverySource.ExistingStoreRecoverySource.INSTANCE + ); + + final IndexMetaData metaData = IndexMetaData.builder(indexMetaData) + .settings(Settings.builder() + .put(indexShard.indexSettings().getSettings()) + .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum")) + .build(); + + CheckedFunction<IndexSettings, Store, IOException> storeProvider = + corrupted == false ?
null : + indexSettings -> { + final ShardId shardId = shardPath.getShardId(); + final BaseDirectoryWrapper baseDirectoryWrapper = newFSDirectory(shardPath.resolveIndex()); + // index is corrupted - don't even try to check index on close - it fails + baseDirectoryWrapper.setCheckIndexOnClose(false); + return new Store(shardId, indexSettings, baseDirectoryWrapper, new DummyShardLock(shardId)); + }; + + return newShard(shardRouting, shardPath, metaData, storeProvider, null, + indexShard.engineFactory, indexShard.getGlobalCheckpointSyncer(), EMPTY_EVENT_LISTENER); + } + + private int indexDocs(IndexShard indexShard, boolean flushLast) throws IOException { + // index some docs in several segments + int numDocs = 0; + int numDocsToKeep = 0; + for (int i = 0, attempts = randomIntBetween(5, 10); i < attempts; i++) { + final int numExtraDocs = between(10, 100); + for (long j = 0; j < numExtraDocs; j++) { + indexDoc(indexShard, "_doc", Long.toString(numDocs + j), "{}"); + } + numDocs += numExtraDocs; + + if (flushLast || i < attempts - 1) { + numDocsToKeep += numExtraDocs; + flushShard(indexShard, true); + } + } + + logger.info("--> indexed {} docs, {} to keep", numDocs, numDocsToKeep); + + writeIndexState(); + return numDocsToKeep; + } + + private void writeIndexState() throws IOException { + // create _state of IndexMetaData + try(NodeEnvironment nodeEnvironment = new NodeEnvironment(environment.settings(), environment, nId -> {})) { + final Path[] paths = nodeEnvironment.indexPaths(indexMetaData.getIndex()); + IndexMetaData.FORMAT.write(indexMetaData, paths); + logger.info("--> index metadata persisted to {} ", Arrays.toString(paths)); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java b/server/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java index 7d548fc42d695..71284792a6817 100644 --- a/server/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java +++ b/server/src/test/java/org/elasticsearch/index/store/CorruptedTranslogIT.java @@ -125,7 +125,7 @@ private void corruptRandomTranslogFile() throws IOException { } } Path translogDir = RandomPicks.randomFrom(random(), translogDirs); - TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, TestTranslog.minTranslogGenUsedInRecovery(translogDir)); + TestTranslog.corruptRandomTranslogFile(logger, random(), Arrays.asList(translogDir)); } /** Disables translog flushing for the specified index */ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index f37ec5a8e55d5..0e114233856c0 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -34,6 +34,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Collection; import java.util.List; import java.util.Random; import java.util.Set; @@ -52,13 +53,19 @@ public class TestTranslog { static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("translog-(\\d+)\\.tlog"); + public static void corruptRandomTranslogFile(Logger logger, Random random, Collection translogDirs) throws IOException { + for (Path translogDir : translogDirs) { + final long minTranslogGen = minTranslogGenUsedInRecovery(translogDir); + corruptRandomTranslogFile(logger, random, translogDir, minTranslogGen); + } + } + /** * Corrupts random translog file 
(translog-N.tlog) from the given translog directory. * * @return a translog file which has been corrupted. */ - public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws - IOException { + public static Path corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException { Set candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration); try (DirectoryStream stream = Files.newDirectoryStream(translogDir)) { diff --git a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java b/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java deleted file mode 100644 index cd4605b7e2d01..0000000000000 --- a/server/src/test/java/org/elasticsearch/index/translog/TruncateTranslogIT.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.elasticsearch.index.translog; - -import com.carrotsearch.randomizedtesting.generators.RandomPicks; -import joptsimple.OptionParser; -import joptsimple.OptionSet; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.apache.lucene.store.Lock; -import org.apache.lucene.store.LockObtainFailedException; -import org.apache.lucene.store.NativeFSLockFactory; -import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.cli.MockTerminal; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.MockEngineFactoryPlugin; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.monitor.fs.FsInfo; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.engine.MockEngineSupport; -import org.elasticsearch.test.transport.MockTransportService; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList; -import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 0) -public class TruncateTranslogIT extends ESIntegTestCase { - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class, MockEngineFactoryPlugin.class); - } - - public void testCorruptTranslogTruncation() throws Exception { - internalCluster().startNodes(2, Settings.EMPTY); - - final String replicaNode = internalCluster().getNodeNames()[1]; - - assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put("index.number_of_shards", 1) - 
.put("index.number_of_replicas", 1) - .put("index.refresh_interval", "-1") - .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog - .put("index.routing.allocation.exclude._name", replicaNode) - )); - ensureYellow(); - - assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() - .put("index.routing.allocation.exclude._name", (String)null) - )); - ensureGreen(); - - // Index some documents - int numDocsToKeep = randomIntBetween(0, 100); - logger.info("--> indexing [{}] docs to be kept", numDocsToKeep); - IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); - } - indexRandom(false, false, false, Arrays.asList(builders)); - flush("test"); - disableTranslogFlush("test"); - // having no extra docs is an interesting case for seq no based recoveries - test it more often - int numDocsToTruncate = randomBoolean() ? 0 : randomIntBetween(0, 100); - logger.info("--> indexing [{}] more doc to be truncated", numDocsToTruncate); - builders = new IndexRequestBuilder[numDocsToTruncate]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); - } - indexRandom(false, false, false, Arrays.asList(builders)); - Set translogDirs = getTranslogDirs("test"); - - TruncateTranslogCommand ttc = new TruncateTranslogCommand(); - MockTerminal t = new MockTerminal(); - OptionParser parser = ttc.getParser(); - - for (Path translogDir : translogDirs) { - OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b"); - // Try running it before the shard is closed, it should flip out because it can't acquire the lock - try { - logger.info("--> running truncate while index is open on [{}]", translogDir.toAbsolutePath()); - ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... 
*/); - fail("expected the truncate command to fail not being able to acquire the lock"); - } catch (Exception e) { - assertThat(e.getMessage(), containsString("Failed to lock shard's directory")); - } - } - - if (randomBoolean() && numDocsToTruncate > 0) { - // flush the replica, so it will have more docs than what the primary will have - Index index = resolveIndex("test"); - IndexShard replica = internalCluster().getInstance(IndicesService.class, replicaNode).getShardOrNull(new ShardId(index, 0)); - replica.flush(new FlushRequest()); - logger.info("--> performed extra flushing on replica"); - } - - // shut down the replica node to be tested later - internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode)); - - // Corrupt the translog file - logger.info("--> corrupting translog"); - corruptRandomTranslogFile("test"); - - // Restart the single node - logger.info("--> restarting node"); - internalCluster().restartRandomDataNode(); - client().admin().cluster().prepareHealth().setWaitForYellowStatus() - .setTimeout(new TimeValue(1000, TimeUnit.MILLISECONDS)) - .setWaitForEvents(Priority.LANGUID) - .get(); - - try { - client().prepareSearch("test").setQuery(matchAllQuery()).get(); - fail("all shards should be failed due to a corrupted translog"); - } catch (SearchPhaseExecutionException e) { - // Good, all shards should be failed because there is only a - // single shard and its translog is corrupt - } - - // Close the index so we can actually truncate the translog - logger.info("--> closing 'test' index"); - client().admin().indices().prepareClose("test").get(); - - for (Path translogDir : translogDirs) { - final Path idxLocation = translogDir.getParent().resolve("index"); - assertBusy(() -> { - logger.info("--> checking that lock has been released for {}", idxLocation); - try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); - Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - // Great, do nothing, we just wanted to obtain the lock - } catch (LockObtainFailedException lofe) { - logger.info("--> failed acquiring lock for {}", idxLocation); - fail("still waiting for lock release at [" + idxLocation + "]"); - } catch (IOException ioe) { - fail("Got an IOException: " + ioe); - } - }); - - OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b"); - logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath()); - ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... 
*/); - logger.info("--> output:\n{}", t.getOutput()); - } - - // Re-open index - logger.info("--> opening 'test' index"); - client().admin().indices().prepareOpen("test").get(); - ensureYellow("test"); - - // Run a search and make sure it succeeds - assertHitCount(client().prepareSearch("test").setQuery(matchAllQuery()).get(), numDocsToKeep); - - logger.info("--> starting the replica node to test recovery"); - internalCluster().startNode(); - ensureGreen("test"); - for (String node : internalCluster().nodesInclude("test")) { - SearchRequestBuilder q = client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery()); - assertHitCount(q.get(), numDocsToKeep); - } - final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get(); - final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() - .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); - assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); - // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. - final SeqNoStats seqNoStats = getSeqNoStats("test", 0); - assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); - assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); - } - - public void testCorruptTranslogTruncationOfReplica() throws Exception { - internalCluster().startNodes(2, Settings.EMPTY); - - final String primaryNode = internalCluster().getNodeNames()[0]; - final String replicaNode = internalCluster().getNodeNames()[1]; - - assertAcked(prepareCreate("test").setSettings(Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put("index.refresh_interval", "-1") - .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) // never flush - always recover from translog - .put("index.routing.allocation.exclude._name", replicaNode) - )); - ensureYellow(); - - assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() - .put("index.routing.allocation.exclude._name", (String)null) - )); - ensureGreen(); - - // Index some documents - int numDocsToKeep = randomIntBetween(0, 100); - logger.info("--> indexing [{}] docs to be kept", numDocsToKeep); - IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocsToKeep]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); - } - indexRandom(false, false, false, Arrays.asList(builders)); - flush("test"); - disableTranslogFlush("test"); - // having no extra docs is an interesting case for seq no based recoveries - test it more often - int numDocsToTruncate = randomBoolean() ? 
0 : randomIntBetween(0, 100); - logger.info("--> indexing [{}] more docs to be truncated", numDocsToTruncate); - builders = new IndexRequestBuilder[numDocsToTruncate]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex("test", "type").setSource("foo", "bar"); - } - indexRandom(false, false, false, Arrays.asList(builders)); - final int totalDocs = numDocsToKeep + numDocsToTruncate; - - - // sample the replica node translog dirs - final ShardId shardId = new ShardId(resolveIndex("test"), 0); - Set translogDirs = getTranslogDirs(replicaNode, shardId); - Path tdir = randomFrom(translogDirs); - - // stop the cluster nodes. we don't use full restart so the node start up order will be the same - // and shard roles will be maintained - internalCluster().stopRandomDataNode(); - internalCluster().stopRandomDataNode(); - - // Corrupt the translog file - logger.info("--> corrupting translog"); - TestTranslog.corruptRandomTranslogFile(logger, random(), tdir, TestTranslog.minTranslogGenUsedInRecovery(tdir)); - - // Restart the single node - logger.info("--> starting node"); - internalCluster().startNode(); - - ensureYellow(); - - // Run a search and make sure it succeeds - assertHitCount(client().prepareSearch("test").setQuery(matchAllQuery()).get(), totalDocs); - - TruncateTranslogCommand ttc = new TruncateTranslogCommand(); - MockTerminal t = new MockTerminal(); - OptionParser parser = ttc.getParser(); - - for (Path translogDir : translogDirs) { - final Path idxLocation = translogDir.getParent().resolve("index"); - assertBusy(() -> { - logger.info("--> checking that lock has been released for {}", idxLocation); - try (Directory dir = FSDirectory.open(idxLocation, NativeFSLockFactory.INSTANCE); - Lock writeLock = dir.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - // Great, do nothing, we just wanted to obtain the lock - } catch (LockObtainFailedException lofe) { - logger.info("--> failed acquiring lock for {}", idxLocation); - fail("still waiting for lock release at [" + idxLocation + "]"); - } catch (IOException ioe) { - fail("Got an IOException: " + ioe); - } - }); - - OptionSet options = parser.parse("-d", translogDir.toAbsolutePath().toString(), "-b"); - logger.info("--> running truncate translog command for [{}]", translogDir.toAbsolutePath()); - ttc.execute(t, options, null /* TODO: env should be real here, and ttc should actually use it... */); - logger.info("--> output:\n{}", t.getOutput()); - } - - logger.info("--> starting the replica node to test recovery"); - internalCluster().startNode(); - ensureGreen("test"); - for (String node : internalCluster().nodesInclude("test")) { - assertHitCount(client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(), totalDocs); - } - - final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get(); - final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() - .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); - // the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery - assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0)); - // Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit. 
- final SeqNoStats seqNoStats = getSeqNoStats("test", 0); - assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); - assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo())); - } - - private Set getTranslogDirs(String indexName) throws IOException { - ClusterState state = client().admin().cluster().prepareState().get().getState(); - GroupShardsIterator shardIterators = state.getRoutingTable().activePrimaryShardsGrouped(new String[]{indexName}, false); - List iterators = iterableAsArrayList(shardIterators); - ShardIterator shardIterator = RandomPicks.randomFrom(random(), iterators); - ShardRouting shardRouting = shardIterator.nextOrNull(); - assertNotNull(shardRouting); - assertTrue(shardRouting.primary()); - assertTrue(shardRouting.assignedToNode()); - String nodeId = shardRouting.currentNodeId(); - ShardId shardId = shardRouting.shardId(); - return getTranslogDirs(nodeId, shardId); - } - - private Set getTranslogDirs(String nodeId, ShardId shardId) { - NodesStatsResponse nodeStatses = client().admin().cluster().prepareNodesStats(nodeId).setFs(true).get(); - Set translogDirs = new TreeSet<>(); // treeset makes sure iteration order is deterministic - for (FsInfo.Path fsPath : nodeStatses.getNodes().get(0).getFs()) { - String path = fsPath.getPath(); - final String relativeDataLocationPath = "indices/"+ shardId.getIndex().getUUID() +"/" + Integer.toString(shardId.getId()) - + "/translog"; - Path translogPath = PathUtils.get(path).resolve(relativeDataLocationPath); - if (Files.isDirectory(translogPath)) { - translogDirs.add(translogPath); - } - } - return translogDirs; - } - - private void corruptRandomTranslogFile(String indexName) throws IOException { - Set translogDirs = getTranslogDirs(indexName); - Path translogDir = randomFrom(translogDirs); - TestTranslog.corruptRandomTranslogFile(logger, random(), translogDir, TestTranslog.minTranslogGenUsedInRecovery(translogDir)); - } - - /** Disables translog flushing for the specified index */ - private static void disableTranslogFlush(String index) { - Settings settings = Settings.builder() - .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) - .build(); - client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get(); - } - - private SeqNoStats getSeqNoStats(String index, int shardId) { - final ShardStats[] shardStats = client().admin().indices() - .prepareStats(index).get() - .getIndices().get(index).getShards(); - return shardStats[shardId].getSeqNoStats(); - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 78ce5bc500ce8..540b68ee40932 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -133,7 +133,7 @@ public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName(), threadPoolSettings()); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards - failOnShardFailures.set(true); + failOnShardFailures(); } @Override @@ -154,6 +154,10 @@ protected void allowShardFailures() { failOnShardFailures.set(false); } + protected void failOnShardFailures() { + failOnShardFailures.set(true); + } + public Settings threadPoolSettings() { return Settings.EMPTY; } @@ -233,7 +237,7 @@ 
protected IndexShard newShard( .settings(indexSettings) .primaryTerm(0, primaryTerm) .putMapping("_doc", "{ \"properties\": {} }"); - return newShard(shardRouting, metaData.build(), engineFactory, listeners); + return newShard(shardRouting, metaData.build(), null, engineFactory, () -> {}, listeners); } /** @@ -279,7 +283,6 @@ protected IndexShard newShard(ShardId shardId, boolean primary, String nodeId, I return newShard(shardRouting, indexMetaData, searcherWrapper, new InternalEngineFactory(), globalCheckpointSyncer); } - /** * creates a new initializing shard. The shard will will be put in its proper path under the * current node id the shard is assigned to. diff --git a/test/framework/src/main/java/org/elasticsearch/test/CorruptionUtils.java b/test/framework/src/main/java/org/elasticsearch/test/CorruptionUtils.java index df306dfc9e32c..59c4c942fe259 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/CorruptionUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/CorruptionUtils.java @@ -21,6 +21,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; @@ -47,6 +48,23 @@ public final class CorruptionUtils { private static Logger logger = ESLoggerFactory.getLogger("test"); private CorruptionUtils() {} + public static void corruptIndex(Random random, Path indexPath, boolean corruptSegments) throws IOException { + // corrupt files + final Path[] filesToCorrupt = + Files.walk(indexPath) + .filter(p -> { + final String name = p.getFileName().toString(); + boolean segmentFile = name.startsWith("segments_") || name.endsWith(".si"); + return Files.isRegularFile(p) + && name.startsWith("extra") == false // Skip files added by Lucene's ExtrasFS + && IndexWriter.WRITE_LOCK_NAME.equals(name) == false + && (corruptSegments ? segmentFile : segmentFile == false); + } + ) + .toArray(Path[]::new); + corruptFile(random, filesToCorrupt); + } + /** * Corrupts a random file at a random position */
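Note on the new `CorruptionUtils.corruptIndex` helper above: it only selects which files to damage — segment metadata (`segments_N` and `*.si`) when `corruptSegments` is true, otherwise the regular per-segment data files — while always skipping Lucene's write lock and the `extra` files added by the test filesystem. The actual damage is delegated to the existing `corruptFile` utility, whose body is outside this excerpt. As a reading aid only, the sketch below shows the kind of single-byte, length-preserving corruption such a utility could perform; the class and method names here are invented, and the real `corruptFile` does more (it logs what it touched and verifies that the file's checksum actually changes).

[source,java]
--------------------------------------------------
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.Random;

import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;

final class ByteFlipSketch {

    /** Flips every bit of one byte at a random offset, keeping the file length unchanged. */
    static void flipRandomByte(Random random, Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, READ, WRITE)) {
            if (channel.size() == 0) {
                return; // an empty file has nothing to corrupt
            }
            long position = Math.floorMod(random.nextLong(), channel.size());
            ByteBuffer buffer = ByteBuffer.allocate(1);
            channel.read(buffer, position);
            byte original = buffer.get(0);
            byte corrupted = (byte) (original ^ 0xFF); // flipping all bits guarantees a different value
            channel.write(ByteBuffer.wrap(new byte[] { corrupted }), position);
        }
    }
}
--------------------------------------------------

Corrupting bytes in place rather than truncating or deleting files is what lets the tests above exercise the checksum verification path (`index.shard.check_on_startup: checksum`) instead of failing on a merely missing file.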