diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala deleted file mode 100644 index befd9d27a61dd..0000000000000 --- a/core/src/main/scala/kafka/security/auth/Acl.scala +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.security.auth - -import kafka.security.authorizer.AclEntry -import org.apache.kafka.common.resource.ResourcePattern -import org.apache.kafka.common.security.auth.KafkaPrincipal - -@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5") -object Acl { - val WildCardPrincipal: KafkaPrincipal = AclEntry.WildcardPrincipal - val WildCardHost: String = AclEntry.WildcardHost - val WildCardResource: String = ResourcePattern.WILDCARD_RESOURCE - val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All) - val PrincipalKey = AclEntry.PrincipalKey - val PermissionTypeKey = AclEntry.PermissionTypeKey - val OperationKey = AclEntry.OperationKey - val HostsKey = AclEntry.HostsKey - val VersionKey = AclEntry.VersionKey - val CurrentVersion = AclEntry.CurrentVersion - val AclsKey = AclEntry.AclsKey - - /** - * - * @see AclEntry - */ - def fromBytes(bytes: Array[Byte]): Set[Acl] = { - AclEntry.fromBytes(bytes) - .map(ace => Acl(ace.kafkaPrincipal, - PermissionType.fromJava(ace.permissionType()), - ace.host(), - Operation.fromJava(ace.operation()))) - } - - def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = { - AclEntry.toJsonCompatibleMap(acls.map(acl => - AclEntry(acl.principal, acl.permissionType.toJava, acl.host, acl.operation.toJava) - )) - } -} - -/** - * An instance of this class will represent an acl that can express following statement. - *
- * Principal P has permissionType PT on Operation O1 from hosts H1. - *- * @param principal A value of *:* indicates all users. - * @param permissionType - * @param host A value of * indicates all hosts. - * @param operation A value of ALL indicates all operations. - */ -@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5") -case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) { - - /** - * TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects. - * @return Map representation of the Acl. - */ - def toMap(): Map[String, Any] = { - Map(Acl.PrincipalKey -> principal.toString, - Acl.PermissionTypeKey -> permissionType.name, - Acl.OperationKey -> operation.name, - Acl.HostsKey -> host) - } - - override def toString: String = { - "%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host) - } - -} - diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala deleted file mode 100644 index 7509171313a53..0000000000000 --- a/core/src/main/scala/kafka/security/auth/Authorizer.scala +++ /dev/null @@ -1,149 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.security.auth - -import kafka.network.RequestChannel.Session -import org.apache.kafka.common.Configurable -import org.apache.kafka.common.security.auth.KafkaPrincipal - -/** - * Top level interface that all pluggable authorizers must implement. Kafka will read the `authorizer.class.name` config - * value at startup time, create an instance of the specified class using the default constructor, and call its - * `configure` method. - * - * From that point onwards, every client request will first be routed to the `authorize` method and the request will only - * be authorized if the method returns true. - * - * If `authorizer.class.name` has no value specified, then no authorization will be performed, and all operations are - * permitted. - */ -@deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.4") -trait Authorizer extends Configurable { - - /** - * @param session The session being authenticated. - * @param operation Type of operation client is trying to perform on resource. - * @param resource Resource the client is trying to access. Resource pattern type is always literal in input resource. - * @return true if the operation should be permitted, false otherwise - */ - def authorize(session: Session, operation: Operation, resource: Resource): Boolean - - /** - * add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new - * acls will be added to existing acls. - * - * {code} - * // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo': - * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL)) - * - * // The following will add ACLs to the special literal topic resource path '*', which affects all topics: - * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL)) - * - * // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo': - * authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED)) - * {code} - * - * @param acls set of acls to add to existing acls - * @param resource the resource path to which these acls should be attached. - * the supplied resource will have a specific resource pattern type, - * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``. - */ - def addAcls(acls: Set[Acl], resource: Resource): Unit - - /** - * remove these acls from the resource. - * - * {code} - * // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo': - * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL)) - * - * // The following will remove ACLs from the special literal topic resource path '*', which affects all topics: - * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL)) - * - * // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo': - * authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED)) - * {code} - * - * @param acls set of acls to be removed. - * @param resource resource path from which the acls should be removed. - * the supplied resource will have a specific resource pattern type, - * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``. - * @return true if some acl got removed, false if no acl was removed. - */ - def removeAcls(acls: Set[Acl], resource: Resource): Boolean - - /** - * remove a resource along with all of its acls from acl store. - * - * {code} - * // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo': - * authorizer.removeAcls(Resource(Topic, "foo", LITERAL)) - * - * // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics: - * authorizer.removeAcls(Resource(Topic, "*", LITERAL)) - * - * // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo': - * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED)) - * {code} - * - * @param resource the resource path from which these acls should be removed. - * the supplied resource will have a specific resource pattern type, - * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``. - * @return - */ - def removeAcls(resource: Resource): Boolean - - /** - * get set of acls for the supplied resource - * - * {code} - * // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo': - * authorizer.removeAcls(Resource(Topic, "foo", LITERAL)) - * - * // The following will get all ACLs from the special literal topic resource path '*', which affects all topics: - * authorizer.removeAcls(Resource(Topic, "*", LITERAL)) - * - * // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo': - * authorizer.removeAcls(Resource(Topic, "foo", PREFIXED)) - * {code} - * - * @param resource the resource path to which the acls belong. - * the supplied resource will have a specific resource pattern type, - * i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``. - * @return empty set if no acls are found, otherwise the acls for the resource. - */ - def getAcls(resource: Resource): Set[Acl] - - /** - * get the acls for this principal. - * @param principal principal name. - * @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal. - */ - def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] - - /** - * gets the map of resource paths to acls for all resources. - */ - def getAcls(): Map[Resource, Set[Acl]] - - /** - * Closes this instance. - */ - def close(): Unit - -} diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala deleted file mode 100644 index 3ae2384ea0156..0000000000000 --- a/core/src/main/scala/kafka/security/auth/Operation.scala +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.security.auth - -import kafka.common.{BaseEnum, KafkaException} -import org.apache.kafka.common.acl.AclOperation - -/** - * Different operations a client may perform on kafka resources. - */ -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -sealed trait Operation extends BaseEnum { - def toJava : AclOperation -} - -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object Read extends Operation { - val name = "Read" - val toJava = AclOperation.READ -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object Write extends Operation { - val name = "Write" - val toJava = AclOperation.WRITE -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object Create extends Operation { - val name = "Create" - val toJava = AclOperation.CREATE -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object Delete extends Operation { - val name = "Delete" - val toJava = AclOperation.DELETE -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object Alter extends Operation { - val name = "Alter" - val toJava = AclOperation.ALTER -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object Describe extends Operation { - val name = "Describe" - val toJava = AclOperation.DESCRIBE -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object ClusterAction extends Operation { - val name = "ClusterAction" - val toJava = AclOperation.CLUSTER_ACTION -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object DescribeConfigs extends Operation { - val name = "DescribeConfigs" - val toJava = AclOperation.DESCRIBE_CONFIGS -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object AlterConfigs extends Operation { - val name = "AlterConfigs" - val toJava = AclOperation.ALTER_CONFIGS -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object IdempotentWrite extends Operation { - val name = "IdempotentWrite" - val toJava = AclOperation.IDEMPOTENT_WRITE -} -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -case object All extends Operation { - val name = "All" - val toJava = AclOperation.ALL -} - -@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5") -object Operation { - - def fromString(operation: String): Operation = { - val op = values.find(op => op.name.equalsIgnoreCase(operation)) - op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(","))) - } - - def fromJava(operation: AclOperation): Operation = { - operation match { - case AclOperation.READ => Read - case AclOperation.WRITE => Write - case AclOperation.CREATE => Create - case AclOperation.DELETE => Delete - case AclOperation.ALTER => Alter - case AclOperation.DESCRIBE => Describe - case AclOperation.CLUSTER_ACTION => ClusterAction - case AclOperation.ALTER_CONFIGS => AlterConfigs - case AclOperation.DESCRIBE_CONFIGS => DescribeConfigs - case AclOperation.IDEMPOTENT_WRITE => IdempotentWrite - case AclOperation.ALL => All - case _ => throw new KafkaException(operation + " is not a convertible operation name. The valid names are " + values.mkString(",")) - } - } - - def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, - DescribeConfigs, IdempotentWrite, All) -} diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala deleted file mode 100644 index c5325b25342d1..0000000000000 --- a/core/src/main/scala/kafka/security/auth/PermissionType.scala +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.security.auth - -import kafka.common.{BaseEnum, KafkaException} -import org.apache.kafka.common.acl.AclPermissionType - -@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5") -sealed trait PermissionType extends BaseEnum { - val toJava: AclPermissionType -} - -@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5") -case object Allow extends PermissionType { - val name = "Allow" - val toJava = AclPermissionType.ALLOW -} - -@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5") -case object Deny extends PermissionType { - val name = "Deny" - val toJava = AclPermissionType.DENY -} - -@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5") -object PermissionType { - def fromString(permissionType: String): PermissionType = { - val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType)) - pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(","))) - } - - def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString) - - def values: Seq[PermissionType] = List(Allow, Deny) -} - diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala deleted file mode 100644 index 8045c68141687..0000000000000 --- a/core/src/main/scala/kafka/security/auth/Resource.scala +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.security.auth - -import kafka.common.KafkaException -import kafka.security.authorizer.AclEntry -import org.apache.kafka.common.resource.{PatternType, ResourcePattern} - -@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5") -object Resource { - val Separator = AclEntry.ResourceSeparator - val ClusterResourceName = "kafka-cluster" - val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, PatternType.LITERAL) - val WildCardResource = AclEntry.WildcardResource - - @deprecated("This resource name is not used by Kafka and will be removed in a future release", since = "2.1") - val ProducerIdResourceName = "producer-id" // This is not used since we don't have a producer id resource - - def fromString(str: String): Resource = { - ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match { - case None => throw new KafkaException("Invalid resource string: '" + str + "'") - case Some(resourceType) => - val remaining = str.substring(resourceType.name.length + 1) - - PatternType.values.find(patternType => remaining.startsWith(patternType.name + Separator)) match { - case Some(patternType) => - val name = remaining.substring(patternType.name.length + 1) - Resource(resourceType, name, patternType) - - case None => - Resource(resourceType, remaining, PatternType.LITERAL) - } - } - } -} - -/** - * - * @param resourceType non-null type of resource. - * @param name non-null name of the resource, for topic this will be topic name , for group it will be group name. For cluster type - * it will be a constant string kafka-cluster. - * @param patternType non-null resource pattern type: literal, prefixed, etc. - */ -@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5") -case class Resource(resourceType: ResourceType, name: String, patternType: PatternType) { - - if (!patternType.isSpecific) - throw new IllegalArgumentException(s"patternType must not be $patternType") - - /** - * Create an instance of this class with the provided parameters. - * Resource pattern type would default to PatternType.LITERAL. - * - * @param resourceType non-null resource type - * @param name non-null resource name - * @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]] - */ - @deprecated("Use Resource(ResourceType, String, PatternType", "Since 2.0") - def this(resourceType: ResourceType, name: String) = { - this(resourceType, name, PatternType.LITERAL) - } - - def toPattern: ResourcePattern = { - new ResourcePattern(resourceType.toJava, name, patternType) - } - - override def toString: String = { - resourceType.name + Resource.Separator + patternType + Resource.Separator + name - } -} - diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala deleted file mode 100644 index 72b71c0b991dd..0000000000000 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.security.auth - -import kafka.common.{BaseEnum, KafkaException} -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.resource.{ResourceType => JResourceType} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] { - def error: Errors - def toJava: JResourceType - // this method output will not include "All" Operation type - def supportedOperations: Set[Operation] - - override def compare(that: ResourceType): Int = this.name compare that.name -} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -case object Topic extends ResourceType { - val name = "Topic" - val error = Errors.TOPIC_AUTHORIZATION_FAILED - val toJava = JResourceType.TOPIC - val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs) -} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -case object Group extends ResourceType { - val name = "Group" - val error = Errors.GROUP_AUTHORIZATION_FAILED - val toJava = JResourceType.GROUP - val supportedOperations = Set(Read, Describe, Delete) -} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -case object Cluster extends ResourceType { - val name = "Cluster" - val error = Errors.CLUSTER_AUTHORIZATION_FAILED - val toJava = JResourceType.CLUSTER - val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe) -} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -case object TransactionalId extends ResourceType { - val name = "TransactionalId" - val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED - val toJava = JResourceType.TRANSACTIONAL_ID - val supportedOperations = Set(Describe, Write) -} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -case object DelegationToken extends ResourceType { - val name = "DelegationToken" - val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED - val toJava = JResourceType.DELEGATION_TOKEN - val supportedOperations : Set[Operation] = Set(Describe) -} - -@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5") -object ResourceType { - - def fromString(resourceType: String): ResourceType = { - val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType)) - rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) - } - - def fromJava(resourceType: JResourceType): ResourceType = { - resourceType match { - case JResourceType.TOPIC => Topic - case JResourceType.GROUP => Group - case JResourceType.CLUSTER => Cluster - case JResourceType.TRANSACTIONAL_ID => TransactionalId - case JResourceType.DELEGATION_TOKEN => DelegationToken - case _ => throw new KafkaException(resourceType + " is not a convertible resource type. The valid types are " + values.mkString(",")) - } - } - - def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId, DelegationToken) -} diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala deleted file mode 100644 index 653e154f7158b..0000000000000 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.security.auth - -import java.util - -import kafka.network.RequestChannel.Session -import kafka.security.auth.SimpleAclAuthorizer.BaseAuthorizer -import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils, AuthorizerWrapper} -import kafka.utils._ -import kafka.zk.ZkVersion -import org.apache.kafka.common.acl.{AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} -import org.apache.kafka.common.errors.ApiException -import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter} -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} - -import scala.collection.mutable -import scala.jdk.CollectionConverters._ - -@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4") -object SimpleAclAuthorizer { - //optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in - //same zookeeper where all other kafka broker info is stored. - val ZkUrlProp = AclAuthorizer.ZkUrlProp - val ZkConnectionTimeOutProp = AclAuthorizer.ZkConnectionTimeOutProp - val ZkSessionTimeOutProp = AclAuthorizer.ZkSessionTimeOutProp - val ZkMaxInFlightRequests = AclAuthorizer.ZkMaxInFlightRequests - - //List of users that will be treated as super users and will have access to all the resources for all actions from all hosts, defaults to no super users. - val SuperUsersProp = AclAuthorizer.SuperUsersProp - //If set to true when no acls are found for a resource , authorizer allows access to everyone. Defaults to false. - val AllowEveryoneIfNoAclIsFoundProp = AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp - - case class VersionedAcls(acls: Set[Acl], zkVersion: Int) { - def exists: Boolean = zkVersion != ZkVersion.UnknownVersion - } - val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion) - - private[auth] class BaseAuthorizer extends AclAuthorizer { - override def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = { - val principal = requestContext.principal - val host = requestContext.clientAddress.getHostAddress - val operation = Operation.fromJava(action.operation) - val resource = AuthorizerWrapper.convertToResource(action.resourcePattern) - def logMessage: String = { - val authResult = if (authorized) "Allowed" else "Denied" - s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource" - } - - if (authorized) authorizerLogger.debug(logMessage) - else authorizerLogger.info(logMessage) - } - } -} - -@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4") -class SimpleAclAuthorizer extends Authorizer with Logging { - - private val aclAuthorizer = new BaseAuthorizer - - // The maximum number of times we should try to update the resource acls in zookeeper before failing; - // This should never occur, but is a safeguard just in case. - protected[auth] var maxUpdateRetries = 10 - - - /** - * Guaranteed to be called before any authorize call is made. - */ - override def configure(javaConfigs: util.Map[String, _]): Unit = { - aclAuthorizer.configure(javaConfigs) - } - - override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { - val requestContext = AuthorizerUtils.sessionToRequestContext(session) - val action = new Action(operation.toJava, resource.toPattern, 1, true, true) - aclAuthorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED - } - - def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = { - aclAuthorizer.isSuperUser(principal) - } - - override def addAcls(acls: Set[Acl], resource: Resource): Unit = { - aclAuthorizer.maxUpdateRetries = maxUpdateRetries - if (acls != null && acls.nonEmpty) { - val bindings = acls.map { acl => AuthorizerWrapper.convertToAclBinding(resource, acl) } - createAcls(bindings) - } - } - - override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = { - val filters = aclsTobeRemoved.map { acl => - new AclBindingFilter(resource.toPattern.toFilter, AuthorizerWrapper.convertToAccessControlEntry(acl).toFilter) - } - deleteAcls(filters) - } - - override def removeAcls(resource: Resource): Boolean = { - val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY) - deleteAcls(Set(filter)) - } - - override def getAcls(resource: Resource): Set[Acl] = { - val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY) - acls(filter).getOrElse(resource, Set.empty) - } - - override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = { - val filter = new AclBindingFilter(ResourcePatternFilter.ANY, - new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY)) - acls(filter) - } - - def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = { - val filter = new AclBindingFilter(new ResourcePatternFilter(resourceType.toJava, resourceName, PatternType.MATCH), - AccessControlEntryFilter.ANY) - acls(filter).flatMap(_._2).toSet - } - - override def getAcls(): Map[Resource, Set[Acl]] = { - acls(AclBindingFilter.ANY) - } - - def close(): Unit = { - aclAuthorizer.close() - } - - private def createAcls(bindings: Set[AclBinding]): Unit = { - aclAuthorizer.maxUpdateRetries = maxUpdateRetries - val results = aclAuthorizer.createAcls(null, bindings.toList.asJava).asScala.map(_.toCompletableFuture.get) - results.foreach { result => result.exception.ifPresent(throwException) } - } - - private def deleteAcls(filters: Set[AclBindingFilter]): Boolean = { - aclAuthorizer.maxUpdateRetries = maxUpdateRetries - val results = aclAuthorizer.deleteAcls(null, filters.toList.asJava).asScala.map(_.toCompletableFuture.get) - results.foreach { result => result.exception.ifPresent(throwException) } - results.flatMap(_.aclBindingDeleteResults.asScala).foreach { result => result.exception.ifPresent(e => throw e) } - results.exists(r => r.aclBindingDeleteResults.asScala.exists(d => !d.exception.isPresent)) - } - - private def acls(filter: AclBindingFilter): Map[Resource, Set[Acl]] = { - val result = mutable.Map[Resource, mutable.Set[Acl]]() - aclAuthorizer.acls(filter).forEach { binding => - val resource = AuthorizerWrapper.convertToResource(binding.pattern) - val acl = AuthorizerWrapper.convertToAcl(binding.entry) - result.getOrElseUpdate(resource, mutable.Set()).add(acl) - } - result.mapValues(_.toSet).toMap - } - - // To retain the same exceptions as in previous versions, throw the underlying exception when the exception - // was wrapped by AclAuthorizer in an ApiException - private def throwException(e: ApiException): Unit = { - if (e.getCause != null) - throw e.getCause - else - throw e - } -} diff --git a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala index 46c33ca491d18..cac6480c73c38 100644 --- a/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage} import com.typesafe.scalalogging.Logger import kafka.api.KAFKA_2_0_IV1 -import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls} import kafka.security.authorizer.AclEntry.ResourceSeparator import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils._ @@ -118,9 +117,16 @@ object AclAuthorizer { zkClientConfig } } + + private def validateAclBinding(aclBinding: AclBinding): Unit = { + if (aclBinding.isUnknown) + throw new IllegalArgumentException("ACL binding contains unknown elements") + } } class AclAuthorizer extends Authorizer with Logging { + import kafka.security.authorizer.AclAuthorizer._ + private[security] val authorizerLogger = Logger("kafka.authorizer.logger") private var superUsers = Set.empty[KafkaPrincipal] private var shouldAllowEveryoneIfNoAclIsFound = false @@ -200,7 +206,7 @@ class AclAuthorizer extends Authorizer with Logging { throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " + s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater") } - AuthorizerUtils.validateAclBinding(aclBinding) + validateAclBinding(aclBinding) true } catch { case e: Throwable => @@ -225,7 +231,7 @@ class AclAuthorizer extends Authorizer with Logging { } } } - results.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava + results.toBuffer.map(CompletableFuture.completedFuture[AclCreateResult]).asJava } /** diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala index 0d670befbb7ff..0e417d677eb2e 100644 --- a/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala +++ b/core/src/main/scala/kafka/security/authorizer/AuthorizerUtils.scala @@ -20,32 +20,15 @@ package kafka.security.authorizer import java.net.InetAddress import kafka.network.RequestChannel.Session -import kafka.security.auth.{Authorizer => LegacyAuthorizer} -import org.apache.kafka.common.acl._ -import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.resource.Resource import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer} -import scala.annotation.nowarn - object AuthorizerUtils { - @nowarn("cat=deprecation") - def createAuthorizer(className: String): Authorizer = { - Utils.newInstance(className, classOf[Object]) match { - case auth: Authorizer => auth - case auth: kafka.security.auth.Authorizer => new AuthorizerWrapper(auth) - case _ => throw new ConfigException(s"Authorizer does not implement ${classOf[Authorizer].getName} or ${classOf[LegacyAuthorizer].getName}.") - } - } - - def validateAclBinding(aclBinding: AclBinding): Unit = { - if (aclBinding.isUnknown) - throw new IllegalArgumentException("ACL binding contains unknown elements") - } + def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer]) def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME) diff --git a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala b/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala deleted file mode 100644 index cc25fced4529e..0000000000000 --- a/core/src/main/scala/kafka/security/authorizer/AuthorizerWrapper.scala +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.security.authorizer - -import java.util.concurrent.{CompletableFuture, CompletionStage} -import java.{lang, util} - -import kafka.network.RequestChannel.Session -import kafka.security.auth.{Acl, Operation, PermissionType, Resource, SimpleAclAuthorizer, ResourceType => ResourceTypeLegacy} -import kafka.security.authorizer.AuthorizerWrapper._ -import org.apache.kafka.common.Endpoint -import org.apache.kafka.common.acl._ -import org.apache.kafka.common.errors.{ApiException, InvalidRequestException} -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.ApiError -import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.SecurityUtils -import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal -import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult -import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizerServerInfo, _} - -import scala.collection.mutable.ArrayBuffer -import scala.collection.{Seq, immutable, mutable} -import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Success, Try} - -@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") -object AuthorizerWrapper { - - def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = { - (for { - resourceType <- Try(ResourceTypeLegacy.fromJava(filter.patternFilter.resourceType)) - principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal)) - operation <- Try(Operation.fromJava(filter.entryFilter.operation)) - permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType)) - resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType) - acl = Acl(principal, permissionType, filter.entryFilter.host, operation) - } yield (resource, acl)) match { - case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage)) - case Success(s) => Right(s) - } - } - - def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = { - val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType) - new AclBinding(resourcePattern, convertToAccessControlEntry(acl)) - } - - def convertToAccessControlEntry(acl: Acl): AccessControlEntry = { - new AccessControlEntry(acl.principal.toString, acl.host.toString, - acl.operation.toJava, acl.permissionType.toJava) - } - - def convertToAcl(ace: AccessControlEntry): Acl = { - new Acl(parseKafkaPrincipal(ace.principal), PermissionType.fromJava(ace.permissionType), ace.host, - Operation.fromJava(ace.operation)) - } - - def convertToResource(resourcePattern: ResourcePattern): Resource = { - Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType) - } -} - -@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") -class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer { - - var shouldAllowEveryoneIfNoAclIsFound = false - - override def configure(configs: util.Map[String, _]): Unit = { - baseAuthorizer.configure(configs) - shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get( - AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) - && baseAuthorizer.isInstanceOf[SimpleAclAuthorizer]) - } - - override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = { - serverInfo.endpoints.asScala.map { endpoint => - endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava - } - - override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = { - val session = Session(requestContext.principal, requestContext.clientAddress) - actions.asScala.map { action => - val operation = Operation.fromJava(action.operation) - if (baseAuthorizer.authorize(session, operation, convertToResource(action.resourcePattern))) - AuthorizationResult.ALLOWED - else - AuthorizationResult.DENIED - }.asJava - } - - override def createAcls(requestContext: AuthorizableRequestContext, - aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = { - aclBindings.asScala - .map { aclBinding => - convertToResourceAndAcl(aclBinding.toFilter) match { - case Left(apiError) => new AclCreateResult(apiError.exception) - case Right((resource, acl)) => - try { - baseAuthorizer.addAcls(Set(acl), resource) - AclCreateResult.SUCCESS - } catch { - case e: ApiException => new AclCreateResult(e) - case e: Throwable => new AclCreateResult(new InvalidRequestException("Failed to create ACL", e)) - } - } - }.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava - } - - override def deleteAcls(requestContext: AuthorizableRequestContext, - aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = { - val filters = aclBindingFilters.asScala - val results = mutable.Map[Int, AclDeleteResult]() - val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]() - - if (filters.forall(_.matchesAtMostOne)) { - // Delete based on a list of ACL fixtures. - for ((filter, i) <- filters.zipWithIndex) { - convertToResourceAndAcl(filter) match { - case Left(apiError) => results.put(i, new AclDeleteResult(apiError.exception)) - case Right(binding) => toDelete.put(i, ArrayBuffer(binding)) - } - } - } else { - // Delete based on filters that may match more than one ACL. - val aclMap = baseAuthorizer.getAcls() - val filtersWithIndex = filters.zipWithIndex - for ((resource, acls) <- aclMap; acl <- acls) { - val binding = new AclBinding( - new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType), - new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, - acl.permissionType.toJava)) - - for ((filter, i) <- filtersWithIndex if filter.matches(binding)) - toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl)) - } - } - - for ((i, acls) <- toDelete) { - val deletionResults = acls.flatMap { case (resource, acl) => - val aclBinding = convertToAclBinding(resource, acl) - try { - if (baseAuthorizer.removeAcls(immutable.Set(acl), resource)) - Some(new AclBindingDeleteResult(aclBinding)) - else None - } catch { - case throwable: Throwable => - Some(new AclBindingDeleteResult(aclBinding, ApiError.fromThrowable(throwable).exception)) - } - }.asJava - - results.put(i, new AclDeleteResult(deletionResults)) - } - - filters.indices.map { i => - results.getOrElse(i, new AclDeleteResult(Seq.empty[AclBindingDeleteResult].asJava)) - }.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava - } - - override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = { - baseAuthorizer.getAcls().flatMap { case (resource, acls) => - acls.map(acl => convertToAclBinding(resource, acl)).filter(filter.matches) - }.asJava - } - - override def close(): Unit = { - baseAuthorizer.close() - } - - override def authorizeByResourceType(requestContext: AuthorizableRequestContext, - op: AclOperation, - resourceType: ResourceType): AuthorizationResult = { - SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType) - - if (super.authorizeByResourceType(requestContext, op, resourceType) == AuthorizationResult.ALLOWED) - AuthorizationResult.ALLOWED - else if (denyAllResource(requestContext, op, resourceType) || !shouldAllowEveryoneIfNoAclIsFound) - AuthorizationResult.DENIED - else - AuthorizationResult.ALLOWED - } - - private def denyAllResource(requestContext: AuthorizableRequestContext, - op: AclOperation, - resourceType: ResourceType): Boolean = { - val resourceTypeFilter = new ResourcePatternFilter( - resourceType, Resource.WildCardResource, PatternType.LITERAL) - val principal = new KafkaPrincipal( - requestContext.principal.getPrincipalType, requestContext.principal.getName).toString - val host = requestContext.clientAddress().getHostAddress - val entryFilter = new AccessControlEntryFilter(null, null, op, AclPermissionType.DENY) - val entryFilterAllOp = new AccessControlEntryFilter(null, null, AclOperation.ALL, AclPermissionType.DENY) - val aclFilter = new AclBindingFilter(resourceTypeFilter, entryFilter) - val aclFilterAllOp = new AclBindingFilter(resourceTypeFilter, entryFilterAllOp) - - (acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), principal, host)) - || acls(aclFilterAllOp).asScala.exists(b => principalHostMatch(b.entry(), principal, host))) - } - - private def principalHostMatch(ace: AccessControlEntry, - principal: String, - host: String): Boolean = { - ((ace.host() == AclEntry.WildcardHost || ace.host() == host) - && (ace.principal() == AclEntry.WildcardPrincipalString || ace.principal() == principal)) - } - -} diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 2e4dc482092b5..e8a3e6cf4e498 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -679,8 +679,7 @@ object KafkaConfig { /************* Authorizer Configuration ***********/ val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" + - " interface, which is used by the broker for authorization. This config also supports authorizers that implement the deprecated" + - " kafka.security.auth.Authorizer trait which was previously used for authorization." + " interface, which is used by the broker for authorization." /** ********* Socket Server Configuration ***********/ val PortDoc = "DEPRECATED: only used when
listeners
is not set. " +
"Use listeners
instead. \n" +
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index d82c8e585f531..f0d9188bd2c66 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,10 +18,9 @@ import java.util
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
-
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.log.LogConfig
-import kafka.security.authorizer.AclEntry
+import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
@@ -144,7 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
+ properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
index 425d8f3f8e626..2b3ae1c1da696 100644
--- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala
@@ -15,7 +15,7 @@ package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
@@ -63,7 +63,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
def clientPrincipal: KafkaPrincipal = ClientPrincipal
override def brokerPropertyOverrides(properties: Properties): Unit = {
- properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
+ properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
diff --git a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
index 75df9e7079d29..17e39f608a778 100644
--- a/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslGssapiSslEndToEndAuthorizationTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.api
-import kafka.security.auth.SimpleAclAuthorizer
+import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.config.SslConfigs
@@ -25,8 +25,6 @@ import org.junit.jupiter.api.Assertions.assertNull
import scala.collection.immutable.List
-// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
-// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaClientPrincipalUnqualifiedName)
@@ -35,7 +33,7 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
override protected def kafkaClientSaslMechanism = "GSSAPI"
override protected def kafkaServerSaslMechanisms = List("GSSAPI")
- override protected def authorizerClass = classOf[SimpleAclAuthorizer]
+ override protected def authorizerClass = classOf[AclAuthorizer]
// Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the
// client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
index 1f15563b012ea..53a267f19657e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
@@ -15,40 +15,38 @@ package kafka.api
import java.io.File
import java.util
import kafka.log.LogConfig
+import kafka.security.authorizer.AclAuthorizer
+import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
+import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.server.authorizer.Authorizer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import scala.annotation.nowarn
+import java.util.Collections
import scala.jdk.CollectionConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try}
-abstract class AuthorizationAdmin {
- def authorizerClassName: String
- def initializeAcls(): Unit
- def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
- def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
-}
-
-// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
-// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
- @nowarn("cat=deprecation")
- val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
+ val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+
+ val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer])
+
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
@@ -477,58 +475,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
client.describeAcls(allTopicAcls).values.get().asScala.toSet
}
- @deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
- class LegacyAuthorizationAdmin extends AuthorizationAdmin {
- import kafka.security.auth._
- import kafka.security.authorizer.AuthorizerWrapper
+ class AclAuthorizationAdmin(authorizerClass: Class[_ <: AclAuthorizer], authorizerForInitClass: Class[_ <: AclAuthorizer]) {
- override def authorizerClassName: String = classOf[SimpleAclAuthorizer].getName
+ def authorizerClassName: String = authorizerClass.getName
- override def initializeAcls(): Unit = {
- val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
+ def initializeAcls(): Unit = {
+ val authorizer = CoreUtils.createObject[Authorizer](authorizerForInitClass.getName)
try {
authorizer.configure(configs.head.originals())
- authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow,
- Acl.WildCardHost, All)), new Resource(Topic, "*", PatternType.LITERAL))
- authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow,
- Acl.WildCardHost, All)), new Resource(Group, "*", PatternType.LITERAL))
+ val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
+ authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
+ authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
- authorizer.addAcls(Set(clusterAcl(ALLOW, CREATE),
+ authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
clusterAcl(ALLOW, DELETE),
clusterAcl(ALLOW, CLUSTER_ACTION),
clusterAcl(ALLOW, ALTER_CONFIGS),
- clusterAcl(ALLOW, ALTER)),
- Resource.ClusterResource)
+ clusterAcl(ALLOW, ALTER))
+ .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
} finally {
authorizer.close()
}
}
- override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val acls = Set(clusterAcl(permissionType, operation))
- val authorizer = simpleAclAuthorizer
- val prevAcls = authorizer.getAcls(Resource.ClusterResource)
- authorizer.addAcls(acls, Resource.ClusterResource)
- TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, Resource.ClusterResource)
+ def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
+ val ace = clusterAcl(permissionType, operation)
+ val aclBinding = new AclBinding(clusterResourcePattern, ace)
+ val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
+ val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
+ .asScala.map(_.entry).toSet
+ authorizer.createAcls(null, Collections.singletonList(aclBinding))
+ TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
}
- override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val acls = Set(clusterAcl(permissionType, operation))
- val authorizer = simpleAclAuthorizer
- val prevAcls = authorizer.getAcls(Resource.ClusterResource)
- assertTrue(authorizer.removeAcls(acls, Resource.ClusterResource))
- TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, Resource.ClusterResource)
- }
-
-
- private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): Acl = {
- new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), PermissionType.fromJava(permissionType),
- Acl.WildCardHost, Operation.fromJava(operation))
+ def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
+ val ace = clusterAcl(permissionType, operation)
+ val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
+ val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
+ val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
+ val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
+ assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
+ .get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
+ TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
}
- private def simpleAclAuthorizer: Authorizer = {
- val authorizerWrapper = servers.head.dataPlaneRequestProcessor.authorizer.get.asInstanceOf[AuthorizerWrapper]
- authorizerWrapper.baseAuthorizer
+ private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
+ new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
+ WildcardHost, operation, permissionType)
}
}
}
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 88b649f225f97..b9180815c6f05 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -14,22 +14,16 @@ package kafka.api
import java.io.File
import java.util
-import java.util.Collections
import java.util.concurrent._
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaYammerMetrics
import kafka.security.authorizer.AclAuthorizer
-import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.KafkaConfig
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult}
import org.apache.kafka.common.acl._
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType._
import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
-import org.apache.kafka.common.resource.PatternType._
-import org.apache.kafka.common.resource.ResourceType._
+import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
@@ -84,8 +78,7 @@ object SslAdminIntegrationTest {
}
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
- override val authorizationAdmin = new AclAuthorizationAdmin
- val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+ override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer])
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@@ -266,54 +259,4 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")
metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
}
-
- class AclAuthorizationAdmin extends AuthorizationAdmin {
-
- override def authorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
-
- override def initializeAcls(): Unit = {
- val authorizer = CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
- try {
- authorizer.configure(configs.head.originals())
- val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
- authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
- authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
-
- authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
- clusterAcl(ALLOW, DELETE),
- clusterAcl(ALLOW, CLUSTER_ACTION),
- clusterAcl(ALLOW, ALTER_CONFIGS),
- clusterAcl(ALLOW, ALTER))
- .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
- } finally {
- authorizer.close()
- }
- }
-
- override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val ace = clusterAcl(permissionType, operation)
- val aclBinding = new AclBinding(clusterResourcePattern, ace)
- val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
- val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
- .asScala.map(_.entry).toSet
- authorizer.createAcls(null, Collections.singletonList(aclBinding))
- TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
- }
-
- override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
- val ace = clusterAcl(permissionType, operation)
- val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
- val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
- val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
- val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
- assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
- .get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
- TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
- }
-
- private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
- new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
- WildcardHost, operation, permissionType)
- }
- }
}
diff --git a/core/src/test/scala/kafka/security/auth/ResourceTest.scala b/core/src/test/scala/kafka/security/auth/ResourceTest.scala
deleted file mode 100644
index 9c905ecb00870..0000000000000
--- a/core/src/test/scala/kafka/security/auth/ResourceTest.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions._
-
-@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
-class ResourceTest {
- @Test
- def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = {
- assertThrows(classOf[KafkaException], () => Resource.fromString("Unknown:fred"))
- }
-
- @Test
- def shouldThrowOnBadResourceTypeSeparator(): Unit = {
- assertThrows(classOf[KafkaException], () => Resource.fromString("Topic-fred"))
- }
-
- @Test
- def shouldParseOldTwoPartString(): Unit = {
- assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
- assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:t"))
- }
-
- @Test
- def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = {
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group::This:is:a:weird:group:name:"))
- }
-
- @Test
- def shouldParseThreePartString(): Unit = {
- assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred"))
- assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t"))
- }
-
- @Test
- def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:"))
- assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:"))
- }
-
- @Test
- def shouldRoundTripViaString(): Unit = {
- val expected = Resource(Group, "fred", PREFIXED)
-
- val actual = Resource.fromString(expected.toString)
-
- assertEquals(expected, actual)
- }
-}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala b/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
deleted file mode 100644
index 5c58a08e9693a..0000000000000
--- a/core/src/test/scala/unit/kafka/security/auth/OperationTest.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.auth
-
-import org.apache.kafka.common.acl.AclOperation
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class OperationTest {
- /**
- * Test round trip conversions between org.apache.kafka.common.acl.AclOperation and
- * kafka.security.auth.Operation.
- */
- @Test
- def testJavaConversions(): Unit = {
- AclOperation.values.foreach {
- case AclOperation.UNKNOWN | AclOperation.ANY =>
- case aclOp =>
- val op = Operation.fromJava(aclOp)
- val aclOp2 = op.toJava
- assertEquals(aclOp, aclOp2)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
deleted file mode 100644
index 1471202ee5f27..0000000000000
--- a/core/src/test/scala/unit/kafka/security/auth/PermissionTypeTest.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.apache.kafka.common.acl.AclPermissionType
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class PermissionTypeTest {
-
- @Test
- def testFromString(): Unit = {
- val permissionType = PermissionType.fromString("Allow")
- assertEquals(Allow, permissionType)
-
- assertThrows(classOf[KafkaException], () => PermissionType.fromString("badName"))
- }
-
- /**
- * Test round trip conversions between org.apache.kafka.common.acl.AclPermissionType and
- * kafka.security.auth.PermissionType.
- */
- @Test
- def testJavaConversions(): Unit = {
- AclPermissionType.values().foreach {
- case AclPermissionType.UNKNOWN | AclPermissionType.ANY =>
- case aclPerm =>
- val perm = PermissionType.fromJava(aclPerm)
- val aclPerm2 = perm.toJava
- assertEquals(aclPerm, aclPerm2)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala b/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
deleted file mode 100644
index 02c35c783d496..0000000000000
--- a/core/src/test/scala/unit/kafka/security/auth/ResourceTypeTest.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import kafka.common.KafkaException
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-import org.apache.kafka.common.resource.{ResourceType => JResourceType}
-
-@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
-class ResourceTypeTest {
-
- @Test
- def testFromString(): Unit = {
- val resourceType = ResourceType.fromString("Topic")
- assertEquals(Topic, resourceType)
- assertThrows(classOf[KafkaException], () => ResourceType.fromString("badName"))
- }
-
- /**
- * Test round trip conversions between org.apache.kafka.common.acl.ResourceType and
- * kafka.security.auth.ResourceType.
- */
- @Test
- def testJavaConversions(): Unit = {
- JResourceType.values.foreach {
- case JResourceType.UNKNOWN | JResourceType.ANY =>
- case jResourceType =>
- val resourceType = ResourceType.fromJava(jResourceType)
- val jResourceType2 = resourceType.toJava
- assertEquals(jResourceType, jResourceType2)
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
deleted file mode 100644
index 6e2a0bd41e999..0000000000000
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ /dev/null
@@ -1,731 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.auth
-
-import java.net.InetAddress
-import java.nio.charset.StandardCharsets.UTF_8
-import java.util.UUID
-
-import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
-import kafka.network.RequestChannel.Session
-import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
-import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
-import org.apache.kafka.common.errors.UnsupportedVersionException
-import org.apache.kafka.common.resource.PatternType
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Time
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-@deprecated("Use AclAuthorizer", "Since 2.4")
-class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
-
- private val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
- private val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
- private val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
-
- private val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
- private val prefixedResource = Resource(Topic, "foo", PREFIXED)
-
- private val simpleAclAuthorizer = new SimpleAclAuthorizer
- private val simpleAclAuthorizer2 = new SimpleAclAuthorizer
- private var resource: Resource = _
- private val superUsers = "User:superuser1; User:superuser2"
- private val username = "alice"
- private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- private val session = Session(principal, InetAddress.getByName("192.168.0.1"))
- private var config: KafkaConfig = _
- private var zooKeeperClient: ZooKeeperClient = _
-
- class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
- override def equals(o: scala.Any): Boolean = false
- }
-
- @BeforeEach
- override def setUp(): Unit = {
- super.setUp()
-
- // Increase maxUpdateRetries to avoid transient failures
- simpleAclAuthorizer.maxUpdateRetries = Int.MaxValue
- simpleAclAuthorizer2.maxUpdateRetries = Int.MaxValue
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
- props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
-
- config = KafkaConfig.fromProps(props)
- simpleAclAuthorizer.configure(config.originals)
- simpleAclAuthorizer2.configure(config.originals)
- resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
-
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest")
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- simpleAclAuthorizer.close()
- simpleAclAuthorizer2.close()
- zooKeeperClient.close()
- super.tearDown()
- }
-
- @Test
- def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", PREFIXED)))
- }
-
- @Test
- def testAuthorizeWithEmptyResourceName(): Unit = {
- assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, WildCardResource, LITERAL))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
- }
-
- // Authorizing the empty resource is not supported because we create a znode with the resource name.
- @Test
- def testEmptyAclThrowsException(): Unit = {
- assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, "", LITERAL)))
- }
-
- @Test
- def testTopicAcl(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
- val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
-
- //user1 has READ access from host1 and host2.
- val acl1 = new Acl(user1, Allow, host1.getHostAddress, Read)
- val acl2 = new Acl(user1, Allow, host2.getHostAddress, Read)
-
- //user1 does not have READ access from host1.
- val acl3 = new Acl(user1, Deny, host1.getHostAddress, Read)
-
- //user1 has Write access from host1 only.
- val acl4 = new Acl(user1, Allow, host1.getHostAddress, Write)
-
- //user1 has DESCRIBE access from all hosts.
- val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
-
- //user2 has READ access from all hosts.
- val acl6 = new Acl(user2, Allow, WildCardHost, Read)
-
- //user3 has WRITE access from all hosts.
- val acl7 = new Acl(user3, Allow, WildCardHost, Write)
-
- val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6, acl7)
-
- changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
- val host1Session = Session(user1, host1)
- val host2Session = Session(user1, host2)
-
- assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
- assertTrue(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should have WRITE access from host1")
- assertFalse(simpleAclAuthorizer.authorize(host2Session, Write, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
- assertTrue(simpleAclAuthorizer.authorize(host1Session, Describe, resource), "User1 should not have DESCRIBE access from host1")
- assertTrue(simpleAclAuthorizer.authorize(host2Session, Describe, resource), "User1 should have DESCRIBE access from host2")
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Alter, resource), "User1 should not have edit access from host1")
- assertFalse(simpleAclAuthorizer.authorize(host2Session, Alter, resource), "User1 should not have edit access from host2")
-
- //test if user has READ and write access they also get describe access
- val user2Session = Session(user2, host1)
- val user3Session = Session(user3, host1)
- assertTrue(simpleAclAuthorizer.authorize(user2Session, Describe, resource), "User2 should have DESCRIBE access from host1")
- assertTrue(simpleAclAuthorizer.authorize(user3Session, Describe, resource), "User3 should have DESCRIBE access from host2")
- assertTrue(simpleAclAuthorizer.authorize(user2Session, Read, resource), "User2 should have READ access from host1")
- assertTrue(simpleAclAuthorizer.authorize(user3Session, Write, resource), "User3 should have WRITE access from host2")
- }
-
- /**
- CustomPrincipals should be compared with their principal type and name
- */
- @Test
- def testAllowAccessWithCustomPrincipal(): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.1.1")
- val host2 = InetAddress.getByName("192.168.1.2")
-
- // user has READ access from host2 but not from host1
- val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
- val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
- val acls = Set[Acl](acl1, acl2)
- changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
- val host1Session = Session(customUserPrincipal, host1)
- val host2Session = Session(customUserPrincipal, host2)
-
- assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
- }
-
- @Test
- def testDenyTakesPrecedence(): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host = InetAddress.getByName("192.168.2.1")
- val session = Session(user, host)
-
- val allowAll = Acl.AllowAllAcl
- val denyAcl = new Acl(user, Deny, host.getHostAddress, All)
- val acls = Set[Acl](allowAll, denyAcl)
-
- changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
-
- assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "deny should take precedence over allow.")
- }
-
- @Test
- def testAllowAllAccess(): Unit = {
- val allowAllAcl = Acl.AllowAllAcl
-
- changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
-
- val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "allow all acl should allow access to all.")
- }
-
- @Test
- def testSuperUserHasAccess(): Unit = {
- val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
-
- changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
-
- val session1 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
- val session2 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
-
- assertTrue(simpleAclAuthorizer.authorize(session1, Read, resource), "superuser always has access, no matter what acls.")
- assertTrue(simpleAclAuthorizer.authorize(session2, Read, resource), "superuser always has access, no matter what acls.")
- }
-
- /**
- CustomPrincipals should be compared with their principal type and name
- */
- @Test
- def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
- val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
- changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
-
- val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "superuser with custom principal always has access, no matter what acls.")
- }
-
- @Test
- def testWildCardAcls(): Unit = {
- assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [], authorizer should fail close.")
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.3.1")
- val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
-
- val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
-
- val host1Session = Session(user1, host1)
- assertTrue(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should have Read access from host1")
-
- //allow Write to specific topic.
- val writeAcl = new Acl(user1, Allow, host1.getHostAddress, Write)
- changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
-
- //deny Write to wild card topic.
- val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1.getHostAddress, Write)
- changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
-
- assertFalse(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should not have Write access from host1")
- }
-
- @Test
- def testNoAclFound(): Unit = {
- assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [], authorizer should fail close.")
- }
-
- @Test
- def testNoAclFoundOverride(): Unit = {
- val props = TestUtils.createBrokerConfig(1, zkConnect)
- props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
-
- val cfg = KafkaConfig.fromProps(props)
- val testAuthorizer = new SimpleAclAuthorizer
- try {
- testAuthorizer.configure(cfg.originals)
- assertTrue(testAuthorizer.authorize(session, Read, resource), "when acls = null or [], authorizer should fail open with allow.everyone = true.")
- } finally {
- testAuthorizer.close()
- }
- }
-
- @Test
- def testAclManagementAPIs(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val host1 = "host1"
- val host2 = "host2"
-
- val acl1 = new Acl(user1, Allow, host1, Read)
- val acl2 = new Acl(user1, Allow, host1, Write)
- val acl3 = new Acl(user2, Allow, host2, Read)
- val acl4 = new Acl(user2, Allow, host2, Write)
-
- var acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](acl1, acl2, acl3, acl4), Set.empty[Acl])
-
- //test addAcl is additive
- val acl5 = new Acl(user2, Allow, WildCardHost, Read)
- acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
-
- //test get by principal name.
- TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1), "changes not propagated in timeout period")
- TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
-
- val resourceToAcls = Map[Resource, Set[Acl]](
- new Resource(Topic, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
- new Resource(Cluster, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
- new Resource(Group, Resource.WildCardResource, LITERAL) -> acls,
- new Resource(Group, "test-ConsumerGroup", LITERAL) -> acls
- )
-
- resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
- TestUtils.waitUntilTrue(() => resourceToAcls + (resource -> acls) == simpleAclAuthorizer.getAcls(), "changes not propagated in timeout period.")
-
- //test remove acl from existing acls.
- acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
-
- //test remove all acls for resource
- simpleAclAuthorizer.removeAcls(resource)
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
- assertFalse(zkClient.resourceExists(resource.toPattern))
-
- //test removing last acl also deletes ZooKeeper path
- acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
- changeAclAndVerify(acls, Set.empty[Acl], acls)
- assertFalse(zkClient.resourceExists(resource.toPattern))
- }
-
- @Test
- def testLoadCache(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new Acl(user1, Allow, "host-1", Read)
- val acls = Set[Acl](acl1)
- simpleAclAuthorizer.addAcls(acls, resource)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val resource1 = Resource(Topic, "test-2", LITERAL)
- val acl2 = new Acl(user2, Deny, "host3", Read)
- val acls1 = Set[Acl](acl2)
- simpleAclAuthorizer.addAcls(acls1, resource1)
-
- zkClient.deleteAclChangeNotifications()
- val authorizer = new SimpleAclAuthorizer
- try {
- authorizer.configure(config.originals)
-
- assertEquals(acls, authorizer.getAcls(resource))
- assertEquals(acls1, authorizer.getAcls(resource1))
- } finally {
- authorizer.close()
- }
- }
-
- @Test
- def testLocalConcurrentModificationOfResourceAcls(): Unit = {
- val commonResource = Resource(Topic, "test", LITERAL)
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new Acl(user1, Allow, WildCardHost, Read)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val acl2 = new Acl(user2, Deny, WildCardHost, Read)
-
- simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
- simpleAclAuthorizer.addAcls(Set(acl2), commonResource)
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
- }
-
- @Test
- def testDistributedConcurrentModificationOfResourceAcls(): Unit = {
- val commonResource = Resource(Topic, "test", LITERAL)
-
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new Acl(user1, Allow, WildCardHost, Read)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val acl2 = new Acl(user2, Deny, WildCardHost, Read)
-
- // Add on each instance
- simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
- simpleAclAuthorizer2.addAcls(Set(acl2), commonResource)
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
-
- val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
- val acl3 = new Acl(user3, Deny, WildCardHost, Read)
-
- // Add on one instance and delete on another
- simpleAclAuthorizer.addAcls(Set(acl3), commonResource)
- val deleted = simpleAclAuthorizer2.removeAcls(Set(acl3), commonResource)
-
- assertTrue(deleted, "The authorizer should see a value that needs to be deleted")
-
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
- }
-
- @Test
- def testHighConcurrencyModificationOfResourceAcls(): Unit = {
- val commonResource = Resource(Topic, "test", LITERAL)
-
- val acls = (0 to 50).map { i =>
- val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
- new Acl(useri, Allow, WildCardHost, Read)
- }
-
- // Alternate authorizer, Remove all acls that end in 0
- val concurrentFuctions = acls.map { acl =>
- () => {
- val aclId = acl.principal.getName.toInt
- if (aclId % 2 == 0) {
- simpleAclAuthorizer.addAcls(Set(acl), commonResource)
- } else {
- simpleAclAuthorizer2.addAcls(Set(acl), commonResource)
- }
- if (aclId % 10 == 0) {
- simpleAclAuthorizer2.removeAcls(Set(acl), commonResource)
- }
- }
- }
-
- val expectedAcls = acls.filter { acl =>
- val aclId = acl.principal.getName.toInt
- aclId % 10 != 0
- }.toSet
-
- TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
-
- TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
- TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
- }
-
- /**
- * Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
- */
- @Test
- def testAclInheritance(): Unit = {
- testImplicationsOfAllow(All, Set(Read, Write, Create, Delete, Alter, Describe,
- ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
- testImplicationsOfDeny(All, Set(Read, Write, Create, Delete, Alter, Describe,
- ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
- testImplicationsOfAllow(Read, Set(Describe))
- testImplicationsOfAllow(Write, Set(Describe))
- testImplicationsOfAllow(Delete, Set(Describe))
- testImplicationsOfAllow(Alter, Set(Describe))
- testImplicationsOfDeny(Describe, Set())
- testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs))
- testImplicationsOfDeny(DescribeConfigs, Set())
- }
-
- private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = {
- val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host = InetAddress.getByName("192.168.3.1")
- val hostSession = Session(user, host)
- val acl = Acl(user, Allow, WildCardHost, parentOp)
- simpleAclAuthorizer.addAcls(Set(acl), Resource.ClusterResource)
- Operation.values.foreach { op =>
- val authorized = simpleAclAuthorizer.authorize(hostSession, op, Resource.ClusterResource)
- if (allowedOps.contains(op) || op == parentOp)
- assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
- else
- assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
- }
- simpleAclAuthorizer.removeAcls(Set(acl), Resource.ClusterResource)
- }
-
- private def testImplicationsOfDeny(parentOp: Operation, deniedOps: Set[Operation]): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val host1 = InetAddress.getByName("192.168.3.1")
- val host1Session = Session(user1, host1)
- val acls = Set(Acl(user1, Deny, WildCardHost, parentOp), Acl(user1, Allow, WildCardHost, All))
- simpleAclAuthorizer.addAcls(acls, Resource.ClusterResource)
- Operation.values.foreach { op =>
- val authorized = simpleAclAuthorizer.authorize(host1Session, op, Resource.ClusterResource)
- if (deniedOps.contains(op) || op == parentOp)
- assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
- else
- assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
- }
- simpleAclAuthorizer.removeAcls(acls, Resource.ClusterResource)
- }
-
- @Test
- def testHighConcurrencyDeletionOfResourceAcls(): Unit = {
- val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
-
- // Alternate authorizer to keep adding and removing ZooKeeper path
- val concurrentFuctions = (0 to 50).map { _ =>
- () => {
- simpleAclAuthorizer.addAcls(Set(acl), resource)
- simpleAclAuthorizer2.removeAcls(Set(acl), resource)
- }
- }
-
- TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
-
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
- }
-
- @Test
- def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
- }
-
- @Test
- def testDeleteAclOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
-
- simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), wildCardResource)
-
- assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(wildCardResource))
- }
-
- @Test
- def testDeleteAllAclOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
-
- simpleAclAuthorizer.removeAcls(wildCardResource)
-
- assertEquals(Map(), simpleAclAuthorizer.getAcls())
- }
-
- @Test
- def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
- }
-
- @Test
- def testDeleteAclOnPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-
- simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), prefixedResource)
-
- assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(prefixedResource))
- }
-
- @Test
- def testDeleteAllAclOnPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
-
- simpleAclAuthorizer.removeAcls(prefixedResource)
-
- assertEquals(Map(), simpleAclAuthorizer.getAcls())
- }
-
- @Test
- def testAddAclsOnLiteralResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), resource)
- simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), resource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(resource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
- }
-
- @Test
- def testAddAclsOnWildcardResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
- simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), wildCardResource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(wildCardResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
- }
-
- @Test
- def testAddAclsOnPrefiexedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
- simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), prefixedResource)
-
- assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(prefixedResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
- assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
- }
-
- @Test
- def testAuthorizeWithPrefixedResource(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", LITERAL))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL))
-
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
-
- assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
- }
-
- @Test
- def testSingleCharacterResourceAcls(): Unit = {
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "f", LITERAL))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "f", LITERAL)))
- assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo", LITERAL)))
-
- simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "_", PREFIXED))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_foo", LITERAL)))
- assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_", LITERAL)))
- assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo_", LITERAL)))
- }
-
- @Test
- def testGetAclsPrincipal(): Unit = {
- val aclOnSpecificPrincipal = new Acl(principal, Allow, WildCardHost, Write)
- simpleAclAuthorizer.addAcls(Set[Acl](aclOnSpecificPrincipal), resource)
-
- assertEquals(0, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
- "acl on specific should not be returned for wildcard request")
- assertEquals(1, simpleAclAuthorizer.getAcls(principal).size,
- "acl on specific should be returned for specific request")
- assertEquals(1, simpleAclAuthorizer.getAcls(new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size,
- "acl on specific should be returned for different principal instance")
-
- simpleAclAuthorizer.removeAcls(resource)
- val aclOnWildcardPrincipal = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
- simpleAclAuthorizer.addAcls(Set[Acl](aclOnWildcardPrincipal), resource)
-
- assertEquals(1, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
- "acl on wildcard should be returned for wildcard request")
- assertEquals(0, simpleAclAuthorizer.getAcls(principal).size,
- "acl on wildcard should not be returned for specific request")
- }
-
- @Test
- def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
- assertThrows(classOf[UnsupportedVersionException], () => simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)))
- }
-
- @Test
- def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
- givenAuthorizerWithProtocolVersion(Option.empty)
- val resource = Resource(Topic, "z_other", PREFIXED)
- val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(PREFIXED)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
- val resource = Resource(Topic, "z_other", PREFIXED)
- val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(PREFIXED)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
- val resource = Resource(Topic, "z_other", LITERAL)
- val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(LITERAL)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
- val resource = Resource(Topic, "z_other", LITERAL)
- val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
-
- simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(LITERAL)
-
- assertEquals(expected, actual)
- }
-
- private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]): Unit = {
- simpleAclAuthorizer.close()
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
- props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
- protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
-
- config = KafkaConfig.fromProps(props)
-
- simpleAclAuthorizer.configure(config.originals)
- }
-
- private def getAclChangeEventAsString(patternType: PatternType) = {
- val store = ZkAclStore(patternType)
- val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath, registerWatch = true))
- children.maybeThrow()
- assertEquals(1, children.children.size, "Expecting 1 change event")
-
- val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}"))
- data.maybeThrow()
-
- new String(data.data, UTF_8)
- }
-
- private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
- var acls = originalAcls
-
- if(addedAcls.nonEmpty) {
- simpleAclAuthorizer.addAcls(addedAcls, resource)
- acls ++= addedAcls
- }
-
- if(removedAcls.nonEmpty) {
- simpleAclAuthorizer.removeAcls(removedAcls, resource)
- acls --=removedAcls
- }
-
- TestUtils.waitAndVerifyAcls(acls, simpleAclAuthorizer, resource)
-
- acls
- }
-}
diff --git a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala b/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
deleted file mode 100644
index 7c575e425a100..0000000000000
--- a/core/src/test/scala/unit/kafka/security/authorizer/AuthorizerWrapperTest.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.security.authorizer
-
-import java.util.UUID
-
-import kafka.security.auth.SimpleAclAuthorizer
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import kafka.zookeeper.ZooKeeperClient
-import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl._
-import org.apache.kafka.common.resource.PatternType.LITERAL
-import org.apache.kafka.common.resource.ResourcePattern
-import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.authorizer._
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
-import scala.annotation.nowarn
-
-class AuthorizerWrapperTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
- @nowarn("cat=deprecation")
- private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer)
- @nowarn("cat=deprecation")
- private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer)
-
- override def authorizer: Authorizer = wrappedSimpleAuthorizer
-
- @BeforeEach
- @nowarn("cat=deprecation")
- override def setUp(): Unit = {
- super.setUp()
-
- val props = TestUtils.createBrokerConfig(0, zkConnect)
-
- props.put(AclAuthorizer.SuperUsersProp, superUsers)
- config = KafkaConfig.fromProps(props)
- wrappedSimpleAuthorizer.configure(config.originals)
-
- props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
- config = KafkaConfig.fromProps(props)
- wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
-
- resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
- Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone)
- authorizers.foreach(a => {
- a.close()
- })
- zooKeeperClient.close()
- super.tearDown()
- }
-
- @Test
- def testAuthorizeByResourceTypeEnableAllowEveryOne(): Unit = {
- testAuthorizeByResourceTypeEnableAllowEveryOne(wrappedSimpleAuthorizer)
- }
-
- private def testAuthorizeByResourceTypeEnableAllowEveryOne(authorizer: Authorizer): Unit = {
- assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
- "If allow.everyone.if.no.acl.found = true, " +
- "caller should have read access to at least one topic")
- val allUser = AclEntry.WildcardPrincipalString
- val allHost = AclEntry.WildcardHost
- val denyAll = new AccessControlEntry(allUser, allHost, ALL, AclPermissionType.DENY)
- val wildcardResource = new ResourcePattern(resource.resourceType(), AclEntry.WildcardResource, LITERAL)
-
- addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), resource)
- assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
- "Should still allow since the deny only apply on the specific resource")
-
- addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), wildcardResource)
- assertFalse(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
- "When an ACL binding which can deny all users and hosts exists, " +
- "even if allow.everyone.if.no.acl.found = true, caller shouldn't have read access any topic")
- }
-
- @Test
- def testAuthorizeByResourceTypeDisableAllowEveryoneOverride(): Unit = {
- assertFalse (authorizeByResourceType(wrappedSimpleAuthorizer, requestContext, READ, resource.resourceType()),
- "If allow.everyone.if.no.acl.found = false, " +
- "caller shouldn't have read access to any topic")
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 38a5a12289bfc..b9e5667dc3b4d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -34,7 +34,6 @@ import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log._
import kafka.metrics.KafkaYammerMetrics
-import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
@@ -1325,15 +1324,6 @@ object TestUtils extends Logging {
s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}")
}
- @deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.5")
- def waitAndVerifyAcls(expected: Set[Acl], authorizer: LegacyAuthorizer, resource: Resource): Unit = {
- val newLine = scala.util.Properties.lineSeparator
-
- waitUntilTrue(() => authorizer.getAcls(resource) == expected,
- s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
- s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}")
- }
-
/**
* Verifies that this ACL is the secure one.
*/
diff --git a/docs/upgrade.html b/docs/upgrade.html
index b7666ec0541ac..a486b6c4c910f 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -27,6 +27,8 @@ Authorizer
, SimpleAclAuthorizer
and related classes have been removed. Please use the Java Authorizer
+ and AclAuthorizer
instead.Metric#value()
method was removed (KAFKA-12573).PrincipalBuilder
, DefaultPrincipalBuilder
and ResourceFilter
.
Furthermore, deprecated constants and constructors were removed from SslConfigs
, SaslConfigs
,
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index e93a1c569148c..fcf390056c270 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -155,8 +155,6 @@ class for details.
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
# Kafka Authorizer
ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
- # Old Kafka Authorizer. This is deprecated but still supported.
- SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
INTERBROKER_LISTENER_NAME = 'INTERNAL'
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index 4a176b06145b7..6e66ffc0b8554 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -70,8 +70,8 @@ def roll_in_secured_settings(self, client_protocol, broker_protocol):
self.kafka.close_port(SecurityConfig.PLAINTEXT)
self.set_authorizer_and_bounce(client_protocol, broker_protocol)
- def set_authorizer_and_bounce(self, client_protocol, broker_protocol, authorizer_class_name = KafkaService.ACL_AUTHORIZER):
- self.kafka.authorizer_class_name = authorizer_class_name
+ def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
+ self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
# Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker.
self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
@@ -93,8 +93,8 @@ def roll_in_sasl_mechanism(self, security_protocol, new_sasl_mechanism):
self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
self.bounce()
- # Bounce again with ACLs for new mechanism. Use old SimpleAclAuthorizer here to ensure that is also tested.
- self.set_authorizer_and_bounce(security_protocol, security_protocol, KafkaService.SIMPLE_AUTHORIZER)
+ # Bounce again with ACLs for new mechanism.
+ self.set_authorizer_and_bounce(security_protocol, security_protocol)
def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
# Enable the new internal listener on all brokers first