diff --git a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsAction.java b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsAction.java index bd2d040d76299..550410d1d59aa 100644 --- a/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsAction.java +++ b/x-pack/plugin/slm/src/main/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsAction.java @@ -18,13 +18,13 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Tuple; import org.elasticsearch.repositories.GetSnapshotInfoContext; @@ -126,32 +126,21 @@ protected void doExecute(Task task, Request request, ActionListener li perRepositoryListener -> SubscribableListener // Get repository data - .newForked( - l -> repository.getRepositoryData( - EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO use retentionExecutor, see #101445? - l - ) - ) + .newForked(l -> repository.getRepositoryData(retentionExecutor, l)) // Collect snapshot details by policy, and get any missing details by reading SnapshotInfo .andThen( - retentionExecutor, - threadContext, - (l, repositoryData) -> getSnapshotDetailsByPolicy(repository, repositoryData, l) + (l, repositoryData) -> getSnapshotDetailsByPolicy(retentionExecutor, repository, repositoryData, l) ) // Compute snapshots to delete for each (relevant) policy - .andThen( - retentionExecutor, - threadContext, - (l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> { - resultsBuilder.addResult( - repositoryName, - getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy) - ); - return null; - }) - ) + .andThen((l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> { + resultsBuilder.addResult( + repositoryName, + getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy) + ); + return null; + })) // And notify this repository's listener on completion .addListener(perRepositoryListener.delegateResponse((l, e) -> { @@ -184,6 +173,7 @@ Stream flatMap(BiFunction listener @@ -218,7 +208,7 @@ static void getSnapshotDetailsByPolicy( snapshotInfo.snapshotId(), RepositoryData.SnapshotDetails.fromSnapshotInfo(snapshotInfo) ), - listener.map(ignored -> snapshotDetailsByPolicy) + new ThreadedActionListener<>(executor, listener.map(ignored -> snapshotDetailsByPolicy)) ) ); } diff --git a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsActionTests.java b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsActionTests.java index c876bb83f919d..eda0e4f8ae39c 100644 --- a/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsActionTests.java +++ b/x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SLMGetExpiredSnapshotsActionTests.java @@ -180,7 +180,7 @@ record SeenSnapshotInfo(SnapshotId snapshotId, String policyId) {} .newForked(l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l)) .andThen( - (l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(repository, rd, l) + (l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(EsExecutors.DIRECT_EXECUTOR_SERVICE, repository, rd, l) ) .andThen((l, snapshotDetailsByPolicy) -> {