From a4ba73270cbdcb31e92fc54fc8c9858abf4be552 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 2 Mar 2021 10:20:07 -0800 Subject: [PATCH] KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission (#10223) We now accept topicIds in the `DeleteTopic` request. If the client principal does not have `Describe` permission, then we return `TOPIC_AUTHORIZATION_FAILED`. This is justified because the topicId is not considered sensitive. However, in this case, we should not return the name of the topic in the response since we do consider it sensitive. Reviewers: David Jacot , dengziming , Justine Olshan , Chia-Ping Tsai --- .../kafka/controller/ControllerContext.scala | 4 + .../main/scala/kafka/server/KafkaApis.scala | 37 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 132 ++++++--------- .../unit/kafka/server/KafkaApisTest.scala | 159 +++++++++++++++++- 4 files changed, 233 insertions(+), 99 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 042830118223f..379196aa1d42c 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -463,6 +463,10 @@ class ControllerContext { }.keySet } + def topicName(topicId: Uuid): Option[String] = { + topicNames.get(topicId) + } + def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear() def partitionWithLeadersCount: Int = partitionLeadershipInfo.size diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d4fd527d3d4e1..bc228403b1088 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID) throw new InvalidRequestException("Topic name and topic ID can not both be specified.") val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name() - else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) + else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull results.add(new DeletableTopicResult() .setName(name) .setTopicId(topic.topicId())) @@ -1884,20 +1884,27 @@ class KafkaApis(val requestChannel: RequestChannel, val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, results.asScala.filter(result => result.name() != null))(_.name) results.forEach { topic => - val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null - if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) { - topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) - topic.setErrorMessage("Topic IDs are not supported on the server.") - } else if (unresolvedTopicId) - topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) - topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else if (!authorizedDeleteTopics.contains(topic.name)) - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - else if (!metadataCache.contains(topic.name)) - topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - else - toDelete += topic.name + val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null + if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) { + topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) + topic.setErrorMessage("Topic IDs are not supported on the server.") + } else if (unresolvedTopicId) { + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) { + + // Because the client does not have Describe permission, the name should + // not be returned in the response. Note, however, that we do not consider + // the topicId itself to be sensitive, so there is no reason to obscure + // this case with `UNKNOWN_TOPIC_ID`. + topic.setName(null) + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + } else if (!authorizedDeleteTopics.contains(topic.name)) { + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + } else if (!metadataCache.contains(topic.name)) { + topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + } else { + toDelete += topic.name + } } // If no authorized topics return immediately if (toDelete.isEmpty) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index b100962e35c9c..ebe4634adf8a2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -60,6 +60,8 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartiti import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.annotation.nowarn import scala.collection.mutable @@ -245,9 +247,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { }) ) - val requestKeysToErrorWithIds = (id: Uuid) => Map[ApiKeys, Nothing => Errors]( - ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.asScala.find(_.topicId == id).get.errorCode())) - ) + def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { + response match { + case res: DeleteTopicsResponse => + Errors.forCode(res.data.responses.asScala.find(_.topicId == id).get.errorCode) + case _ => + fail(s"Unexpected response type $response") + } + } val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern, Set[AccessControlEntry]]]( ApiKeys.METADATA -> topicDescribeAcl, @@ -534,12 +541,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setTimeoutMs(5000)).build() } - private def deleteTopicsWithIdsRequest(id: Uuid = getTopicIds()(topic)): DeleteTopicsRequest = { + private def deleteTopicsWithIdsRequest(topicId: Uuid): DeleteTopicsRequest = { new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() .setTopics(Collections.singletonList( new DeleteTopicsRequestData.DeleteTopicState() - .setTopicId(id))) + .setTopicId(topicId))) .setTimeoutMs(5000)).build() } @@ -737,29 +744,53 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRequests(requestKeyToRequest) } - @Test - def testAuthorizationDeleteTopicsIdWithTopicExisting(): Unit = { - sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = { + val topicId = if (withTopicExisting) { + createTopic(topic) + getTopicIds()(topic) + } else { + Uuid.randomUuid() + } + + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(topicId) + ) - val id = getTopicIds()(topic) + def sendAndVerify( + request: AbstractRequest, + isAuthorized: Boolean, + isDescribeAuthorized: Boolean + ): Unit = { + val response = connectAndReceive[AbstractResponse](request) + val error = findErrorForTopicId(topicId, response) + if (!withTopicExisting) { + assertEquals(Errors.UNKNOWN_TOPIC_ID, error) + } else if (!isDescribeAuthorized || !isAuthorized) { + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error) + } + } - for ((key, request) <- mutable.Map(ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest())) { + for ((key, request) <- requestKeyToRequest) { removeAllClientAcls() - val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = true, describeAuthorized = false, id = id) + sendAndVerify(request, isAuthorized = false, isDescribeAuthorized = false) + + val describeAcls = topicDescribeAcl(topicResource) + addAndVerifyAcls(describeAcls, topicResource) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => - val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls - addAndVerifyAcls(describeAcls, topicResource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = true, describeAuthorized = true, id = id) - removeAllClientAcls() + sendAndVerify(request, isAuthorized = isAuthorized, isDescribeAuthorized = true) } - for ((resource, acls) <- resourceToAcls) + removeAllClientAcls() + for ((resource, acls) <- resourceToAcls) { addAndVerifyAcls(acls, resource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = true, describeAuthorized = true, id = id) + } + + sendAndVerify(request, isAuthorized = true, isDescribeAuthorized = true) } } @@ -789,33 +820,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRequests(requestKeyToRequest, false) } - @Test - def testAuthorizationDeleteTopicsIdWithTopicNotExisting(): Unit = { - val id = Uuid.randomUuid() - val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(id), - ) - - for ((key, request) <- requestKeyToRequest) { - removeAllClientAcls() - val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false, describeAuthorized = false, id = id) - - val resourceToAcls = requestKeysToAcls(key) - resourceToAcls.get(topicResource).foreach { acls => - val describeAcls = topicDescribeAcl(topicResource) - val isAuthorized = describeAcls == acls - addAndVerifyAcls(describeAcls, topicResource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false, describeAuthorized = true, id = id) - removeAllClientAcls() - } - - for ((resource, acls) <- resourceToAcls) - addAndVerifyAcls(acls, resource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false, describeAuthorized = true, id = id) - } - } - @Test def testCreateTopicAuthorizationWithClusterCreate(): Unit = { removeAllClientAcls() @@ -2058,44 +2062,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } - private def sendRequestWithIdAndVerifyResponseError(request: AbstractRequest, - resources: Set[ResourceType], - isAuthorized: Boolean, - topicExists: Boolean, - describeAuthorized: Boolean, - id: Uuid): AbstractResponse = { - val apiKey = request.apiKey - val response = connectAndReceive[AbstractResponse](request) - val error = requestKeysToErrorWithIds(id)(apiKey).asInstanceOf[AbstractResponse => Errors](response) - - val authorizationErrors = resources.flatMap { resourceType => - if (resourceType == TOPIC) { - if (isAuthorized) - Set(Errors.UNKNOWN_TOPIC_ID, AclEntry.authorizationError(ResourceType.TOPIC)) - else if (describeAuthorized) - Set(AclEntry.authorizationError(ResourceType.TOPIC)) - else - Set(Errors.UNKNOWN_TOPIC_ID) - } else { - Set(AclEntry.authorizationError(resourceType)) - } - } - - if (topicExists) - if (isAuthorized) - assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. Found unexpected authorization error $error") - else - assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors") - else if (resources == Set(TOPIC)) - if (isAuthorized) - assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") - else { - assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") - } - - response - } - private def sendRequestAndVerifyResponseError(request: AbstractRequest, resources: Set[ResourceType], isAuthorized: Boolean, diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1fec7892434a8..476fe14cf9a65 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -26,7 +26,7 @@ import java.util.{Collections, Optional, Properties, Random} import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr} import kafka.cluster.{Broker, Partition} -import kafka.controller.KafkaController +import kafka.controller.{ControllerContext, KafkaController} import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.coordinator.group._ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} @@ -75,6 +75,8 @@ import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.{ArgumentMatchers, Mockito} import scala.annotation.nowarn @@ -3479,6 +3481,161 @@ class KafkaApisTest { assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList) } + @Test + def testDeleteTopicsByIdAuthorization(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext]) + + EasyMock.expect(clientControllerQuotaManager.newQuotaFor( + EasyMock.anyObject(classOf[RequestChannel.Request]), + EasyMock.anyShort() + )).andReturn(UnboundedControllerMutationQuota) + EasyMock.expect(controller.isActive).andReturn(true) + EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext) + + // Try to delete three topics: + // 1. One without describe permission + // 2. One without delete permission + // 3. One which is authorized, but doesn't exist + + expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.ALLOWED + )) + + expectTopicAuthorization(authorizer, AclOperation.DELETE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.DENIED + )) + + val topicIdsMap = Map( + Uuid.randomUuid() -> Some("foo"), + Uuid.randomUuid() -> Some("bar"), + Uuid.randomUuid() -> None + ) + + topicIdsMap.foreach { case (topicId, topicNameOpt) => + EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt) + } + + val topicDatas = topicIdsMap.keys.map { topicId => + new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId) + }.toList + val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopics(topicDatas.asJava)) + .build(ApiKeys.DELETE_TOPICS.latestVersion) + + val request = buildRequest(deleteRequest) + val capturedResponse = expectNoThrottling(request) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, + requestChannel, txnCoordinator, controller, controllerContext, authorizer) + createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request) + + val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse] + + topicIdsMap.foreach { case (topicId, nameOpt) => + val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get + nameOpt match { + case Some("foo") => + assertNull(response.name) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode)) + case Some("bar") => + assertEquals("bar", response.name) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode)) + case None => + assertNull(response.name) + assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode)) + case _ => + fail("Unexpected topic id/name mapping") + } + } + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + EasyMock.expect(clientControllerQuotaManager.newQuotaFor( + EasyMock.anyObject(classOf[RequestChannel.Request]), + EasyMock.anyShort() + )).andReturn(UnboundedControllerMutationQuota) + EasyMock.expect(controller.isActive).andReturn(true) + + // Try to delete three topics: + // 1. One without describe permission + // 2. One without delete permission + // 3. One which is authorized, but doesn't exist + + expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.ALLOWED, + "baz" -> AuthorizationResult.ALLOWED + )) + + expectTopicAuthorization(authorizer, AclOperation.DELETE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.DENIED, + "baz" -> AuthorizationResult.ALLOWED + )) + + val deleteRequest = if (usePrimitiveTopicNameArray) { + new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopicNames(List("foo", "bar", "baz").asJava)) + .build(5.toShort) + } else { + val topicDatas = List( + new DeleteTopicsRequestData.DeleteTopicState().setName("foo"), + new DeleteTopicsRequestData.DeleteTopicState().setName("bar"), + new DeleteTopicsRequestData.DeleteTopicState().setName("baz") + ) + new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopics(topicDatas.asJava)) + .build(ApiKeys.DELETE_TOPICS.latestVersion) + } + + val request = buildRequest(deleteRequest) + val capturedResponse = expectNoThrottling(request) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, + requestChannel, txnCoordinator, controller, authorizer) + createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request) + + val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse] + + def lookupErrorCode(topic: String): Option[Errors] = { + Option(deleteResponse.data.responses().find(topic)) + .map(result => Errors.forCode(result.errorCode)) + } + + assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo")) + assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar")) + assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz")) + } + + def expectTopicAuthorization( + authorizer: Authorizer, + aclOperation: AclOperation, + topicResults: Map[String, AuthorizationResult] + ): Unit = { + val expectedActions = topicResults.keys.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + topic -> new Action(aclOperation, pattern, 1, true, true) + }.toMap + + val actionsCapture: Capture[util.List[Action]] = EasyMock.newCapture() + EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.capture(actionsCapture))) + .andAnswer(() => { + actionsCapture.getValue.asScala.map { action => + val topic = action.resourcePattern.name + assertEquals(expectedActions(topic), action) + topicResults(topic) + }.asJava + }) + .once() + } + private def createMockRequest(): RequestChannel.Request = { val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request]) val requestHeader: RequestHeader = EasyMock.createNiceMock(classOf[RequestHeader])