Skip to content

Commit

Permalink
correct handling unclean full cluster restart
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Dec 16, 2019
1 parent 3dfcf10 commit e3f7ff2
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final ClusterService clusterService;

private boolean uncleanStart;

/**
* This flag indicates that the repository can not exclusively rely on the value stored in {@link #latestKnownRepoGen} to determine the
* latest repository generation but must inspect its physical contents as well via {@link #latestIndexBlobId()}.
Expand Down Expand Up @@ -274,6 +276,8 @@ protected BlobStoreRepository(

@Override
protected void doStart() {
uncleanStart = metadata.pendingGeneration() > RepositoryData.EMPTY_REPO_GEN &&
metadata.generation() != metadata.pendingGeneration();
ByteSizeValue chunkSize = chunkSize();
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
Expand Down Expand Up @@ -305,7 +309,9 @@ protected void doClose() {
@Override
public void updateState(ClusterState state) {
metadata = getRepoMetaData(state);
bestEffortConsistency = isReadOnly() || state.nodes().getMinNodeVersion().before(RepositoryMetaData.REPO_GEN_IN_CS_VERSION)
uncleanStart = uncleanStart && metadata.generation() != metadata.pendingGeneration();
bestEffortConsistency = uncleanStart || isReadOnly()
|| state.nodes().getMinNodeVersion().before(RepositoryMetaData.REPO_GEN_IN_CS_VERSION)
|| metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN || ALLOW_CONCURRENT_MODIFICATION.get(metadata.settings());
if (isReadOnly()) {
// No need to waste cycles, no operations can run against a read-only repository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
package org.elasticsearch.snapshots;

import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -148,6 +154,82 @@ public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception
.addSnapshots(snapshot).get().getSnapshots(repoName));
}

public void testFindDanglingLatestGeneration() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex("test-idx-2").setSource("foo", "bar"));

final String snapshot = "test-snap";

logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));

logger.info("--> set next generation as pending in the cluster state");
final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
.putCustom(RepositoriesMetaData.TYPE,
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
}

@Override
public void onFailure(String source, Exception e) {
csUpdateFuture.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
csUpdateFuture.onResponse(null);
}
}
);
csUpdateFuture.get();

logger.info("--> full cluster restart");
internalCluster().fullRestart();

Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);

logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));

logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();

logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));

logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get().getSnapshots(repoName));
}

private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
logger.info("--> try to delete snapshot");
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
Expand Down

0 comments on commit e3f7ff2

Please sign in to comment.