Skip to content

Commit

Permalink
Use ClusterState as Consistency Source for Snapshot Repositories (ela…
Browse files Browse the repository at this point in the history
…stic#49060)

Follow up to elastic#49729

This change removes falling back to listing out the repository contents to find the latest `index-N` in write-mounted blob store repositories.
This saves 2-3 list operations on each snapshot create and delete operation. Also it makes all the snapshot status APIs cheaper (and faster) by saving one list operation there as well in many cases.
This removes the resiliency to concurrent modifications of the repository as a result and puts a repository in a `corrupted` state in case loading `RepositoryData` failed from the assumed generation.
  • Loading branch information
original-brownbear committed Dec 17, 2019
1 parent ce294e1 commit 51adb68
Show file tree
Hide file tree
Showing 8 changed files with 457 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public final class RepositoryData {
*/
public static final long UNKNOWN_REPO_GEN = -2L;

/**
* The generation value indicating that the repository generation could not be determined.
*/
public static final long CORRUPTED_REPO_GEN = -3L;

/**
* An instance initialized for an empty repository.
*/
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -193,13 +194,16 @@ public void testSnapshotWithConflictingName() throws IOException {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
}
};
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();
return repository;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

import java.nio.file.Files;
import java.nio.file.Path;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {

public void testConcurrentlyChangeRepositoryContents() throws Exception {
Client client = client();

Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData =
getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName));
Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1)));

assertRepositoryBlocked(client, repoName, snapshot);

if (randomBoolean()) {
logger.info("--> move index-N blob back to initial generation");
Files.move(repo.resolve("index-" + (repositoryData.getGenId() + 1)), repo.resolve("index-" + repositoryData.getGenId()));

logger.info("--> verify repository remains blocked");
assertRepositoryBlocked(client, repoName, snapshot);
}

logger.info("--> remove repository");
assertAcked(client.admin().cluster().prepareDeleteRepository(repoName));

logger.info("--> recreate repository");
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}

public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception {
Client client = client();

Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));

logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1));

logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2));

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}

public void testFindDanglingLatestGeneration() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));

logger.info("--> set next generation as pending in the cluster state");
final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
.putCustom(RepositoriesMetaData.TYPE,
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
}

@Override
public void onFailure(String source, Exception e) {
csUpdateFuture.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
csUpdateFuture.onResponse(null);
}
}
);
csUpdateFuture.get();

logger.info("--> full cluster restart");
internalCluster().fullRestart();
ensureGreen();

Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));

logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}

private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
logger.info("--> try to delete snapshot");
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
() -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet());
assertThat(repositoryException3.getMessage(),
containsString("Could not read repository data because the contents of the repository do not match its expected state."));

logger.info("--> try to create snapshot");
final RepositoryException repositoryException4 = expectThrows(RepositoryException.class,
() -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet());
assertThat(repositoryException4.getMessage(),
containsString("Could not read repository data because the contents of the repository do not match its expected state."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public void testOverwriteSnapshotInfoBlob() {
try (BlobStoreRepository repository =
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();

// We create a snap- blob for snapshot "foo" in the first generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
deleteAndAssertEmpty(getRepository().basePath());
client().admin().cluster().prepareDeleteRepository("test-repo").get();
super.tearDown();
}

Expand Down Expand Up @@ -168,8 +169,6 @@ protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, Blo
}

public void testCleanup() throws Exception {
createRepository("test-repo");

createIndex("test-idx-1");
createIndex("test-idx-2");
createIndex("test-idx-3");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.repositories.blobstore;

import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand All @@ -29,6 +30,8 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -69,6 +72,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.test.ESTestCase.buildNewFakeTransportAddress;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -326,7 +330,11 @@ private static ClusterService mockClusterService(ClusterState initialState) {
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
// Setting local node as master so it may update the repository metadata in the cluster state
final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(
ClusterState.builder(initialState).nodes(
DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build());
when(clusterService.state()).then(invocationOnMock -> currentState.get());
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
Expand Down Expand Up @@ -353,8 +354,12 @@ private Environment createEnvironment() {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData));
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService);
clusterService.addStateApplier(e -> repository.updateState(e.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
return repository;
}

private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
Expand Down

0 comments on commit 51adb68

Please sign in to comment.