diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index fa992e12ef220..902cf4b992b11 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -731,7 +731,7 @@ public String toString() { /** * Represents a snapshot of the current directory build from the latest Lucene commit. - * Only files that are part of the last commit are considered in this datastrucutre. + * Only files that are part of the last commit are considered in this datastructure. * For backwards compatibility the snapshot might include legacy checksums that * are derived from a dedicated checksum file written by older elasticsearch version pre 1.3 *

diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 939c33d00a8d4..84d3d743f6402 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -35,7 +35,6 @@ import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; -import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; @@ -110,6 +109,7 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -1451,6 +1451,9 @@ public void restore() throws IOException { SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); Store.MetadataSnapshot recoveryTargetMetadata; try { + // this will throw an IOException if the store has no segments infos file. The + // store can still have existing files but they will be deleted just before being + // restored. recoveryTargetMetadata = targetShard.snapshotStoreMetadata(); } catch (IndexNotFoundException e) { // happens when restore to an empty shard, not a big deal @@ -1478,7 +1481,14 @@ public void restore() throws IOException { snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); fileInfos.put(fileInfo.metadata().name(), fileInfo); } + final Store.MetadataSnapshot sourceMetaData = new Store.MetadataSnapshot(unmodifiableMap(snapshotMetaData), emptyMap(), 0); + + final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); + if (restoredSegmentsFile == null) { + throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); + } + final Store.RecoveryDiff diff = sourceMetaData.recoveryDiff(recoveryTargetMetadata); for (StoreFileMetaData md : diff.identical) { BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfos.get(md.name()); @@ -1505,29 +1515,31 @@ public void restore() throws IOException { logger.trace("no files to recover, all exists within the local store"); } - if (logger.isTraceEnabled()) { - logger.trace("[{}] [{}] recovering_files [{}] with total_size [{}], reusing_files [{}] with reused_size [{}]", shardId, snapshotId, - index.totalRecoverFiles(), new ByteSizeValue(index.totalRecoverBytes()), index.reusedFileCount(), new ByteSizeValue(index.reusedFileCount())); - } try { - // first, delete pre-existing files in the store that have the same name but are - // different (i.e. different length/checksum) from those being restored in the snapshot - for (final StoreFileMetaData storeFileMetaData : diff.different) { - IOUtils.deleteFiles(store.directory(), storeFileMetaData.name()); - } + // list of all existing store files + final List deleteIfExistFiles = Arrays.asList(store.directory().listAll()); + // restore the files from the snapshot to the Lucene store for (final BlobStoreIndexShardSnapshot.FileInfo fileToRecover : filesToRecover) { + // if a file with a same physical name already exist in the store we need to delete it + // before restoring it from the snapshot. We could be lenient and try to reuse the existing + // store files (and compare their names/length/checksum again with the snapshot files) but to + // avoid extra complexity we simply delete them and restore them again like StoreRecovery + // does with dangling indices. Any existing store file that is not restored from the snapshot + // will be clean up by RecoveryTarget.cleanFiles(). + final String physicalName = fileToRecover.physicalName(); + if (deleteIfExistFiles.contains(physicalName)) { + logger.trace("[{}] [{}] deleting pre-existing file [{}]", shardId, snapshotId, physicalName); + store.directory().deleteFile(physicalName); + } + logger.trace("[{}] [{}] restoring file [{}]", shardId, snapshotId, fileToRecover.name()); restoreFile(fileToRecover, store); } } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } - final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); - if (recoveryTargetMetadata == null) { - throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); - } - assert restoredSegmentsFile != null; + // read the snapshot data persisted final SegmentInfos segmentCommitInfos; try { @@ -1602,5 +1614,4 @@ private void restoreFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, fi } } } - } diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index e6e6bc82173bf..f88299661828c 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -188,7 +188,7 @@ public void restoreSnapshot(final RestoreRequest request, final ActionListener filteredIndices = SnapshotUtils.filterIndices(snapshotInfo.indices(), request.indices(), request.indicesOptions()); - MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices)); + final MetaData metaData = repository.getSnapshotMetaData(snapshotInfo, repositoryData.resolveIndices(filteredIndices)); // Make sure that we can restore from this snapshot validateSnapshotRestorable(request.repositoryName, snapshotInfo); diff --git a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java new file mode 100644 index 0000000000000..8470b46ceb939 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories.blobstore; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.IOUtils; +import org.apache.lucene.util.TestUtil; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.IndexShardTestCase; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.snapshots.Snapshot; +import org.elasticsearch.snapshots.SnapshotId; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; + +import static org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE; + +/** + * This class tests the behavior of {@link BlobStoreRepository} when it + * restores a shard from a snapshot but some files with same names already + * exist on disc. + */ +public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { + + /** + * Restoring a snapshot that contains multiple files must succeed even when + * some files already exist in the shard's store. + */ + public void testRestoreSnapshotWithExistingFiles() throws IOException { + final IndexId indexId = new IndexId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()); + final ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), 0); + + IndexShard shard = newShard(shardId, true); + try { + // index documents in the shards + final int numDocs = scaledRandomIntBetween(1, 500); + recoverShardFromStore(shard); + for (int i = 0; i < numDocs; i++) { + indexDoc(shard, "doc", Integer.toString(i)); + if (rarely()) { + flushShard(shard, false); + } + } + assertDocCount(shard, numDocs); + + // snapshot the shard + final Repository repository = createRepository(); + final Snapshot snapshot = new Snapshot(repository.getMetadata().name(), new SnapshotId(randomAlphaOfLength(10), "_uuid")); + snapshotShard(shard, snapshot, repository); + + // capture current store files + final Store.MetadataSnapshot storeFiles = shard.snapshotStoreMetadata(); + assertFalse(storeFiles.asMap().isEmpty()); + + // close the shard + closeShards(shard); + + // delete some random files in the store + List deletedFiles = randomSubsetOf(randomIntBetween(1, storeFiles.size() - 1), storeFiles.asMap().keySet()); + for (String deletedFile : deletedFiles) { + Files.delete(shard.shardPath().resolveIndex().resolve(deletedFile)); + } + + // build a new shard using the same store directory as the closed shard + ShardRouting shardRouting = ShardRoutingHelper.initWithSameId(shard.routingEntry(), EXISTING_STORE_INSTANCE); + shard = newShard(shardRouting, shard.shardPath(), shard.indexSettings().getIndexMetaData(), null, null, () -> {}); + + // restore the shard + recoverShardFromSnapshot(shard, snapshot, repository); + + // check that the shard is not corrupted + TestUtil.checkIndex(shard.store().directory()); + + // check that all files have been restored + final Directory directory = shard.store().directory(); + final List directoryFiles = Arrays.asList(directory.listAll()); + + for (StoreFileMetaData storeFile : storeFiles) { + String fileName = storeFile.name(); + assertTrue("File [" + fileName + "] does not exist in store directory", directoryFiles.contains(fileName)); + assertEquals(storeFile.length(), shard.store().directory().fileLength(fileName)); + } + } finally { + if (shard != null && shard.state() != IndexShardState.CLOSED) { + try { + shard.close("test", false); + } finally { + IOUtils.close(shard.store()); + } + } + } + } + + /** Create a {@link Repository} with a random name **/ + private Repository createRepository() throws IOException { + Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); + RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry()); + } + + /** Create a {@link Environment} with random path.home and path.repo **/ + private Environment createEnvironment() { + Path home = createTempDir(); + return new Environment(Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) + .build()); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index a448444edce1c..242cdfcfd7e5e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -46,6 +46,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; @@ -60,6 +61,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.similarity.SimilarityService; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; @@ -69,6 +71,9 @@ import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.StartRecoveryRequest; import org.elasticsearch.node.Node; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -85,6 +90,7 @@ import java.util.function.BiFunction; import java.util.function.Consumer; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.hasSize; @@ -583,6 +589,38 @@ protected void flushShard(IndexShard shard, boolean force) { shard.flush(new FlushRequest(shard.shardId().getIndexName()).force(force)); } + /** Recover a shard from a snapshot using a given repository **/ + protected void recoverShardFromSnapshot(final IndexShard shard, + final Snapshot snapshot, + final Repository repository) throws IOException { + final Version version = Version.CURRENT; + final ShardId shardId = shard.shardId(); + final String index = shardId.getIndexName(); + final IndexId indexId = new IndexId(shardId.getIndex().getName(), shardId.getIndex().getUUID()); + final DiscoveryNode node = getFakeDiscoNode(shard.routingEntry().currentNodeId()); + final RecoverySource.SnapshotRecoverySource recoverySource = new RecoverySource.SnapshotRecoverySource(snapshot, version, index); + final ShardRouting shardRouting = newShardRouting(shardId, node.getId(), true, ShardRoutingState.INITIALIZING, recoverySource); + + shard.markAsRecovering("from snapshot", new RecoveryState(shardRouting, node, null)); + repository.restoreShard(shard, snapshot.getSnapshotId(), version, indexId, shard.shardId(), shard.recoveryState()); + } + + /** Snapshot a shard using a given repository **/ + protected void snapshotShard(final IndexShard shard, + final Snapshot snapshot, + final Repository repository) throws IOException { + final IndexShardSnapshotStatus snapshotStatus = new IndexShardSnapshotStatus(); + try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) { + Index index = shard.shardId().getIndex(); + IndexId indexId = new IndexId(index.getName(), index.getUUID()); + + repository.snapshotShard(shard, snapshot.getSnapshotId(), indexId, indexCommitRef.getIndexCommit(), snapshotStatus); + } + assertEquals(IndexShardSnapshotStatus.Stage.DONE, snapshotStatus.stage()); + assertEquals(shard.snapshotStoreMetadata().size(), snapshotStatus.numberOfFiles()); + assertNull(snapshotStatus.failure()); + } + /** * Helper method to access (package-protected) engine from tests */