Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete shard store files before restoring a snapshot #27476

Merged
merged 2 commits into from
Nov 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public String toString() {

/**
* Represents a snapshot of the current directory build from the latest Lucene commit.
* Only files that are part of the last commit are considered in this datastrucutre.
* Only files that are part of the last commit are considered in this datastructure.
* For backwards compatibility the snapshot might include legacy checksums that
* are derived from a dedicated checksum file written by older elasticsearch version pre 1.3
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
Expand Down Expand Up @@ -110,6 +109,7 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1451,6 +1451,9 @@ public void restore() throws IOException {
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
Store.MetadataSnapshot recoveryTargetMetadata;
try {
// this will throw an IOException if the store has no segments infos file. The
// store can still have existing files but they will be deleted just before being
// restored.
recoveryTargetMetadata = targetShard.snapshotStoreMetadata();
} catch (IndexNotFoundException e) {
// happens when restore to an empty shard, not a big deal
Expand Down Expand Up @@ -1478,7 +1481,14 @@ public void restore() throws IOException {
snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
fileInfos.put(fileInfo.metadata().name(), fileInfo);
}

final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0);

final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
if (restoredSegmentsFile == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you fixed the check here (before it read recoveryTargetMetadata == null which did not make any sense as that one was always non-null). I wonder what impact this bug fix has. What change in behavior do we expect by this (it's a bit unclear to me what this check does)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly there's no test for this and as you noticed the previous check did not make any sense. I saw it as a bug and I think that we can't restore a snapshot without segments file so I don't expect any impact for this. Maybe @imotov has more knowledge?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot really think of a scenario where snapshot would have no segments. So, removing check for recoveryTargetMetadata and replacing it with check for no snapshots shouldn't have any change in behavior expect in some pathological cases that would fail anyway. Now they will at least fail with a reasonable error message.

throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}

final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata);
for (StoreFileMetaData md : diff.identical) {
BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name());
Expand All @@ -1505,29 +1515,31 @@ public void restore() throws IOException {
logger.trace("no files to recover, all exists within the local store");
}

if (logger.isTraceEnabled()) {
logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId,
index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount()));
}
try {
// first, delete pre-existing files in the store that have the same name but are
// different (i.e. different length/checksum) from those being restored in the snapshot
for (final StoreFileMetaData storeFileMetaData : diff.different) {
IOUtils.deleteFiles(store.directory(), storeFileMetaData.name());
}
// list of all existing store files
final List<String> deleteIfExistFiles = Arrays.asList(store.directory().listAll());

// restore the files from the snapshot to the Lucene store
for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) {
// if a file with a same physical name already exist in the store we need to delete it
// before restoring it from the snapshot. We could be lenient and try to reuse the existing
// store files (and compare their names/length/checksum again with the snapshot files) but to
// avoid extra complexity we simply delete them and restore them again like StoreRecovery
// does with dangling indices. Any existing store file that is not restored from the snapshot
// will be clean up by RecoveryTarget.cleanFiles().
final String physicalName = fileToRecover.physicalName();
if (deleteIfExistFiles.contains(physicalName)) {
logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName);
store.directory().deleteFile(physicalName);
}

logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name());
restoreFile(fileToRecover, store);
}
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
if (recoveryTargetMetadata == null) {
throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}
assert restoredSegmentsFile != null;

// read the snapshot data persisted
final SegmentInfos segmentCommitInfos;
try {
Expand Down Expand Up @@ -1602,5 +1614,4 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, fi
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -189,7 +188,7 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener<R
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
final Snapshot snapshot = new Snapshot(request.repositoryName, snapshotId);
List<String> filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions());
MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));
final MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices));

// Make sure that we can restore from this snapshot
validateSnapshotRestorable(request.repositoryName, snapshotInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE;

/**
* This class tests the behavior of {@link BlobStoreRepository} when it
* restores a shard from a snapshot but some files with same names already
* exist on disc.
*/
public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {

/**
* Restoring a snapshot that contains multiple files must succeed even when
* some files already exist in the shard's store.
*/
public void testRestoreSnapshotWithExistingFiles() throws IOException {
final IndexId indexId = new IndexId(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
final ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), 0);

IndexShard shard = newShard(shardId, true);
try {
// index documents in the shards
final int numDocs = scaledRandomIntBetween(1, 500);
recoverShardFromStore(shard);
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "doc", Integer.toString(i));
if (rarely()) {
flushShard(shard, false);
}
}
assertDocCount(shard, numDocs);

// snapshot the shard
final Repository repository = createRepository();
final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid"));
snapshotShard(shard, snapshot, repository);

// capture current store files
final Store.MetadataSnapshot storeFiles = shard.snapshotStoreMetadata();
assertFalse(storeFiles.asMap().isEmpty());

// close the shard
closeShards(shard);

// delete some random files in the store
List<String> deletedFiles = randomSubsetOf(randomIntBetween(1, storeFiles.size() - 1), storeFiles.asMap().keySet());
for (String deletedFile : deletedFiles) {
Files.delete(shard.shardPath().resolveIndex().resolve(deletedFile));
}

// build a new shard using the same store directory as the closed shard
ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE);
shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {});

// restore the shard
recoverShardFromSnapshot(shard, snapshot, repository);

// check that the shard is not corrupted
TestUtil.checkIndex(shard.store().directory());

// check that all files have been restored
final Directory directory = shard.store().directory();
final List<String> directoryFiles = Arrays.asList(directory.listAll());

for (StoreFileMetaData storeFile : storeFiles) {
String fileName = storeFile.name();
assertTrue("File [" + fileName + "] does not exist in store directory", directoryFiles.contains(fileName));
assertEquals(storeFile.length(), shard.store().directory().fileLength(fileName));
}
} finally {
if (shard != null && shard.state() != IndexShardState.CLOSED) {
try {
shard.close("test", false);
} finally {
IOUtils.close(shard.store());
}
}
}
}

/** Create a {@link Repository} with a random name **/
private Repository createRepository() throws IOException {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry());
}

/** Create a {@link Environment} with random path.home and path.repo **/
private Environment createEnvironment() {
Path home = createTempDir();
return TestEnvironment.newEnvironment(Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath())
.put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath())
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.VersionType;
Expand All @@ -60,6 +61,7 @@
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
Expand All @@ -69,6 +71,9 @@
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.node.Node;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand All @@ -85,6 +90,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;

import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -583,6 +589,38 @@ protected void flushShard(IndexShard shard, boolean force) {
shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force));
}

/** Recover a shard from a snapshot using a given repository **/
protected void recoverShardFromSnapshot(final IndexShard shard,
final Snapshot snapshot,
final Repository repository) throws IOException {
final Version version = Version.CURRENT;
final ShardId shardId = shard.shardId();
final String index = shardId.getIndexName();
final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID());
final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId());
final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index);
final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, recoverySource, ShardRoutingState.INITIALIZING);

shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null));
repository.restoreShard(shard, snapshot.getSnapshotId(), version, indexId, shard.shardId(), shard.recoveryState());
}

/** Snapshot a shard using a given repository **/
protected void snapshotShard(final IndexShard shard,
final Snapshot snapshot,
final Repository repository) throws IOException {
final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus();
try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) {
Index index = shard.shardId().getIndex();
IndexId indexId = new IndexId(index.getName(), index.getUUID());

repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus);
}
assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage());
assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles());
assertNull(snapshotStatus.failure());
}

/**
* Helper method to access (package-protected) engine from tests
*/
Expand Down