diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala index 042830118223f..379196aa1d42c 100644 --- a/core/src/main/scala/kafka/controller/ControllerContext.scala +++ b/core/src/main/scala/kafka/controller/ControllerContext.scala @@ -463,6 +463,10 @@ class ControllerContext { }.keySet } + def topicName(topicId: Uuid): Option[String] = { + topicNames.get(topicId) + } + def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear() def partitionWithLeadersCount: Int = partitionLeadershipInfo.size diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d4fd527d3d4e1..bc228403b1088 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID) throw new InvalidRequestException("Topic name and topic ID can not both be specified.") val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name() - else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) + else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull results.add(new DeletableTopicResult() .setName(name) .setTopicId(topic.topicId())) @@ -1884,20 +1884,27 @@ class KafkaApis(val requestChannel: RequestChannel, val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, results.asScala.filter(result => result.name() != null))(_.name) results.forEach { topic => - val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null - if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) { - topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) - topic.setErrorMessage("Topic IDs are not supported on the server.") - } else if (unresolvedTopicId) - topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name)) - topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) - else if (!authorizedDeleteTopics.contains(topic.name)) - topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) - else if (!metadataCache.contains(topic.name)) - topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - else - toDelete += topic.name + val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null + if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) { + topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code) + topic.setErrorMessage("Topic IDs are not supported on the server.") + } else if (unresolvedTopicId) { + topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code) + } else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) { + + // Because the client does not have Describe permission, the name should + // not be returned in the response. Note, however, that we do not consider + // the topicId itself to be sensitive, so there is no reason to obscure + // this case with `UNKNOWN_TOPIC_ID`. + topic.setName(null) + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + } else if (!authorizedDeleteTopics.contains(topic.name)) { + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + } else if (!metadataCache.contains(topic.name)) { + topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + } else { + toDelete += topic.name + } } // If no authorized topics return immediately if (toDelete.isEmpty) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index b100962e35c9c..ebe4634adf8a2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -60,6 +60,8 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartiti import org.apache.kafka.test.{TestUtils => JTestUtils} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.annotation.nowarn import scala.collection.mutable @@ -245,9 +247,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { }) ) - val requestKeysToErrorWithIds = (id: Uuid) => Map[ApiKeys, Nothing => Errors]( - ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.asScala.find(_.topicId == id).get.errorCode())) - ) + def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = { + response match { + case res: DeleteTopicsResponse => + Errors.forCode(res.data.responses.asScala.find(_.topicId == id).get.errorCode) + case _ => + fail(s"Unexpected response type $response") + } + } val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern, Set[AccessControlEntry]]]( ApiKeys.METADATA -> topicDescribeAcl, @@ -534,12 +541,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { .setTimeoutMs(5000)).build() } - private def deleteTopicsWithIdsRequest(id: Uuid = getTopicIds()(topic)): DeleteTopicsRequest = { + private def deleteTopicsWithIdsRequest(topicId: Uuid): DeleteTopicsRequest = { new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() .setTopics(Collections.singletonList( new DeleteTopicsRequestData.DeleteTopicState() - .setTopicId(id))) + .setTopicId(topicId))) .setTimeoutMs(5000)).build() } @@ -737,29 +744,53 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRequests(requestKeyToRequest) } - @Test - def testAuthorizationDeleteTopicsIdWithTopicExisting(): Unit = { - sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = { + val topicId = if (withTopicExisting) { + createTopic(topic) + getTopicIds()(topic) + } else { + Uuid.randomUuid() + } + + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(topicId) + ) - val id = getTopicIds()(topic) + def sendAndVerify( + request: AbstractRequest, + isAuthorized: Boolean, + isDescribeAuthorized: Boolean + ): Unit = { + val response = connectAndReceive[AbstractResponse](request) + val error = findErrorForTopicId(topicId, response) + if (!withTopicExisting) { + assertEquals(Errors.UNKNOWN_TOPIC_ID, error) + } else if (!isDescribeAuthorized || !isAuthorized) { + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error) + } + } - for ((key, request) <- mutable.Map(ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest())) { + for ((key, request) <- requestKeyToRequest) { removeAllClientAcls() - val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = true, describeAuthorized = false, id = id) + sendAndVerify(request, isAuthorized = false, isDescribeAuthorized = false) + + val describeAcls = topicDescribeAcl(topicResource) + addAndVerifyAcls(describeAcls, topicResource) val resourceToAcls = requestKeysToAcls(key) resourceToAcls.get(topicResource).foreach { acls => - val describeAcls = topicDescribeAcl(topicResource) val isAuthorized = describeAcls == acls - addAndVerifyAcls(describeAcls, topicResource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = true, describeAuthorized = true, id = id) - removeAllClientAcls() + sendAndVerify(request, isAuthorized = isAuthorized, isDescribeAuthorized = true) } - for ((resource, acls) <- resourceToAcls) + removeAllClientAcls() + for ((resource, acls) <- resourceToAcls) { addAndVerifyAcls(acls, resource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = true, describeAuthorized = true, id = id) + } + + sendAndVerify(request, isAuthorized = true, isDescribeAuthorized = true) } } @@ -789,33 +820,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRequests(requestKeyToRequest, false) } - @Test - def testAuthorizationDeleteTopicsIdWithTopicNotExisting(): Unit = { - val id = Uuid.randomUuid() - val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(id), - ) - - for ((key, request) <- requestKeyToRequest) { - removeAllClientAcls() - val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false, describeAuthorized = false, id = id) - - val resourceToAcls = requestKeysToAcls(key) - resourceToAcls.get(topicResource).foreach { acls => - val describeAcls = topicDescribeAcl(topicResource) - val isAuthorized = describeAcls == acls - addAndVerifyAcls(describeAcls, topicResource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false, describeAuthorized = true, id = id) - removeAllClientAcls() - } - - for ((resource, acls) <- resourceToAcls) - addAndVerifyAcls(acls, resource) - sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false, describeAuthorized = true, id = id) - } - } - @Test def testCreateTopicAuthorizationWithClusterCreate(): Unit = { removeAllClientAcls() @@ -2058,44 +2062,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } - private def sendRequestWithIdAndVerifyResponseError(request: AbstractRequest, - resources: Set[ResourceType], - isAuthorized: Boolean, - topicExists: Boolean, - describeAuthorized: Boolean, - id: Uuid): AbstractResponse = { - val apiKey = request.apiKey - val response = connectAndReceive[AbstractResponse](request) - val error = requestKeysToErrorWithIds(id)(apiKey).asInstanceOf[AbstractResponse => Errors](response) - - val authorizationErrors = resources.flatMap { resourceType => - if (resourceType == TOPIC) { - if (isAuthorized) - Set(Errors.UNKNOWN_TOPIC_ID, AclEntry.authorizationError(ResourceType.TOPIC)) - else if (describeAuthorized) - Set(AclEntry.authorizationError(ResourceType.TOPIC)) - else - Set(Errors.UNKNOWN_TOPIC_ID) - } else { - Set(AclEntry.authorizationError(resourceType)) - } - } - - if (topicExists) - if (isAuthorized) - assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. Found unexpected authorization error $error") - else - assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors") - else if (resources == Set(TOPIC)) - if (isAuthorized) - assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") - else { - assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error") - } - - response - } - private def sendRequestAndVerifyResponseError(request: AbstractRequest, resources: Set[ResourceType], isAuthorized: Boolean, diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 1fec7892434a8..476fe14cf9a65 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -26,7 +26,7 @@ import java.util.{Collections, Optional, Properties, Random} import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr} import kafka.cluster.{Broker, Partition} -import kafka.controller.KafkaController +import kafka.controller.{ControllerContext, KafkaController} import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.coordinator.group._ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} @@ -75,6 +75,8 @@ import org.easymock.EasyMock._ import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import org.mockito.{ArgumentMatchers, Mockito} import scala.annotation.nowarn @@ -3479,6 +3481,161 @@ class KafkaApisTest { assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList) } + @Test + def testDeleteTopicsByIdAuthorization(): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + val controllerContext: ControllerContext = EasyMock.mock(classOf[ControllerContext]) + + EasyMock.expect(clientControllerQuotaManager.newQuotaFor( + EasyMock.anyObject(classOf[RequestChannel.Request]), + EasyMock.anyShort() + )).andReturn(UnboundedControllerMutationQuota) + EasyMock.expect(controller.isActive).andReturn(true) + EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext) + + // Try to delete three topics: + // 1. One without describe permission + // 2. One without delete permission + // 3. One which is authorized, but doesn't exist + + expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.ALLOWED + )) + + expectTopicAuthorization(authorizer, AclOperation.DELETE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.DENIED + )) + + val topicIdsMap = Map( + Uuid.randomUuid() -> Some("foo"), + Uuid.randomUuid() -> Some("bar"), + Uuid.randomUuid() -> None + ) + + topicIdsMap.foreach { case (topicId, topicNameOpt) => + EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt) + } + + val topicDatas = topicIdsMap.keys.map { topicId => + new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId) + }.toList + val deleteRequest = new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopics(topicDatas.asJava)) + .build(ApiKeys.DELETE_TOPICS.latestVersion) + + val request = buildRequest(deleteRequest) + val capturedResponse = expectNoThrottling(request) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, + requestChannel, txnCoordinator, controller, controllerContext, authorizer) + createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request) + + val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse] + + topicIdsMap.foreach { case (topicId, nameOpt) => + val response = deleteResponse.data.responses.asScala.find(_.topicId == topicId).get + nameOpt match { + case Some("foo") => + assertNull(response.name) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode)) + case Some("bar") => + assertEquals("bar", response.name) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.forCode(response.errorCode)) + case None => + assertNull(response.name) + assertEquals(Errors.UNKNOWN_TOPIC_ID, Errors.forCode(response.errorCode)) + case _ => + fail("Unexpected topic id/name mapping") + } + } + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray: Boolean): Unit = { + val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer]) + + EasyMock.expect(clientControllerQuotaManager.newQuotaFor( + EasyMock.anyObject(classOf[RequestChannel.Request]), + EasyMock.anyShort() + )).andReturn(UnboundedControllerMutationQuota) + EasyMock.expect(controller.isActive).andReturn(true) + + // Try to delete three topics: + // 1. One without describe permission + // 2. One without delete permission + // 3. One which is authorized, but doesn't exist + + expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.ALLOWED, + "baz" -> AuthorizationResult.ALLOWED + )) + + expectTopicAuthorization(authorizer, AclOperation.DELETE, Map( + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.DENIED, + "baz" -> AuthorizationResult.ALLOWED + )) + + val deleteRequest = if (usePrimitiveTopicNameArray) { + new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopicNames(List("foo", "bar", "baz").asJava)) + .build(5.toShort) + } else { + val topicDatas = List( + new DeleteTopicsRequestData.DeleteTopicState().setName("foo"), + new DeleteTopicsRequestData.DeleteTopicState().setName("bar"), + new DeleteTopicsRequestData.DeleteTopicState().setName("baz") + ) + new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopics(topicDatas.asJava)) + .build(ApiKeys.DELETE_TOPICS.latestVersion) + } + + val request = buildRequest(deleteRequest) + val capturedResponse = expectNoThrottling(request) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, clientControllerQuotaManager, + requestChannel, txnCoordinator, controller, authorizer) + createKafkaApis(authorizer = Some(authorizer)).handleDeleteTopicsRequest(request) + + val deleteResponse = capturedResponse.getValue.asInstanceOf[DeleteTopicsResponse] + + def lookupErrorCode(topic: String): Option[Errors] = { + Option(deleteResponse.data.responses().find(topic)) + .map(result => Errors.forCode(result.errorCode)) + } + + assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("foo")) + assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED), lookupErrorCode("bar")) + assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION), lookupErrorCode("baz")) + } + + def expectTopicAuthorization( + authorizer: Authorizer, + aclOperation: AclOperation, + topicResults: Map[String, AuthorizationResult] + ): Unit = { + val expectedActions = topicResults.keys.map { topic => + val pattern = new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + topic -> new Action(aclOperation, pattern, 1, true, true) + }.toMap + + val actionsCapture: Capture[util.List[Action]] = EasyMock.newCapture() + EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.capture(actionsCapture))) + .andAnswer(() => { + actionsCapture.getValue.asScala.map { action => + val topic = action.resourcePattern.name + assertEquals(expectedActions(topic), action) + topicResults(topic) + }.asJava + }) + .once() + } + private def createMockRequest(): RequestChannel.Request = { val request: RequestChannel.Request = EasyMock.createNiceMock(classOf[RequestChannel.Request]) val requestHeader: RequestHeader = EasyMock.createNiceMock(classOf[RequestHeader])