From cfb516c6d4d1edd79b94fa77a488af8488946ec8 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 14 Aug 2019 16:49:47 -0600 Subject: [PATCH 1/4] Retry SLM retention after currently running snapshot completes This commit adds a ClusterStateObserver to wait until the currently running snapshot is complete before proceeding with snapshot deletion. SLM retention waits for the maximum allowed deletion time for the snapshot to complete, however, the waiting time is not factored into the limit on actual deletions. Relates to #43663 --- .../xpack/slm/SnapshotLifecycleIT.java | 216 +++++++++++++----- .../xpack/ilm/IndexLifecycle.java | 2 +- .../xpack/slm/SnapshotRetentionTask.java | 119 +++++++++- .../slm/SnapshotRetentionServiceTests.java | 2 +- .../xpack/slm/SnapshotRetentionTaskTests.java | 43 ++-- 5 files changed, 300 insertions(+), 82 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index fcabbc70c09ff..7751313de865b 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -37,8 +37,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.Optional; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; @@ -206,38 +206,33 @@ public void testPolicyManualExecution() throws Exception { assertThat(EntityUtils.toString(badResp.getResponse().getEntity()), containsString("no such snapshot lifecycle policy [" + policyName + "-bad]")); - Response goodResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); - - try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(goodResp.getEntity()))) { - final String snapshotName = parser.mapStrings().get("snapshot_name"); + final String snapshotName = executePolicy(policyName); - // Check that the executed snapshot is created - assertBusy(() -> { - try { - Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); - Map snapshotResponseMap; - try (InputStream is = response.getEntity().getContent()) { - snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); - } - assertThat(snapshotResponseMap.size(), greaterThan(0)); - final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); - assertNotNull(metadata); - assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); - } catch (ResponseException e) { - fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map metadata 
= extractMetadata(snapshotResponseMap, snapshotName); + assertNotNull(metadata); + assertThat(metadata.get("policy"), equalTo(policyName)); + assertHistoryIsPresent(policyName, true, repoId); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } - Map stats = getSLMStats(); - Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); - Map policyIdStats = (Map) policyStats.get(policyName); - int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName()); - int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName()); - assertThat(snapsTaken, equalTo(1)); - assertThat(totalTaken, equalTo(1)); - }); - } + Map stats = getSLMStats(); + Map policyStats = (Map) stats.get(SnapshotLifecycleStats.POLICY_STATS.getPreferredName()); + Map policyIdStats = (Map) policyStats.get(policyName); + int snapsTaken = (int) policyIdStats.get(SnapshotLifecycleStats.SnapshotPolicyStats.SNAPSHOTS_TAKEN.getPreferredName()); + int totalTaken = (int) stats.get(SnapshotLifecycleStats.TOTAL_TAKEN.getPreferredName()); + assertThat(snapsTaken, equalTo(1)); + assertThat(totalTaken, equalTo(1)); + }); } @SuppressWarnings("unchecked") @@ -259,31 +254,24 @@ public void testBasicTimeBasedRetenion() throws Exception { new SnapshotRetentionConfiguration(TimeValue.timeValueMillis(1), null, null)); // Manually create a snapshot - Response executeResp = client().performRequest(new Request("PUT", "/_slm/policy/" + policyName + "/_execute")); - - final String snapshotName; - try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, - DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeResp.getEntity()))) { - snapshotName = parser.mapStrings().get("snapshot_name"); - - // Check that the executed snapshot is created - assertBusy(() -> { - try { - Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); - Map snapshotResponseMap; - try (InputStream is = response.getEntity().getContent()) { - snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); - } - assertThat(snapshotResponseMap.size(), greaterThan(0)); - final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); - assertNotNull(metadata); - assertThat(metadata.get("policy"), equalTo(policyName)); - assertHistoryIsPresent(policyName, true, repoId); - } catch (ResponseException e) { - fail("expected snapshot to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + final String snapshotName = executePolicy(policyName); + // Check that the executed snapshot is created + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + snapshotName)); + Map snapshotResponseMap; + try (InputStream is = response.getEntity().getContent()) { + snapshotResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true); } - }); - } + assertThat(snapshotResponseMap.size(), greaterThan(0)); + final Map metadata = extractMetadata(snapshotResponseMap, snapshotName); + assertNotNull(metadata); + assertThat(metadata.get("policy"), equalTo(policyName)); + assertHistoryIsPresent(policyName, true, repoId); + } catch (ResponseException e) { + fail("expected snapshot to exist but it does not: " + 
EntityUtils.toString(e.getResponse().getEntity())); + } + }); // Run retention every second ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); @@ -388,6 +376,126 @@ public void testSnapshotInProgress() throws Exception { } } + @SuppressWarnings("unchecked") + public void testRetentionWhileSnapshotInProgress() throws Exception { + final String indexName = "test"; + final String policyName = "test-policy"; + final String policyName2 = "test-policy2"; + final String repoId = "my-repo"; + final String repoId2 = "my-repo2"; + int docCount = 20; + for (int i = 0; i < docCount; i++) { + index(client(), indexName, "" + i, "foo", "bar"); + } + + // Create snapshot repos, one fast and one slow + initializeRepo(repoId, "1b"); + initializeRepo(repoId2, "1mb"); + + createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); + createSnapshotPolicy(policyName2, "snap", "1 2 3 4 5 ?", repoId2, indexName, true, + new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); + + // Create a snapshot and wait for it to be complete (need something that can be deleted) + final String completedSnapshotName = executePolicy(policyName2); + assertBusy(() -> { + try { + Response getResp = client().performRequest(new Request("GET", + "/_snapshot/" + repoId2 + "/" + completedSnapshotName + "/_status")); + try (InputStream content = getResp.getEntity().getContent()) { + Map snaps = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); + List> snaps2 = (List>) snaps.get("snapshots"); + assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS")); + } + } catch (NullPointerException | ResponseException e) { + fail("unable to retrieve completed snapshot: " + e); + } + }); + + // Take another snapshot + final String slowSnapshotName = executePolicy(policyName); + + // Check that the executed snapshot shows up in the SLM output as in_progress + assertBusy(() -> { + try { + Response response = client().performRequest(new Request("GET", "/_slm/policy" + (randomBoolean() ? 
"" : "?human"))); + Map policyResponseMap; + try (InputStream content = response.getEntity().getContent()) { + policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); + } + assertThat(policyResponseMap.size(), greaterThan(0)); + Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(policyName)) + .map(policy -> (Map) policy.get("in_progress")); + + if (inProgress.isPresent()) { + Map inProgressMap = inProgress.get(); + assertThat(inProgressMap.get("name"), equalTo(slowSnapshotName)); + assertNotNull(inProgressMap.get("uuid")); + assertThat(inProgressMap.get("state"), equalTo("STARTED")); + assertThat((long) inProgressMap.get("start_time_millis"), greaterThan(0L)); + assertNull(inProgressMap.get("failure")); + } else { + fail("expected in_progress to contain a running snapshot, but the response was " + policyResponseMap); + } + } catch (ResponseException e) { + fail("expected policy to exist but it does not: " + EntityUtils.toString(e.getResponse().getEntity())); + } + }); + + // Run retention every second + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest(); + req.transientSettings(Settings.builder().put(LifecycleSettings.SLM_RETENTION_SCHEDULE, "*/1 * * * * ?")); + try (XContentBuilder builder = jsonBuilder()) { + req.toXContent(builder, ToXContent.EMPTY_PARAMS); + Request r = new Request("PUT", "/_cluster/settings"); + r.setJsonEntity(Strings.toString(builder)); + client().performRequest(r); + } + + // Cancel the snapshot since it is not going to complete quickly, do it in a thread because + // cancelling the snapshot can take a long time and we might as well check retention while + // its deleting + Thread t = new Thread(() -> { + try { + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + slowSnapshotName))); + } catch (IOException e) { + fail("should not have thrown " + e); + } + }); + t.start(); + + // Check that the snapshot created by the policy has been removed by retention + assertBusy(() -> { + // We expect a failed response because the snapshot should not exist + try { + logger.info("--> checking to see if snapshot has been deleted..."); + Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + completedSnapshotName)); + assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); + } catch (ResponseException e) { + assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception")); + } + }, 60, TimeUnit.SECONDS); + + t.join(5000); + } + + /** + * Execute the given policy and return the generated snapshot name + */ + private String executePolicy(String policyId) { + try { + Response executeRepsonse = client().performRequest(new Request("PUT", "/_slm/policy/" + policyId + "/_execute")); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, EntityUtils.toByteArray(executeRepsonse.getEntity()))) { + return parser.mapStrings().get("snapshot_name"); + } + } catch (Exception e) { + fail("failed to execute policy " + policyId + " - got: " + e); + throw new RuntimeException(e); + } + } + @SuppressWarnings("unchecked") private static Map extractMetadata(Map snapshotResponseMap, String snapshotPrefix) { List> snapResponse = ((List>) snapshotResponseMap.get("responses")).stream() diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java 
b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java index 99dcdfc2a7b17..43725682419a1 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java @@ -161,7 +161,7 @@ public Collection createComponents(Client client, ClusterService cluster snapshotLifecycleService.set(new SnapshotLifecycleService(settings, () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); snapshotRetentionService.set(new SnapshotRetentionService(settings, - () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime), + () -> new SnapshotRetentionTask(client, clusterService, System::nanoTime, threadPool), clusterService, getClock())); return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get(), snapshotRetentionService.get()); diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index 0d3f70851e402..0a8afd9652b97 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -16,12 +17,15 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; @@ -56,11 +60,13 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { private final Client client; private final ClusterService clusterService; private final LongSupplier nowNanoSupplier; + private final ThreadPool threadPool; - public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier) { + public SnapshotRetentionTask(Client client, ClusterService clusterService, LongSupplier nowNanoSupplier, ThreadPool threadPool) { this.client = new OriginSettingClient(client, ClientHelper.INDEX_LIFECYCLE_ORIGIN); this.clusterService = clusterService; this.nowNanoSupplier = nowNanoSupplier; + this.threadPool = threadPool; } @Override @@ -116,7 +122,7 @@ public void onResponse(Map> allSnapshots) { .collect(Collectors.toList()))); // Finally, delete the snapshots that need to be deleted - deleteSnapshots(snapshotsToBeDeleted, maxDeletionTime, slmStats); + maybeDeleteSnapshots(snapshotsToBeDeleted, 
maxDeletionTime, slmStats);
 
                         updateStateWithStats(slmStats);
                     } finally {
@@ -236,15 +242,60 @@ static String getPolicyId(SnapshotInfo snapshotInfo) {
                                          " to have a policy in its metadata, but it did not"));
     }
 
-    void deleteSnapshots(Map> snapshotsToDelete,
-                         TimeValue maximumTime,
-                         SnapshotLifecycleStats slmStats) {
+    /**
+     * Maybe delete the given snapshots. If a snapshot is currently running according to the cluster
+     * state, this waits (using a {@link ClusterStateObserver}) until a cluster state with no running
+     * snapshots is observed before executing the blocking
+     * {@link #deleteSnapshots(Map, TimeValue, SnapshotLifecycleStats)} request. At most, we wait
+     * for the maximum allowed deletion time before timing out waiting for a state with no
+     * running snapshots.
+     *
+     * It's possible the task may still run into a SnapshotInProgressException, if a snapshot is
+     * started between the state retrieved here and the actual deletion. Since this is expected to be
+     * a rare case, no special handling is present.
+     */
+    private void maybeDeleteSnapshots(Map> snapshotsToDelete,
+                                      TimeValue maximumTime,
+                                      SnapshotLifecycleStats slmStats) {
         int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
         if (count == 0) {
             logger.debug("no snapshots are eligible for deletion");
             return;
         }
 
+        ClusterState state = clusterService.state();
+        if (snapshotInProgress(state)) {
+            logger.debug("a snapshot is currently running, rescheduling SLM retention for after snapshot has completed");
+            ClusterStateObserver observer = new ClusterStateObserver(clusterService, maximumTime, logger, threadPool.getThreadContext());
+            CountDownLatch latch = new CountDownLatch(1);
+            observer.waitForNextChange(
+                new NoSnapshotRunningListener(observer,
+                    newState -> threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(() -> {
+                        try {
+                            deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
+                        } finally {
+                            latch.countDown();
+                        }
+                    }),
+                    e -> {
+                        latch.countDown();
+                        throw new ElasticsearchException(e);
+                    }));
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new ElasticsearchException(e);
+            }
+        } else {
+            deleteSnapshots(snapshotsToDelete, maximumTime, slmStats);
+        }
+    }
+
+    void deleteSnapshots(Map> snapshotsToDelete,
+                         TimeValue maximumTime,
+                         SnapshotLifecycleStats slmStats) {
+        int count = snapshotsToDelete.values().stream().mapToInt(List::size).sum();
+
         logger.info("starting snapshot retention deletion for [{}] snapshots", count);
         long startTime = nowNanoSupplier.getAsLong();
         int deleted = 0;
@@ -257,7 +308,7 @@ void deleteSnapshots(Map> snapshotsToDelete,
                     // Check whether we have exceeded the maximum time allowed to spend deleting
                     // snapshots, if we have, short-circuit the rest of the deletions
                     TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
-                    logger.trace("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime);
+                    logger.debug("elapsed time for deletion of [{}] snapshot: {}", info.snapshotId(), elapsedDeletionTime);
                     if (elapsedDeletionTime.compareTo(maximumTime) > 0) {
                         logger.info("maximum snapshot retention deletion time reached, time spent: [{}]," +
                             " maximum allowed time: [{}], deleted {} out of {} snapshots scheduled for deletion",
@@ -312,4 +363,60 @@ public void onFailure(Exception e) {
     void updateStateWithStats(SnapshotLifecycleStats newStats) {
         clusterService.submitStateUpdateTask("update_slm_stats", new UpdateSnapshotLifecycleStatsTask(newStats));
     }
+
+    public static boolean snapshotInProgress(ClusterState
state) {
+        SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
+        if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
+            // No snapshots are running, new state is acceptable to proceed
+            return false;
+        }
+
+        // There are snapshots in progress
+        return true;
+    }
+
+    /**
+     * A {@link ClusterStateObserver.Listener} that invokes the given function with the new state,
+     * once no snapshots are running. If a snapshot is still running it registers a new listener
+     * and tries again. Passes any exceptions to the original exception listener if they occur.
+     */
+    class NoSnapshotRunningListener implements ClusterStateObserver.Listener {
+
+        private final Consumer reRun;
+        private final Consumer exceptionConsumer;
+        private final ClusterStateObserver observer;
+
+        NoSnapshotRunningListener(ClusterStateObserver observer,
+                                  Consumer reRun,
+                                  Consumer exceptionConsumer) {
+            this.observer = observer;
+            this.reRun = reRun;
+            this.exceptionConsumer = exceptionConsumer;
+        }
+
+        @Override
+        public void onNewClusterState(ClusterState state) {
+            try {
+                if (snapshotInProgress(state)) {
+                    observer.waitForNextChange(this);
+                } else {
+                    logger.debug("retrying SLM snapshot retention deletion after snapshot has completed");
+                    reRun.accept(state);
+                }
+            } catch (Exception e) {
+                exceptionConsumer.accept(e);
+            }
+        }
+
+        @Override
+        public void onClusterServiceClose() {
+            // This means the cluster is being shut down, so nothing to do here
+        }
+
+        @Override
+        public void onTimeout(TimeValue timeout) {
+            exceptionConsumer.accept(
+                new IllegalStateException("slm retention snapshot deletion timed out while waiting for ongoing snapshots to complete"));
+        }
+    }
 }
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java
index 257dcb518662f..2e2946da47162 100644
--- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionServiceTests.java
@@ -67,7 +67,7 @@ public void testJobsAreScheduled() {
     private static class FakeRetentionTask extends SnapshotRetentionTask {
         FakeRetentionTask() {
-            super(mock(Client.class), null, System::nanoTime);
+            super(mock(Client.class), null, System::nanoTime, mock(ThreadPool.class));
         }
 
         @Override
diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java
index b137473551bac..d395c68a05372 100644
--- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java
+++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java
@@ -147,7 +147,7 @@ public void testRetentionTask() throws Exception {
         AtomicReference> deleted = new AtomicReference<>();
         CountDownLatch latch = new CountDownLatch(1);
-        MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService,
+        MockSnapshotRetentionTask retentionTask = new MockSnapshotRetentionTask(noOpClient, clusterService, threadPool,
             () -> {
                 List snaps = new ArrayList<>(2);
                 snaps.add(eligibleSnapshot);
@@ -206,23 +206,24 @@ public void testTimeBoundedDeletion() throws Exception {
         // We're expected two deletions before they hit the "taken too long" test, so have a latch of 2
         CountDownLatch latch = new CountDownLatch(2);
AtomicLong nanos = new AtomicLong(System.nanoTime()); - OverrideDeleteSnapshotRetentionTask retentionTask = new OverrideDeleteSnapshotRetentionTask(noOpClient, clusterService, - () -> { - List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); - logger.info("--> retrieving snapshots [{}]", snaps); - return Collections.singletonMap("repo", snaps); - }, - (repo, snapshotId) -> { - logger.info("--> deleting {}", snapshotId); - // Don't pause until snapshot 2 - if (snapshotId.equals(snap2.snapshotId())) { - logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); - nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); - } - deleted.add(snapshotId); - latch.countDown(); - }, - nanos::get); + OverrideDeleteSnapshotRetentionTask retentionTask = + new OverrideDeleteSnapshotRetentionTask(noOpClient, clusterService, threadPool, + () -> { + List snaps = Arrays.asList(snap1, snap2, snap3, snap4, snap5); + logger.info("--> retrieving snapshots [{}]", snaps); + return Collections.singletonMap("repo", snaps); + }, + (repo, snapshotId) -> { + logger.info("--> deleting {}", snapshotId); + // Don't pause until snapshot 2 + if (snapshotId.equals(snap2.snapshotId())) { + logger.info("--> pausing for 501ms while deleting snap2 to simulate deletion past a threshold"); + nanos.addAndGet(TimeValue.timeValueMillis(501).nanos()); + } + deleted.add(snapshotId); + latch.countDown(); + }, + nanos::get); long time = System.currentTimeMillis(); retentionTask.triggered(new SchedulerEngine.Event(SnapshotRetentionService.SLM_RETENTION_JOB_ID, time, time)); @@ -267,9 +268,10 @@ private static class MockSnapshotRetentionTask extends SnapshotRetentionTask { MockSnapshotRetentionTask(Client client, ClusterService clusterService, + ThreadPool threadPool, Supplier>> snapshotRetriever, Consumer>> snapshotDeleter) { - super(client, clusterService, System::nanoTime); + super(client, clusterService, System::nanoTime, threadPool); this.snapshotRetriever = snapshotRetriever; this.snapshotDeleter = snapshotDeleter; } @@ -293,10 +295,11 @@ private static class OverrideDeleteSnapshotRetentionTask extends SnapshotRetenti OverrideDeleteSnapshotRetentionTask(Client client, ClusterService clusterService, + ThreadPool threadPool, Supplier>> snapshotRetriever, BiConsumer deleteRunner, LongSupplier nanoSupplier) { - super(client, clusterService, nanoSupplier); + super(client, clusterService, nanoSupplier, threadPool); this.snapshotRetriever = snapshotRetriever; this.deleteRunner = deleteRunner; } From 6a231dbcf4d0728e83ec193c81bcdd1913948e16 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 21 Aug 2019 10:42:28 -0600 Subject: [PATCH 2/4] Increase timeout waiting for snapshot completion --- .../org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 7751313de865b..0985379eb19cc 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -390,7 +390,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { // Create snapshot repos, one fast and one slow initializeRepo(repoId, "1b"); - initializeRepo(repoId2, "1mb"); + initializeRepo(repoId2, "10mb"); 
createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true, new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); @@ -405,13 +405,14 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { "/_snapshot/" + repoId2 + "/" + completedSnapshotName + "/_status")); try (InputStream content = getResp.getEntity().getContent()) { Map snaps = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); + logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps); List> snaps2 = (List>) snaps.get("snapshots"); assertThat(snaps2.get(0).get("state"), equalTo("SUCCESS")); } } catch (NullPointerException | ResponseException e) { fail("unable to retrieve completed snapshot: " + e); } - }); + }, 60, TimeUnit.SECONDS); // Take another snapshot final String slowSnapshotName = executePolicy(policyName); From be42c68482fd7080c3b37594d503aba34b011b04 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 21 Aug 2019 15:28:00 -0600 Subject: [PATCH 3/4] Apply patch From https://github.com/original-brownbear/elasticsearch/commit/2374316f0d1912c9e1498bece195546a1dc60bce.patch --- .../status/TransportSnapshotsStatusAction.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index ae143add71ace..4748a6caec095 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.node.NodeClient; @@ -121,10 +122,13 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request, new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()])) .snapshots(snapshots).timeout(request.masterNodeTimeout()); client.executeLocally(TransportNodesSnapshotsStatus.TYPE, nodesRequest, - ActionListener.map( - listener, nodeSnapshotStatuses -> - buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), - nodeSnapshotStatuses))); + ActionListener.wrap( + nodeSnapshotStatuses -> + threadPool.executor(ThreadPool.Names.GENERIC).execute( + ActionRunnable.wrap(listener, l -> l.onResponse( + buildResponse( + request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), + nodeSnapshotStatuses)))), listener::onFailure)); } else { // We don't have any in-progress shards, just return current stats listener.onResponse(buildResponse(request, currentSnapshots, null)); From 1f9d5c02880654dc0da22894ea3a0060f813d653 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 21 Aug 2019 16:29:03 -0600 Subject: [PATCH 4/4] Rename test variables --- .../xpack/slm/SnapshotLifecycleIT.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git 
a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java index 96749f8e9afc6..1f11a824a0f13 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleIT.java @@ -383,30 +383,30 @@ public void testSnapshotInProgress() throws Exception { @SuppressWarnings("unchecked") public void testRetentionWhileSnapshotInProgress() throws Exception { final String indexName = "test"; - final String policyName = "test-policy"; - final String policyName2 = "test-policy2"; - final String repoId = "my-repo"; - final String repoId2 = "my-repo2"; + final String slowPolicy = "slow"; + final String fastPolicy = "fast"; + final String slowRepo = "slow-repo"; + final String fastRepo = "fast-repo"; int docCount = 20; for (int i = 0; i < docCount; i++) { index(client(), indexName, "" + i, "foo", "bar"); } // Create snapshot repos, one fast and one slow - initializeRepo(repoId, "1b"); - initializeRepo(repoId2, "10mb"); + initializeRepo(slowRepo, "1b"); + initializeRepo(fastRepo, "10mb"); - createSnapshotPolicy(policyName, "snap", "1 2 3 4 5 ?", repoId, indexName, true, + createSnapshotPolicy(slowPolicy, "snap", "1 2 3 4 5 ?", slowRepo, indexName, true, new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); - createSnapshotPolicy(policyName2, "snap", "1 2 3 4 5 ?", repoId2, indexName, true, + createSnapshotPolicy(fastPolicy, "snap", "1 2 3 4 5 ?", fastRepo, indexName, true, new SnapshotRetentionConfiguration(TimeValue.timeValueSeconds(0), null, null)); // Create a snapshot and wait for it to be complete (need something that can be deleted) - final String completedSnapshotName = executePolicy(policyName2); + final String completedSnapshotName = executePolicy(fastPolicy); assertBusy(() -> { try { Response getResp = client().performRequest(new Request("GET", - "/_snapshot/" + repoId2 + "/" + completedSnapshotName + "/_status")); + "/_snapshot/" + fastRepo + "/" + completedSnapshotName + "/_status")); try (InputStream content = getResp.getEntity().getContent()) { Map snaps = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); logger.info("--> waiting for snapshot {} to be successful, got: {}", completedSnapshotName, snaps); @@ -419,7 +419,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { }, 60, TimeUnit.SECONDS); // Take another snapshot - final String slowSnapshotName = executePolicy(policyName); + final String slowSnapshotName = executePolicy(slowPolicy); // Check that the executed snapshot shows up in the SLM output as in_progress assertBusy(() -> { @@ -430,7 +430,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { policyResponseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), content, true); } assertThat(policyResponseMap.size(), greaterThan(0)); - Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(policyName)) + Optional> inProgress = Optional.ofNullable((Map) policyResponseMap.get(slowPolicy)) .map(policy -> (Map) policy.get("in_progress")); if (inProgress.isPresent()) { @@ -463,7 +463,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { // its deleting Thread t = new Thread(() -> { try { - assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + repoId + "/" + 
slowSnapshotName))); + assertOK(client().performRequest(new Request("DELETE", "/_snapshot/" + slowRepo + "/" + slowSnapshotName))); } catch (IOException e) { fail("should not have thrown " + e); } @@ -475,7 +475,7 @@ public void testRetentionWhileSnapshotInProgress() throws Exception { // We expect a failed response because the snapshot should not exist try { logger.info("--> checking to see if snapshot has been deleted..."); - Response response = client().performRequest(new Request("GET", "/_snapshot/" + repoId + "/" + completedSnapshotName)); + Response response = client().performRequest(new Request("GET", "/_snapshot/" + slowRepo + "/" + completedSnapshotName)); assertThat(EntityUtils.toString(response.getEntity()), containsString("snapshot_missing_exception")); } catch (ResponseException e) { assertThat(EntityUtils.toString(e.getResponse().getEntity()), containsString("snapshot_missing_exception"));
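
Note for reviewers (not part of the patch): the sketch below is a minimal, self-contained illustration of the wait-then-delete pattern that SnapshotRetentionTask#maybeDeleteSnapshots introduces above. The Elasticsearch types and calls it relies on (ClusterStateObserver, SnapshotsInProgress, ThreadPool.Names.MANAGEMENT) are the same ones the patch uses; the class name WaitForNoSnapshotsSketch and the runWhenNoSnapshotRunning/deletion names are made up purely for illustration.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

/**
 * Illustrative only: wait for a cluster state with no running snapshots, then run a
 * (potentially blocking) deletion task on the MANAGEMENT thread pool, giving up after
 * the supplied timeout.
 */
class WaitForNoSnapshotsSketch {
    private static final Logger logger = LogManager.getLogger(WaitForNoSnapshotsSketch.class);

    static boolean snapshotInProgress(ClusterState state) {
        SnapshotsInProgress inProgress = state.custom(SnapshotsInProgress.TYPE);
        return inProgress != null && inProgress.entries().isEmpty() == false;
    }

    static void runWhenNoSnapshotRunning(ClusterService clusterService, ThreadPool threadPool,
                                         TimeValue timeout, Runnable deletion) {
        if (snapshotInProgress(clusterService.state()) == false) {
            // Nothing is running, delete immediately
            deletion.run();
            return;
        }
        ClusterStateObserver observer =
            new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext());
        observer.waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState newState) {
                if (snapshotInProgress(newState)) {
                    // A snapshot is still running, keep waiting for the next state change
                    observer.waitForNextChange(this);
                } else {
                    // Deletion blocks, so move it off the cluster state applier thread
                    threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(deletion);
                }
            }

            @Override
            public void onClusterServiceClose() {
                // The node is shutting down, nothing to do
            }

            @Override
            public void onTimeout(TimeValue elapsed) {
                logger.warn("timed out after [{}] waiting for running snapshots to finish", elapsed);
            }
        });
    }
}

Unlike the patch, which blocks the retention thread on a CountDownLatch until the deferred deletion finishes, this sketch simply hands the deletion off asynchronously; the patch needs the latch so the scheduled retention run does not return while deletions are still pending, and note that the clock for the maximum deletion time only starts once deleteSnapshots actually begins, so time spent waiting for the running snapshot is not counted against the deletion budget.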