Skip to content

Commit

Permalink
Reduce forking in SLMGetExpiredSnapshotsAction (elastic#102978)
Browse files Browse the repository at this point in the history
We only need to dispatch steps back onto the `MANAGEMENT` pool if they
forked off it, and mostly that's not going to happen. With this commit
we push the forking decisions down into the implementation in order to
skip them when unnecessary.

Relates elastic#101445
  • Loading branch information
DaveCTurner authored Dec 7, 2023
1 parent c16f8ef commit 715b1bf
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
Expand Down Expand Up @@ -126,32 +126,21 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
perRepositoryListener -> SubscribableListener

// Get repository data
.<RepositoryData>newForked(
l -> repository.getRepositoryData(
EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO use retentionExecutor, see #101445?
l
)
)
.<RepositoryData>newForked(l -> repository.getRepositoryData(retentionExecutor, l))

// Collect snapshot details by policy, and get any missing details by reading SnapshotInfo
.<SnapshotDetailsByPolicy>andThen(
retentionExecutor,
threadContext,
(l, repositoryData) -> getSnapshotDetailsByPolicy(repository, repositoryData, l)
(l, repositoryData) -> getSnapshotDetailsByPolicy(retentionExecutor, repository, repositoryData, l)
)

// Compute snapshots to delete for each (relevant) policy
.<Void>andThen(
retentionExecutor,
threadContext,
(l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> {
resultsBuilder.addResult(
repositoryName,
getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
);
return null;
})
)
.<Void>andThen((l, snapshotDetailsByPolicy) -> ActionListener.completeWith(l, () -> {
resultsBuilder.addResult(
repositoryName,
getSnapshotsToDelete(repositoryName, request.policies(), snapshotDetailsByPolicy)
);
return null;
}))

// And notify this repository's listener on completion
.addListener(perRepositoryListener.delegateResponse((l, e) -> {
Expand Down Expand Up @@ -184,6 +173,7 @@ <T> Stream<T> flatMap(BiFunction<String, Map<SnapshotId, RepositoryData.Snapshot

// Exposed for testing
static void getSnapshotDetailsByPolicy(
Executor executor,
Repository repository,
RepositoryData repositoryData,
ActionListener<SnapshotDetailsByPolicy> listener
Expand Down Expand Up @@ -218,7 +208,7 @@ static void getSnapshotDetailsByPolicy(
snapshotInfo.snapshotId(),
RepositoryData.SnapshotDetails.fromSnapshotInfo(snapshotInfo)
),
listener.map(ignored -> snapshotDetailsByPolicy)
new ThreadedActionListener<>(executor, listener.map(ignored -> snapshotDetailsByPolicy))
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ record SeenSnapshotInfo(SnapshotId snapshotId, String policyId) {}
.<RepositoryData>newForked(l -> repository.getRepositoryData(EsExecutors.DIRECT_EXECUTOR_SERVICE, l))

.<SLMGetExpiredSnapshotsAction.SnapshotDetailsByPolicy>andThen(
(l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(repository, rd, l)
(l, rd) -> SLMGetExpiredSnapshotsAction.getSnapshotDetailsByPolicy(EsExecutors.DIRECT_EXECUTOR_SERVICE, repository, rd, l)
)

.andThen((l, snapshotDetailsByPolicy) -> {
Expand Down

0 comments on commit 715b1bf

Please sign in to comment.