diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java index 79c5c84acdaec..0481867b594f2 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/TokenService.java @@ -347,13 +347,12 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action logger.warn("failed to get access token [{}] because index [{}] is not available", userTokenId, tokensIndex.aliasName()); listener.onResponse(null); } else { + final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, + getTokenDocumentId(userTokenId)).request(); + final Consumer onFailure = ex -> listener.onFailure(traceLog("decode token", userTokenId, ex)); tokensIndex.checkIndexVersionThenExecute( ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() +"]", userTokenId, ex)), - () -> { - final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, - getTokenDocumentId(userTokenId)).request(); - Consumer onFailure = ex -> listener.onFailure(traceLog("decode token", userTokenId, ex)); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, + () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, ActionListener.wrap(response -> { if (response.isExists()) { Map accessTokenSource = @@ -384,8 +383,8 @@ private void getUserTokenFromId(String userTokenId, Version tokenVersion, Action logger.error(new ParameterizedMessage("failed to get access token [{}]", userTokenId), e); listener.onFailure(e); } - }), client::get); - }); + }), client::get) + ); } } @@ -862,7 +861,9 @@ private void innerRefresh(String tokenDocId, Map source, long se .setRefreshPolicy(RefreshPolicy.IMMEDIATE) .setIfSeqNo(seqNo) .setIfPrimaryTerm(primaryTerm); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(), + refreshedTokenIndex.prepareIndexIfNeededThenExecute( + ex -> listener.onFailure(traceLog("prepare index [" + refreshedTokenIndex.aliasName() + "]", ex)), + () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, updateRequest.request(), ActionListener.wrap(updateResponse -> { if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) { logger.debug(() -> new ParameterizedMessage("updated the original token document to {}", @@ -931,7 +932,7 @@ public void onFailure(Exception e) { } else { onFailure.accept(e); } - }), client::update); + }), client::update)); } } @@ -1005,7 +1006,9 @@ private void getSupersedingTokenDocAsync(RefreshTokenStatus refreshTokenStatus, private void getTokenDocAsync(String tokenDocId, SecurityIndexManager tokensIndex, ActionListener listener) { final GetRequest getRequest = client.prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, tokenDocId).request(); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get); + tokensIndex.checkIndexVersionThenExecute( + ex -> listener.onFailure(traceLog("prepare tokens index [" + tokensIndex.aliasName() + "]", tokenDocId, ex)), + () -> executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, listener, client::get)); } private Version getTokenVersionCompatibility() { @@ -1392,10 +1395,10 @@ private void checkIfTokenIsValid(UserToken userToken, ActionListener logger.warn("failed to validate access token because the index [" + tokensIndex.aliasName() + "] doesn't exist"); listener.onResponse(null); } else { + final GetRequest getRequest = client + .prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, getTokenDocumentId(userToken)).request(); + Consumer onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex)); tokensIndex.checkIndexVersionThenExecute(listener::onFailure, () -> { - final GetRequest getRequest = client - .prepareGet(tokensIndex.aliasName(), SINGLE_MAPPING_NAME, getTokenDocumentId(userToken)).request(); - Consumer onFailure = ex -> listener.onFailure(traceLog("check token state", userToken.getId(), ex)); executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, getRequest, ActionListener.wrap(response -> { if (response.isExists()) {