diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 610ab31bfac50..d4ae726280024 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -240,6 +240,8 @@ protected abstract void dispatchedShardOperationOnPrimary( */ @Override protected void shardOperationOnReplica(ReplicaRequest request, IndexShard replica, ActionListener listener) { + request.incRef(); + listener = ActionListener.runAfter(listener, request::decRef); threadPool.executor(executorFunction.apply(executorSelector, replica)).execute(new ActionRunnable<>(listener) { @Override protected void doRun() { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java index 2295ce38c9aa6..ce88f93c20fa6 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java @@ -2781,93 +2781,94 @@ public void testAuthorizationOfMultipleActionsSingleIndexBulkItems() { roleMap.put("index-role", indexRole); final ShardId shardId = new ShardId(indexName, UUID.randomUUID().toString(), 1); - final BulkShardRequest request = new BulkShardRequest(shardId, randomFrom(WriteRequest.RefreshPolicy.values()), items); + try (BulkShardRequest request = new BulkShardRequest(shardId, randomFrom(WriteRequest.RefreshPolicy.values()), items)) { - mockEmptyMetadata(); - final Authentication authentication; - final String requestId; - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - authentication = createAuthentication(new User("user", "all-role")); - requestId = AuditUtil.getOrGenerateRequestId(threadContext); - authorize(authentication, action, request); - } - // bulk shard request is authorized - verify(auditTrail).accessGranted( - eq(requestId), - eq(authentication), - eq(action), - eq(request), - authzInfoRoles(new String[] { allRole.getName() }) - ); - // there's one granted audit entry for each action type - actionTypes.forEach(actionType -> { - verify(auditTrail).explicitIndexAccessEvent( + mockEmptyMetadata(); + final Authentication authentication; + final String requestId; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + authentication = createAuthentication(new User("user", "all-role")); + requestId = AuditUtil.getOrGenerateRequestId(threadContext); + authorize(authentication, action, request); + } + // bulk shard request is authorized + verify(auditTrail).accessGranted( eq(requestId), - eq(AuditLevel.ACCESS_GRANTED), eq(authentication), - eq(actionType), - eq(new String[] { indexName }), - eq(BulkItemRequest.class.getSimpleName()), - eq(request.remoteAddress()), + eq(action), + eq(request), authzInfoRoles(new String[] { allRole.getName() }) ); - }); - verifyNoMoreInteractions(auditTrail); - // all bulk items go through as authorized - for (BulkItemRequest bulkItemRequest : request.items()) { - assertThat(bulkItemRequest.getPrimaryResponse(), nullValue()); - } - - // use the "index" role - final Authentication indexAuthentication; - final String indexRequestId; - try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { - indexAuthentication = createAuthentication(new User("index-user", "index-role")); - indexRequestId = AuditUtil.getOrGenerateRequestId(threadContext); - authorize(indexAuthentication, action, request); - } - // bulk shard request is authorized - verify(auditTrail).accessGranted( - eq(indexRequestId), - eq(indexAuthentication), - eq(action), - eq(request), - authzInfoRoles(new String[] { indexRole.getName() }) - ); - // there's a single granted audit entry for each action type, less the delete action (which is denied) - actionTypes.forEach(actionType -> { - if (actionType.equals(TransportDeleteAction.NAME) == false) { + // there's one granted audit entry for each action type + actionTypes.forEach(actionType -> { verify(auditTrail).explicitIndexAccessEvent( - eq(indexRequestId), + eq(requestId), eq(AuditLevel.ACCESS_GRANTED), - eq(indexAuthentication), + eq(authentication), eq(actionType), eq(new String[] { indexName }), eq(BulkItemRequest.class.getSimpleName()), eq(request.remoteAddress()), - authzInfoRoles(new String[] { indexRole.getName() }) + authzInfoRoles(new String[] { allRole.getName() }) ); + }); + verifyNoMoreInteractions(auditTrail); + // all bulk items go through as authorized + for (BulkItemRequest bulkItemRequest : request.items()) { + assertThat(bulkItemRequest.getPrimaryResponse(), nullValue()); } - }); - if (deleteItems.isEmpty() == false) { - // there's one denied audit entry for all the delete action types - verify(auditTrail).explicitIndexAccessEvent( + + // use the "index" role + final Authentication indexAuthentication; + final String indexRequestId; + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + indexAuthentication = createAuthentication(new User("index-user", "index-role")); + indexRequestId = AuditUtil.getOrGenerateRequestId(threadContext); + authorize(indexAuthentication, action, request); + } + // bulk shard request is authorized + verify(auditTrail).accessGranted( eq(indexRequestId), - eq(AuditLevel.ACCESS_DENIED), eq(indexAuthentication), - eq(TransportDeleteAction.NAME), - eq(new String[] { indexName }), - eq(BulkItemRequest.class.getSimpleName()), - eq(request.remoteAddress()), + eq(action), + eq(request), authzInfoRoles(new String[] { indexRole.getName() }) ); - } - verifyNoMoreInteractions(auditTrail); - for (BulkItemRequest bulkItemRequest : request.items()) { - if (deleteItems.contains(bulkItemRequest.id())) { - assertThat(bulkItemRequest.getPrimaryResponse().isFailed(), is(true)); - } else { - assertThat(bulkItemRequest.getPrimaryResponse(), nullValue()); + // there's a single granted audit entry for each action type, less the delete action (which is denied) + actionTypes.forEach(actionType -> { + if (actionType.equals(TransportDeleteAction.NAME) == false) { + verify(auditTrail).explicitIndexAccessEvent( + eq(indexRequestId), + eq(AuditLevel.ACCESS_GRANTED), + eq(indexAuthentication), + eq(actionType), + eq(new String[] { indexName }), + eq(BulkItemRequest.class.getSimpleName()), + eq(request.remoteAddress()), + authzInfoRoles(new String[] { indexRole.getName() }) + ); + } + }); + if (deleteItems.isEmpty() == false) { + // there's one denied audit entry for all the delete action types + verify(auditTrail).explicitIndexAccessEvent( + eq(indexRequestId), + eq(AuditLevel.ACCESS_DENIED), + eq(indexAuthentication), + eq(TransportDeleteAction.NAME), + eq(new String[] { indexName }), + eq(BulkItemRequest.class.getSimpleName()), + eq(request.remoteAddress()), + authzInfoRoles(new String[] { indexRole.getName() }) + ); + } + verifyNoMoreInteractions(auditTrail); + for (BulkItemRequest bulkItemRequest : request.items()) { + if (deleteItems.contains(bulkItemRequest.id())) { + assertThat(bulkItemRequest.getPrimaryResponse().isFailed(), is(true)); + } else { + assertThat(bulkItemRequest.getPrimaryResponse(), nullValue()); + } } } }