Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check all actions preventing snapshot delete during retention run #45992

Merged
merged 9 commits into from
Sep 3, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private Entry(StreamInput in) throws IOException {
repositoryStateId = in.readLong();
}

private Entry(String repository, long repositoryStateId) {
/**
 * Creates an entry from a repository name and a repository state id.
 *
 * @param repository        value stored in the {@code repository} field
 * @param repositoryStateId value stored in the {@code repositoryStateId} field
 */
public Entry(String repository, long repositoryStateId) {
this.repository = repository;
this.repositoryStateId = repositoryStateId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
// the list of snapshot deletion request entries
private final List<Entry> entries;

private SnapshotDeletionsInProgress(List<Entry> entries) {
/**
 * Creates an instance holding the given deletion entries.
 *
 * @param entries the snapshot deletion request entries; must not be {@code null}
 *                ({@link Collections#unmodifiableList} rejects a null argument)
 */
public SnapshotDeletionsInProgress(List<Entry> entries) {
// Wrapped in an unmodifiable view rather than copied.
this.entries = Collections.unmodifiableList(entries);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryItem.DELETE_OPERATION;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX;
import static org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT.getStepKeyForIndex;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -383,7 +384,11 @@ public void testSnapshotInProgress() throws Exception {
});

// Cancel the snapshot since it is not going to complete quickly
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName)));
try {
client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + snapshotName));
} catch (Exception e) {
// ignore
}
}
}

Expand All @@ -403,9 +408,9 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
initializeRepo(slowRepo, "1b");
initializeRepo(fastRepo, "10mb");

createSnapshotPolicy(slowPolicy, "snap", "1 2 3 4 5 ?", slowRepo, indexName, true,
createSnapshotPolicy(slowPolicy, "slow-snap", "1 2 3 4 5 ?", slowRepo, indexName, true,
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));
createSnapshotPolicy(fastPolicy, "snap", "1 2 3 4 5 ?", fastRepo, indexName, true,
createSnapshotPolicy(fastPolicy, "fast-snap", "1 2 3 4 5 ?", fastRepo, indexName, true,
new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null));

// Create a snapshot and wait for it to be complete (need something that can be deleted)
Expand All @@ -419,6 +424,19 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps);
List<Map<String, Object>> snaps2 = (List<Map<String, Object>>) snaps.get("snapshots");
assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS"));

// Check that no in_progress snapshots show up
Response response = client().performRequest(new Request("GET", "/_slm/policy"));
Map<String, Object> policyResponseMap;
try (InputStream content2 = response.getEntity().getContent()) {
policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content2, true);
}
assertThat(policyResponseMap.size(), greaterThan(0));
Optional<Map<String, Object>> inProgress = Optional.ofNullable((Map<String, Object>) policyResponseMap.get(slowPolicy))
.map(policy -> (Map<String, Object>) policy.get("in_progress"));

// Ensure no snapshots are running
assertFalse("expected no in progress snapshots but got " + inProgress.orElse(null), inProgress.isPresent());
}
} catch (NullPointerException | ResponseException e) {
fail("unable to retrieve completed snapshot: " + e);
Expand All @@ -431,11 +449,12 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
// Check that the executed snapshot shows up in the SLM output as in_progress
assertBusy(() -> {
try {
Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? "" : "?human")));
Response response = client().performRequest(new Request("GET", "/_slm/policy"));
Map<String, Object> policyResponseMap;
try (InputStream content = response.getEntity().getContent()) {
policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true);
}
logger.info("--> checking for 'slow-*' snapshot to show up in policy response, got: " + policyResponseMap);
assertThat(policyResponseMap.size(), greaterThan(0));
Optional<Map<String, Object>> inProgress = Optional.ofNullable((Map<String, Object>) policyResponseMap.get(slowPolicy))
.map(policy -> (Map<String, Object>) policy.get("in_progress"));
Expand All @@ -444,7 +463,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
Map<String, Object> inProgressMap = inProgress.get();
assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName));
assertNotNull(inProgressMap.get("uuid"));
assertThat(inProgressMap.get("state"), equalTo("STARTED"));
assertThat(inProgressMap.get("state"), anyOf(equalTo("STARTED"), equalTo("INIT")));
assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L));
assertNull(inProgressMap.get("failure"));
} else {
Expand Down Expand Up @@ -481,9 +500,10 @@ public void testRetentionWhileSnapshotInProgress() throws Exception {
assertBusy(() -> {
// We expect a failed response because the snapshot should not exist
try {
logger.info("--> checking to see if snapshot has been deleted...");
Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName));
assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception"));
String resp = EntityUtils.toString(response.getEntity());
logger.info("--> checking to see if snapshot has been deleted, got: " + resp);
assertThat(resp, containsString("snapshot_missing_exception"));
} catch (ResponseException e) {
assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class SnapshotRetentionService implements LocalNodeMasterListener, Closea
private final SchedulerEngine scheduler;

private volatile String slmRetentionSchedule;
private volatile boolean isMaster = false;

public SnapshotRetentionService(Settings settings,
Supplier<SnapshotRetentionTask> taskSupplier,
Expand All @@ -63,17 +64,19 @@ SchedulerEngine getScheduler() {

/**
 * Records that this node is now the master and then (re)schedules the retention job.
 * The flag is set first because {@code rescheduleRetentionJob()} only schedules
 * when {@code isMaster} is true.
 */
@Override
public void onMaster() {
    isMaster = true;
    rescheduleRetentionJob();
}

/**
 * Records that this node is no longer the master and cancels the retention job.
 */
@Override
public void offMaster() {
    isMaster = false;
    cancelRetentionJob();
}

private void rescheduleRetentionJob() {
final String schedule = this.slmRetentionSchedule;
if (Strings.hasText(schedule)) {
if (this.isMaster && Strings.hasText(schedule)) {
final SchedulerEngine.Job retentionJob = new SchedulerEngine.Job(SLM_RETENTION_JOB_ID,
new CronSchedule(schedule));
logger.debug("scheduling SLM retention job for [{}]", schedule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -280,19 +283,21 @@ private void maybeDeleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDel
}

ClusterState state = clusterService.state();
if (snapshotInProgress(state)) {
if (okayToDeleteSnapshots(state)) {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
} else {
logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed");
ClusterStateObserver observer = new ClusterStateObserver(clusterService, maximumTime, logger, threadPool.getThreadContext());
CountDownLatch latch = new CountDownLatch(1);
observer.waitForNextChange(
new NoSnapshotRunningListener(observer,
newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
try {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
} finally {
latch.countDown();
}
}),
try {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
} finally {
latch.countDown();
}
}),
e -> {
latch.countDown();
throw new ElasticsearchException(e);
Expand All @@ -302,8 +307,6 @@ private void maybeDeleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDel
} catch (InterruptedException e) {
throw new ElasticsearchException(e);
}
} else {
deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
}
}

Expand Down Expand Up @@ -412,14 +415,32 @@ void updateStateWithStats(SnapshotLifecycleStats newStats) {
clusterService.submitStateUpdateTask("update_slm_stats", new UpdateSnapshotLifecycleStatsTask(newStats));
}

public static boolean snapshotInProgress(ClusterState state) {
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
// No snapshots are running, new state is acceptable to proceed
/**
 * Decides, from the given cluster state, whether retention may delete snapshots right now.
 * <p>
 * Four cluster-state customs are inspected in order: {@link SnapshotsInProgress},
 * {@link SnapshotDeletionsInProgress}, {@link RepositoryCleanupInProgress} and
 * {@link RestoreInProgress}. The first one that blocks deletion makes the method return
 * {@code false}; later customs are then not looked up.
 *
 * @param state the cluster state to inspect
 * @return {@code true} if none of the checks below blocks deletion, {@code false} otherwise
 */
public static boolean okayToDeleteSnapshots(ClusterState state) {
    // Blocked while at least one snapshot entry is registered
    final SnapshotsInProgress runningSnapshots = state.custom(SnapshotsInProgress.TYPE);
    final boolean snapshotRunning = runningSnapshots != null && runningSnapshots.entries().isEmpty() == false;
    if (snapshotRunning) {
        return false;
    }

    // Blocked while another deletion is already in progress
    final SnapshotDeletionsInProgress runningDeletions = state.custom(SnapshotDeletionsInProgress.TYPE);
    final boolean deletionRunning = runningDeletions != null && runningDeletions.hasDeletionsInProgress();
    if (deletionRunning) {
        return false;
    }

    // Blocked while a repository is being cleaned
    // NOTE(review): this blocks when cleanupInProgress() returns false, which reads as inverted
    // relative to the method name; confirm against RepositoryCleanupInProgress#cleanupInProgress.
    final RepositoryCleanupInProgress cleanup = state.custom(RepositoryCleanupInProgress.TYPE);
    final boolean blockedByCleanup = cleanup != null && cleanup.cleanupInProgress() == false;
    if (blockedByCleanup) {
        return false;
    }

    // Blocked whenever a restore custom is present at all, regardless of its contents;
    // otherwise it's okay to delete snapshots
    final RestoreInProgress restore = state.custom(RestoreInProgress.TYPE);
    return restore == null;
}

Expand All @@ -445,11 +466,11 @@ class NoSnapshotRunningListener implements ClusterStateObserver.Listener {
@Override
public void onNewClusterState(ClusterState state) {
try {
if (snapshotInProgress(state)) {
observer.waitForNextChange(this);
} else {
logger.debug("retrying SLM snapshot retention deletion after snapshot has completed");
if (okayToDeleteSnapshots(state)) {
logger.debug("retrying SLM snapshot retention deletion after snapshot operation has completed");
reRun.accept(state);
} else {
observer.waitForNextChange(this);
}
} catch (Exception e) {
exceptionConsumer.accept(e);
Expand All @@ -464,7 +485,7 @@ public void onClusterServiceClose() {
@Override
public void onTimeout(TimeValue timeout) {
exceptionConsumer.accept(
new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshots to complete"));
new IllegalStateException("slm retention snapshot deletion out while waiting for ongoing snapshot operations to complete"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testJobsAreScheduled() {
FakeRetentionTask::new, clusterService, clock)) {
assertThat(service.getScheduler().jobCount(), equalTo(0));

service.onMaster();
service.setUpdateSchedule(SnapshotLifecycleServiceTests.randomSchedule());
assertThat(service.getScheduler().scheduledJobIds(), containsInAnyOrder(SnapshotRetentionService.SLM_RETENTION_JOB_ID));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ClusterServiceUtils;
Expand Down Expand Up @@ -57,6 +65,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;

public class SnapshotRetentionTaskTests extends ESTestCase {

Expand Down Expand Up @@ -317,6 +326,43 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception
}
}

/**
 * Verifies that {@code SnapshotRetentionTask.okayToDeleteSnapshots} returns {@code false}
 * for each of four cluster states, each carrying exactly one custom: an in-progress
 * snapshot, an in-progress snapshot deletion, a repository cleanup entry, and a restore.
 */
public void testOkToDeleteSnapshots() {
    final ClusterName clusterName = new ClusterName("cluster");
    final Snapshot snapshot = new Snapshot("repo", new SnapshotId("name", "uuid"));

    // Case 1: a snapshot in the INIT state is registered
    final ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> noShards =
        ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build();
    final SnapshotsInProgress.Entry snapshotEntry = new SnapshotsInProgress.Entry(
        snapshot, true, false, SnapshotsInProgress.State.INIT,
        Collections.singletonList(new IndexId("name", "id")), 0, 0,
        noShards, Collections.emptyMap());
    final ClusterState withSnapshot = ClusterState.builder(clusterName)
        .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(snapshotEntry))
        .build();
    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(withSnapshot), equalTo(false));

    // Case 2: a snapshot deletion is registered
    final SnapshotDeletionsInProgress.Entry deletionEntry = new SnapshotDeletionsInProgress.Entry(snapshot, 0, 0);
    final ClusterState withDeletion = ClusterState.builder(clusterName)
        .putCustom(SnapshotDeletionsInProgress.TYPE, new SnapshotDeletionsInProgress(Collections.singletonList(deletionEntry)))
        .build();
    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(withDeletion), equalTo(false));

    // Case 3: a repository cleanup entry is registered
    final RepositoryCleanupInProgress.Entry cleanupEntry = new RepositoryCleanupInProgress.Entry("repo", 0);
    final ClusterState withCleanup = ClusterState.builder(clusterName)
        .putCustom(RepositoryCleanupInProgress.TYPE, new RepositoryCleanupInProgress(cleanupEntry))
        .build();
    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(withCleanup), equalTo(false));

    // Case 4: a restore custom is present (a mock is used, so its contents are not relevant)
    final ClusterState withRestore = ClusterState.builder(clusterName)
        .putCustom(RestoreInProgress.TYPE, mock(RestoreInProgress.class))
        .build();
    assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(withRestore), equalTo(false));
}

public void testSkipWhileStopping() throws Exception {
doTestSkipDuringMode(OperationMode.STOPPING);
}
Expand Down