Skip to content

Commit

Permalink
KAFKA-12394; Return TOPIC_AUTHORIZATION_FAILED in delete topic resp…
Browse files Browse the repository at this point in the history
…onse if no describe permission (#10223)

We now accept topicIds in the `DeleteTopic` request. If the client principal does not have `Describe` permission, then we return `TOPIC_AUTHORIZATION_FAILED`. This is justified because the topicId is not considered sensitive. However, in this case, we should not return the name of the topic in the response since we do consider it sensitive.

Reviewers: David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com>, Justine Olshan <jolshan@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
Jason Gustafson authored Mar 2, 2021
1 parent 29b4a3d commit a4ba732
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 99 deletions.
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/controller/ControllerContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,10 @@ class ControllerContext {
}.keySet
}

def topicName(topicId: Uuid): Option[String] = {
topicNames.get(topicId)
}

def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear()

def partitionWithLeadersCount: Int = partitionLeadershipInfo.size
Expand Down
37 changes: 22 additions & 15 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1874,7 +1874,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
throw new InvalidRequestException("Topic name and topic ID can not both be specified.")
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
else zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
else zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
results.add(new DeletableTopicResult()
.setName(name)
.setTopicId(topic.topicId()))
Expand All @@ -1884,20 +1884,27 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedDeleteTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala.filter(result => result.name() != null))(_.name)
results.forEach { topic =>
val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) && topic.name() == null
if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
topic.setErrorMessage("Topic IDs are not supported on the server.")
} else if (unresolvedTopicId)
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics(topic.name))
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
else if (!authorizedDeleteTopics.contains(topic.name))
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
else if (!metadataCache.contains(topic.name))
topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
else
toDelete += topic.name
val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID && topic.name() == null
if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
topic.setErrorMessage("Topic IDs are not supported on the server.")
} else if (unresolvedTopicId) {
topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
} else if (topicIdsFromRequest.contains(topic.topicId) && !authorizedDescribeTopics.contains(topic.name)) {

// Because the client does not have Describe permission, the name should
// not be returned in the response. Note, however, that we do not consider
// the topicId itself to be sensitive, so there is no reason to obscure
// this case with `UNKNOWN_TOPIC_ID`.
topic.setName(null)
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
} else if (!authorizedDeleteTopics.contains(topic.name)) {
topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
} else if (!metadataCache.contains(topic.name)) {
topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
} else {
toDelete += topic.name
}
}
// If no authorized topics return immediately
if (toDelete.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartiti
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.annotation.nowarn
import scala.collection.mutable
Expand Down Expand Up @@ -245,9 +247,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
})
)

val requestKeysToErrorWithIds = (id: Uuid) => Map[ApiKeys, Nothing => Errors](
ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.asScala.find(_.topicId == id).get.errorCode()))
)
def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
response match {
case res: DeleteTopicsResponse =>
Errors.forCode(res.data.responses.asScala.find(_.topicId == id).get.errorCode)
case _ =>
fail(s"Unexpected response type $response")
}
}

val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern, Set[AccessControlEntry]]](
ApiKeys.METADATA -> topicDescribeAcl,
Expand Down Expand Up @@ -534,12 +541,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setTimeoutMs(5000)).build()
}

private def deleteTopicsWithIdsRequest(id: Uuid = getTopicIds()(topic)): DeleteTopicsRequest = {
private def deleteTopicsWithIdsRequest(topicId: Uuid): DeleteTopicsRequest = {
new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
.setTopics(Collections.singletonList(
new DeleteTopicsRequestData.DeleteTopicState()
.setTopicId(id)))
.setTopicId(topicId)))
.setTimeoutMs(5000)).build()
}

Expand Down Expand Up @@ -737,29 +744,53 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRequests(requestKeyToRequest)
}

@Test
def testAuthorizationDeleteTopicsIdWithTopicExisting(): Unit = {
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
val topicId = if (withTopicExisting) {
createTopic(topic)
getTopicIds()(topic)
} else {
Uuid.randomUuid()
}

val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(topicId)
)

val id = getTopicIds()(topic)
def sendAndVerify(
request: AbstractRequest,
isAuthorized: Boolean,
isDescribeAuthorized: Boolean
): Unit = {
val response = connectAndReceive[AbstractResponse](request)
val error = findErrorForTopicId(topicId, response)
if (!withTopicExisting) {
assertEquals(Errors.UNKNOWN_TOPIC_ID, error)
} else if (!isDescribeAuthorized || !isAuthorized) {
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error)
}
}

for ((key, request) <- mutable.Map(ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest())) {
for ((key, request) <- requestKeyToRequest) {
removeAllClientAcls()
val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = true, describeAuthorized = false, id = id)
sendAndVerify(request, isAuthorized = false, isDescribeAuthorized = false)

val describeAcls = topicDescribeAcl(topicResource)
addAndVerifyAcls(describeAcls, topicResource)

val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = true, describeAuthorized = true, id = id)
removeAllClientAcls()
sendAndVerify(request, isAuthorized = isAuthorized, isDescribeAuthorized = true)
}

for ((resource, acls) <- resourceToAcls)
removeAllClientAcls()
for ((resource, acls) <- resourceToAcls) {
addAndVerifyAcls(acls, resource)
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = true, describeAuthorized = true, id = id)
}

sendAndVerify(request, isAuthorized = true, isDescribeAuthorized = true)
}
}

Expand Down Expand Up @@ -789,33 +820,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRequests(requestKeyToRequest, false)
}

@Test
def testAuthorizationDeleteTopicsIdWithTopicNotExisting(): Unit = {
val id = Uuid.randomUuid()
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(id),
)

for ((key, request) <- requestKeyToRequest) {
removeAllClientAcls()
val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = false, topicExists = false, describeAuthorized = false, id = id)

val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
addAndVerifyAcls(describeAcls, topicResource)
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = isAuthorized, topicExists = false, describeAuthorized = true, id = id)
removeAllClientAcls()
}

for ((resource, acls) <- resourceToAcls)
addAndVerifyAcls(acls, resource)
sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized = true, topicExists = false, describeAuthorized = true, id = id)
}
}

@Test
def testCreateTopicAuthorizationWithClusterCreate(): Unit = {
removeAllClientAcls()
Expand Down Expand Up @@ -2058,44 +2062,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}

private def sendRequestWithIdAndVerifyResponseError(request: AbstractRequest,
resources: Set[ResourceType],
isAuthorized: Boolean,
topicExists: Boolean,
describeAuthorized: Boolean,
id: Uuid): AbstractResponse = {
val apiKey = request.apiKey
val response = connectAndReceive[AbstractResponse](request)
val error = requestKeysToErrorWithIds(id)(apiKey).asInstanceOf[AbstractResponse => Errors](response)

val authorizationErrors = resources.flatMap { resourceType =>
if (resourceType == TOPIC) {
if (isAuthorized)
Set(Errors.UNKNOWN_TOPIC_ID, AclEntry.authorizationError(ResourceType.TOPIC))
else if (describeAuthorized)
Set(AclEntry.authorizationError(ResourceType.TOPIC))
else
Set(Errors.UNKNOWN_TOPIC_ID)
} else {
Set(AclEntry.authorizationError(resourceType))
}
}

if (topicExists)
if (isAuthorized)
assertFalse(authorizationErrors.contains(error), s"$apiKey should be allowed. Found unexpected authorization error $error")
else
assertTrue(authorizationErrors.contains(error), s"$apiKey should be forbidden. Found error $error but expected one of $authorizationErrors")
else if (resources == Set(TOPIC))
if (isAuthorized)
assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error")
else {
assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an unexpected error")
}

response
}

private def sendRequestAndVerifyResponseError(request: AbstractRequest,
resources: Set[ResourceType],
isAuthorized: Boolean,
Expand Down
Loading

0 comments on commit a4ba732

Please sign in to comment.