From 72d108274c98dca44514007254552481c731c958 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Fri, 21 May 2021 15:58:49 -0400 Subject: [PATCH] KAFKA-12620 Allocate producer ids on the controller (#10504) Introduce new AllocateProducerIds RPC and IBP 3.0-IV0 as part of KIP-730. This change adds a new AllocateProducerIds RPC which is used by the broker to request a block of producer IDs from the controller. The new IBP determines whether the broker should talk directly to ZooKeeper (IBP < 3.0) or whether it should use the new RPC to talk to the controller (IBP >= 3.0). Per-broker property overrides for ClusterTests were also added (in order to test mixed IBPs in a cluster). Reviewers: Colin P. McCabe --- .../apache/kafka/common/protocol/ApiKeys.java | 3 +- .../common/requests/AbstractRequest.java | 2 + .../common/requests/AbstractResponse.java | 2 + .../requests/AllocateProducerIdsRequest.java | 72 +++++ .../requests/AllocateProducerIdsResponse.java | 63 +++++ .../message/AllocateProducerIdsRequest.json | 29 ++ .../message/AllocateProducerIdsResponse.json | 32 +++ .../kafka/common/protocol/ApiKeysTest.java | 2 +- .../src/main/scala/kafka/api/ApiVersion.scala | 13 +- .../kafka/controller/KafkaController.scala | 64 ++++- .../transaction/ProducerIdManager.scala | 255 ++++++++++++------ .../transaction/TransactionCoordinator.scala | 14 +- .../kafka/network/RequestConvertToJson.scala | 2 + .../scala/kafka/server/BrokerServer.scala | 6 +- .../main/scala/kafka/server/KafkaApis.scala | 17 ++ .../main/scala/kafka/server/KafkaServer.scala | 44 +-- core/src/main/scala/kafka/zk/ZkData.scala | 28 +- .../test/java/kafka/test/ClusterConfig.java | 26 +- .../test/java/kafka/test/ClusterInstance.java | 2 + .../kafka/test/annotation/ClusterTest.java | 1 + .../test/junit/ClusterTestExtensions.java | 6 +- .../junit/RaftClusterInvocationContext.java | 21 +- .../junit/ZkClusterInvocationContext.java | 26 +- .../test/java/kafka/testkit/BrokerNode.java | 8 + .../kafka/testkit/KafkaClusterTestKit.java | 1 + .../ProducerIdsIntegrationTest.scala | 85 ++++++ .../transaction/ProducerIdManagerTest.scala | 94 +++++-- ...ransactionCoordinatorConcurrencyTest.scala | 2 +- .../TransactionCoordinatorTest.scala | 2 +- .../integration/KafkaServerTestHarness.scala | 13 +- .../unit/kafka/server/RequestQuotaTest.scala | 7 +- .../kafka/server/common/ProducerIdsBlock.java | 80 ++++++ 32 files changed, 868 insertions(+), 154 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsRequest.java create mode 100644 clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java create mode 100644 clients/src/main/resources/common/message/AllocateProducerIdsRequest.json create mode 100644 clients/src/main/resources/common/message/AllocateProducerIdsResponse.json create mode 100644 core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala create mode 100644 server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 8ec2d02ea3de6..bd28aa26aa977 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -107,7 +107,8 @@ public enum ApiKeys { BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), - LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS); + LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), + ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false); private static final Map> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 3137e3e500aef..0c38e998fe247 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -301,6 +301,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion, return DescribeTransactionsRequest.parse(buffer, apiVersion); case LIST_TRANSACTIONS: return ListTransactionsRequest.parse(buffer, apiVersion); + case ALLOCATE_PRODUCER_IDS: + return AllocateProducerIdsRequest.parse(buffer, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 5f7b88f269405..47f2b3c7f3099 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -245,6 +245,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response return DescribeTransactionsResponse.parse(responseBuffer, version); case LIST_TRANSACTIONS: return ListTransactionsResponse.parse(responseBuffer, version); + case ALLOCATE_PRODUCER_IDS: + return AllocateProducerIdsResponse.parse(responseBuffer, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsRequest.java new file mode 100644 index 0000000000000..7938f92df56d9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsRequest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class AllocateProducerIdsRequest extends AbstractRequest { + private final AllocateProducerIdsRequestData data; + + public AllocateProducerIdsRequest(AllocateProducerIdsRequestData data, short version) { + super(ApiKeys.ALLOCATE_PRODUCER_IDS, version); + this.data = data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData() + .setThrottleTimeMs(throttleTimeMs) + .setErrorCode(Errors.forException(e).code())); + } + + @Override + public AllocateProducerIdsRequestData data() { + return data; + } + + public static class Builder extends AbstractRequest.Builder { + + private final AllocateProducerIdsRequestData data; + + public Builder(AllocateProducerIdsRequestData data) { + super(ApiKeys.ALLOCATE_PRODUCER_IDS); + this.data = data; + } + + @Override + public AllocateProducerIdsRequest build(short version) { + return new AllocateProducerIdsRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + public static AllocateProducerIdsRequest parse(ByteBuffer buffer, short version) { + return new AllocateProducerIdsRequest(new AllocateProducerIdsRequestData( + new ByteBufferAccessor(buffer), version), version); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java new file mode 100644 index 0000000000000..5d48c39e8019d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
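As a rough, hedged illustration (not part of the patch) of how a broker might populate the request class introduced above: the sketch below fills in the two request fields and builds the message at version 0, the only version this change defines. It assumes a clients jar containing the generated AllocateProducerIdsRequestData class from this patch; the wrapper class name is invented for illustration.

import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.requests.AllocateProducerIdsRequest;

public class AllocateProducerIdsRequestSketch {
    public static void main(String[] args) {
        // The request carries only the requesting broker's id and epoch; the values here are arbitrary.
        AllocateProducerIdsRequestData data = new AllocateProducerIdsRequestData()
            .setBrokerId(1)
            .setBrokerEpoch(42L);

        // The Builder defers choosing the wire version until build(); version 0 is the only version defined here.
        AllocateProducerIdsRequest request = new AllocateProducerIdsRequest.Builder(data).build((short) 0);
        System.out.println(request.data());
    }
}

The getErrorResponse() method shown above is what later parts of this patch use to synthesize an error response with the same throttle time, for example when KafkaApis rejects the request with NOT_CONTROLLER.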
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; + +public class AllocateProducerIdsResponse extends AbstractResponse { + + private final AllocateProducerIdsResponseData data; + + public AllocateProducerIdsResponse(AllocateProducerIdsResponseData data) { + super(ApiKeys.ALLOCATE_PRODUCER_IDS); + this.data = data; + } + + @Override + public AllocateProducerIdsResponseData data() { + return data; + } + + /** + * The number of each type of error in the response, including {@link Errors#NONE} and top-level errors as well as + * more specifically scoped errors (such as topic or partition-level errors). + * + * @return A count of errors. + */ + @Override + public Map errorCounts() { + return Collections.singletonMap(Errors.forCode(data.errorCode()), 1); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) { + return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData( + new ByteBufferAccessor(buffer), version)); + } +} diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json new file mode 100644 index 0000000000000..0cfa494291a36 --- /dev/null +++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 67, + "type": "request", + "listeners": ["controller", "zkBroker"], + "name": "AllocateProducerIdsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, + { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", + "about": "The epoch of the requesting broker" } + ] +} diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsResponse.json b/clients/src/main/resources/common/message/AllocateProducerIdsResponse.json new file mode 100644 index 0000000000000..0d849c098568b --- /dev/null +++ b/clients/src/main/resources/common/message/AllocateProducerIdsResponse.json @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
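The response describes the allocated block as an inclusive range: ProducerIdStart is the first ID and ProducerIdLen the count, so the last usable ID is start + len - 1 (the same arithmetic ProducerIdsBlock in server-common uses for its end). A minimal sketch of reading such a response, again illustrative only, with an invented class name and arbitrary values:

import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;

public class AllocateProducerIdsResponseSketch {
    public static void main(String[] args) {
        // A successful allocation of 1000 IDs starting at 4000; ErrorCode defaults to 0 (NONE).
        AllocateProducerIdsResponseData data = new AllocateProducerIdsResponseData()
            .setProducerIdStart(4000L)
            .setProducerIdLen(1000);

        AllocateProducerIdsResponse response = new AllocateProducerIdsResponse(data);
        long lastId = data.producerIdStart() + data.producerIdLen() - 1; // inclusive range, so 4999
        System.out.println("allocated block: [" + data.producerIdStart() + ", " + lastId + "]");
        System.out.println(response.errorCounts()); // a single NONE entry, per errorCounts() above
    }
}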
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 67, + "type": "response", + "name": "AllocateProducerIdsResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top level response error code" }, + { "name": "ProducerIdStart", "type": "int64", "versions": "0+", "entityType": "producerId", + "about": "The first producer ID in this range, inclusive"}, + { "name": "ProducerIdLen", "type": "int32", "versions": "0+", + "about": "The number of producer IDs in this range"} + ] +} diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index 17d2e1ce26e43..3c66b211bec4f 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -62,7 +62,7 @@ public void testAlterIsrIsClusterAction() { public void testResponseThrottleTime() { Set authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE); // Newer protocol apis include throttle time ms even for cluster actions - Set clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR); + Set clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR, ApiKeys.ALLOCATE_PRODUCER_IDS); for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) { Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()]; BoundField throttleTimeField = responseSchema.get("throttle_time_ms"); diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index 879373787365f..724bc9894045a 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -112,7 +112,9 @@ object ApiVersion { // Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. 
Also adds topic IDs (KIP-516) KAFKA_2_8_IV0, // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) - KAFKA_2_8_IV1 + KAFKA_2_8_IV1, + // Introduce AllocateProducerIds (KIP-730) + KAFKA_3_0_IV0 ) // Map keys are the union of the short and full versions @@ -197,6 +199,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] { def isAlterIsrSupported: Boolean = this >= KAFKA_2_7_IV2 + def isAllocateProducerIdsSupported: Boolean = this >= KAFKA_3_0_IV0 + override def compare(that: ApiVersion): Int = ApiVersion.orderingByVersion.compare(this, that) @@ -447,6 +451,13 @@ case object KAFKA_2_8_IV1 extends DefaultApiVersion { val id: Int = 32 } +case object KAFKA_3_0_IV0 extends DefaultApiVersion { + val shortVersion: String = "3.0" + val subVersion = "IV0" + val recordVersion = RecordVersion.V2 + val id: Int = 33 +} + object ApiVersionValidator extends Validator { override def ensureValid(name: String, value: Any): Unit = { diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b17a6e8e6f7da..654649b4f48c2 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,13 +18,13 @@ package kafka.controller import java.util import java.util.concurrent.TimeUnit - import kafka.admin.AdminOperationException import kafka.api._ import kafka.common._ import kafka.controller.KafkaController.AlterIsrCallback import kafka.cluster.Broker import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback} +import kafka.coordinator.transaction.ZkProducerIdManager import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server._ import kafka.utils._ @@ -37,20 +37,20 @@ import org.apache.kafka.common.ElectionType import org.apache.kafka.common.KafkaException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException} -import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData} +import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterIsrRequestData, AlterIsrResponseData, UpdateFeaturesRequestData} import org.apache.kafka.common.feature.{Features, FinalizedVersionRange} -import org.apache.kafka.common.message.UpdateFeaturesRequestData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse} import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.server.common.ProducerIdsBlock import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Try} +import scala.util.{Failure, Success, Try} sealed trait ElectionTrigger final case object AutoTriggered extends ElectionTrigger @@ -2376,6 +2376,54 @@ class KafkaController(val config: KafkaConfig, } } + def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData, + callback: AllocateProducerIdsResponseData => Unit): Unit = { + + def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): 
Unit = { + results match { + case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code)) + case Right(pidBlock) => callback.apply( + new AllocateProducerIdsResponseData() + .setProducerIdStart(pidBlock.producerIdStart()) + .setProducerIdLen(pidBlock.producerIdLen())) + } + } + eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId, + allocateProducerIdsRequest.brokerEpoch, eventManagerCallback)) + } + + def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = { + // Handle a few short-circuits + if (!isActive) { + callback.apply(Left(Errors.NOT_CONTROLLER)) + return + } + + val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId) + if (brokerEpochOpt.isEmpty) { + warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId") + callback.apply(Left(Errors.BROKER_ID_NOT_REGISTERED)) + return + } + + if (!brokerEpochOpt.contains(brokerEpoch)) { + warn(s"Ignoring AllocateProducerIds due to stale broker epoch $brokerEpoch for broker $brokerId") + callback.apply(Left(Errors.STALE_BROKER_EPOCH)) + return + } + + val maybeNewProducerIdsBlock = try { + Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)) + } catch { + case ke: KafkaException => Failure(ke) + } + + maybeNewProducerIdsBlock match { + case Failure(exception) => callback.apply(Left(Errors.forException(exception))) + case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock)) + } + } + private def processControllerChange(): Unit = { maybeResign() } @@ -2454,6 +2502,8 @@ class KafkaController(val config: KafkaConfig, processIsrChangeNotification() case AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback) => processAlterIsr(brokerId, brokerEpoch, isrsToAlter, callback) + case AllocateProducerIds(brokerId, brokerEpoch, callback) => + processAllocateProducerIds(brokerId, brokerEpoch, callback) case Startup => processStartup() } @@ -2747,6 +2797,12 @@ case class UpdateFeatures(request: UpdateFeaturesRequest, override def preempt(): Unit = {} } +case class AllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit) + extends ControllerEvent { + override def state: ControllerState = ControllerState.Idle + override def preempt(): Unit = {} +} + // Used only in test cases abstract class MockEvent(val state: ControllerState) extends ControllerEvent { diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index 8d3f542c2b725..b5d419ddf4acb 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -16,146 +16,227 @@ */ package kafka.coordinator.transaction -import java.nio.charset.StandardCharsets - -import kafka.utils.{Json, Logging} +import kafka.server.{BrokerToControllerChannelManager, ControllerRequestCompletionHandler} +import kafka.utils.Logging import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} +import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.message.AllocateProducerIdsRequestData +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AllocateProducerIdsRequest, AllocateProducerIdsResponse} +import org.apache.kafka.server.common.ProducerIdsBlock -import 
scala.jdk.CollectionConverters._ +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean +import scala.util.{Failure, Success, Try} /** * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way * such that the same producerId will not be assigned twice across multiple transaction coordinators. * - * ProducerIds are managed via ZooKeeper, where the latest producerId block is written on the corresponding ZK - * path by the manager who claims the block, where the written block_start and block_end are both inclusive. + * ProducerIds are managed by the controller. When requesting a new range of IDs, we are guaranteed to receive + * a unique block. */ -object ProducerIdManager extends Logging { - val CurrentVersion: Long = 1L - val PidBlockSize: Long = 1000L - - def generateProducerIdBlockJson(producerIdBlock: ProducerIdBlock): Array[Byte] = { - Json.encodeAsBytes(Map("version" -> CurrentVersion, - "broker" -> producerIdBlock.brokerId, - "block_start" -> producerIdBlock.blockStartId.toString, - "block_end" -> producerIdBlock.blockEndId.toString).asJava - ) - } - def parseProducerIdBlockData(jsonData: Array[Byte]): ProducerIdBlock = { - try { - Json.parseBytes(jsonData).map(_.asJsonObject).flatMap { js => - val brokerId = js("broker").to[Int] - val blockStart = js("block_start").to[String].toLong - val blockEnd = js("block_end").to[String].toLong - Some(ProducerIdBlock(brokerId, blockStart, blockEnd)) - }.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonData")) - } catch { - case e: java.lang.NumberFormatException => - // this should never happen: the written data has exceeded long type limit - fatal(s"Read jason data $jsonData contains producerIds that have exceeded long type limit") - throw e - } +object ProducerIdManager { + // Once we reach this percentage of PIDs consumed from the current block, trigger a fetch of the next block + val PidPrefetchThreshold = 0.90 + + // Creates a ProducerIdGenerate that directly interfaces with ZooKeeper, IBP < 3.0-IV0 + def zk(brokerId: Int, zkClient: KafkaZkClient): ZkProducerIdManager = { + new ZkProducerIdManager(brokerId, zkClient) } -} -case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long) { - override def toString: String = { - val producerIdBlockInfo = new StringBuilder - producerIdBlockInfo.append("(brokerId:" + brokerId) - producerIdBlockInfo.append(",blockStartProducerId:" + blockStartId) - producerIdBlockInfo.append(",blockEndProducerId:" + blockEndId + ")") - producerIdBlockInfo.toString() + // Creates a ProducerIdGenerate that uses AllocateProducerIds RPC, IBP >= 3.0-IV0 + def rpc(brokerId: Int, + brokerEpochSupplier: () => Long, + controllerChannel: BrokerToControllerChannelManager, + maxWaitMs: Int): RPCProducerIdManager = { + new RPCProducerIdManager(brokerId, brokerEpochSupplier, controllerChannel, maxWaitMs) } } -trait ProducerIdGenerator { +trait ProducerIdManager { def generateProducerId(): Long def shutdown() : Unit = {} } -class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging { - - this.logIdent = "[ProducerId Manager " + brokerId + "]: " - - private var currentProducerIdBlock: ProducerIdBlock = null - private var nextProducerId: Long = -1L - - // grab the first block of producerIds - this synchronized { - getNewProducerIdBlock() - nextProducerId = currentProducerIdBlock.blockStartId - } - - private def 
getNewProducerIdBlock(): Unit = { +object ZkProducerIdManager { + def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = { + // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other + // brokers may be generating PID blocks during a rolling upgrade var zkWriteComplete = false while (!zkWriteComplete) { // refresh current producerId block from zookeeper again val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path) // generate the new producerId block - currentProducerIdBlock = dataOpt match { + val newProducerIdBlock = dataOpt match { case Some(data) => - val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data) - debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion") + val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data) + logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion") - if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) { + if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { // we have exhausted all producerIds (wow!), treat it as a fatal error - fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})") + logger.fatal(s"Exhausted all producerIds as the next block's end producerId will have exceeded the long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})") throw new KafkaException("Have exhausted all producerIds.") } - ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize) + new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) case None => - debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block") - ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1) + logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block") + new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) } - val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock) + val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock) // try to write the new producerId block into zookeeper - val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, - newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData)) + val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None) zkWriteComplete = succeeded - if (zkWriteComplete) - info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version") + if (zkWriteComplete) { + logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version") + return newProducerIdBlock + } } + throw new IllegalStateException() } +} - private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = { - try { - val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData) - zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match { - case (Some(data),
zkVersion) => - val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data) - (currProducerIdBLock == expectedPidBlock, zkVersion) - case (None, _) => (false, -1) - } - } catch { - case e: Exception => - warn(s"Error while checking for producerId block Zk data on path $path: expected data " + - s"${new String(expectedData, StandardCharsets.UTF_8)}", e) - (false, -1) +class ZkProducerIdManager(brokerId: Int, + zkClient: KafkaZkClient) extends ProducerIdManager with Logging { + + this.logIdent = "[ZK ProducerId Manager " + brokerId + "]: " + + private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY + private var nextProducerId: Long = _ + + // grab the first block of producerIds + this synchronized { + allocateNewProducerIdBlock() + nextProducerId = currentProducerIdBlock.producerIdStart + } + + private def allocateNewProducerIdBlock(): Unit = { + this synchronized { + currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this) } } def generateProducerId(): Long = { this synchronized { // grab a new block of producerIds if this block has been exhausted - if (nextProducerId > currentProducerIdBlock.blockEndId) { - getNewProducerIdBlock() - nextProducerId = currentProducerIdBlock.blockStartId + 1 + if (nextProducerId > currentProducerIdBlock.producerIdEnd) { + allocateNewProducerIdBlock() + nextProducerId = currentProducerIdBlock.producerIdStart + } + nextProducerId += 1 + nextProducerId - 1 + } + } +} + +class RPCProducerIdManager(brokerId: Int, + brokerEpochSupplier: () => Long, + controllerChannel: BrokerToControllerChannelManager, + maxWaitMs: Int) extends ProducerIdManager with Logging { + + this.logIdent = "[RPC ProducerId Manager " + brokerId + "]: " + + private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1) + private val requestInFlight = new AtomicBoolean(false) + private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY + private var nextProducerId: Long = -1L + + override def generateProducerId(): Long = { + this synchronized { + if (nextProducerId == -1L) { + // Send an initial request to get the first block + maybeRequestNextBlock() + nextProducerId = 0L } else { nextProducerId += 1 + + // Check if we need to fetch the next block + if (nextProducerId >= (currentProducerIdBlock.producerIdStart + currentProducerIdBlock.producerIdLen * ProducerIdManager.PidPrefetchThreshold)) { + maybeRequestNextBlock() + } } - nextProducerId - 1 + // If we've exhausted the current block, grab the next block (waiting if necessary) + if (nextProducerId > currentProducerIdBlock.producerIdEnd) { + val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) + if (block == null) { + throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block") + } else { + block match { + case Success(nextBlock) => + currentProducerIdBlock = nextBlock + nextProducerId = currentProducerIdBlock.producerIdStart + case Failure(t) => throw t + } + } + } + nextProducerId + } + } + + + private def maybeRequestNextBlock(): Unit = { + if (nextProducerIdBlock.isEmpty && requestInFlight.compareAndSet(false, true)) { + sendRequest() + } + } + + private[transaction] def sendRequest(): Unit = { + val message = new AllocateProducerIdsRequestData() + .setBrokerEpoch(brokerEpochSupplier.apply()) + .setBrokerId(brokerId) + + val request = new AllocateProducerIdsRequest.Builder(message) + debug("Requesting next Producer ID block") + controllerChannel.sendRequest(request, new 
ControllerRequestCompletionHandler() { + override def onComplete(response: ClientResponse): Unit = { + val message = response.responseBody().asInstanceOf[AllocateProducerIdsResponse] + handleAllocateProducerIdsResponse(message) + } + + override def onTimeout(): Unit = handleTimeout() + }) + } + + private[transaction] def handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit = { + requestInFlight.set(false) + val data = response.data + Errors.forCode(data.errorCode()) match { + case Errors.NONE => + debug(s"Got next producer ID block from controller $data") + // Do some sanity checks on the response + if (data.producerIdStart() < currentProducerIdBlock.producerIdEnd) { + nextProducerIdBlock.put(Failure(new KafkaException( + s"Producer ID block is not monotonic with current block: current=$currentProducerIdBlock response=$data"))) + } else if (data.producerIdStart() < 0 || data.producerIdLen() < 0 || data.producerIdStart() > Long.MaxValue - data.producerIdLen()) { + nextProducerIdBlock.put(Failure(new KafkaException(s"Producer ID block includes invalid ID range: $data"))) + } else { + nextProducerIdBlock.put( + Success(new ProducerIdsBlock(brokerId, data.producerIdStart(), data.producerIdLen()))) + } + case Errors.STALE_BROKER_EPOCH => + warn("Our broker epoch was stale, trying again.") + maybeRequestNextBlock() + case Errors.BROKER_ID_NOT_REGISTERED => + warn("Our broker ID is not yet known by the controller, trying again.") + maybeRequestNextBlock() + case e: Errors => + warn("Had an unknown error from the controller, giving up.") + nextProducerIdBlock.put(Failure(e.exception())) } } - override def shutdown(): Unit = { - info(s"Shutdown complete: last producerId assigned $nextProducerId") + private[transaction] def handleTimeout(): Unit = { + warn("Timed out when requesting AllocateProducerIds from the controller.") + requestInFlight.set(false) + nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception)) + maybeRequestNextBlock() } } diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 0e3fa28098118..543e9c85c36d5 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -35,7 +35,7 @@ object TransactionCoordinator { def apply(config: KafkaConfig, replicaManager: ReplicaManager, scheduler: Scheduler, - createProducerIdGenerator: () => ProducerIdGenerator, + createProducerIdGenerator: () => ProducerIdManager, metrics: Metrics, metadataCache: MetadataCache, time: Time): TransactionCoordinator = { @@ -82,7 +82,7 @@ object TransactionCoordinator { class TransactionCoordinator(brokerId: Int, txnConfig: TransactionConfig, scheduler: Scheduler, - createProducerIdGenerator: () => ProducerIdGenerator, + createProducerIdManager: () => ProducerIdManager, txnManager: TransactionStateManager, txnMarkerChannelManager: TransactionMarkerChannelManager, time: Time, @@ -99,7 +99,7 @@ class TransactionCoordinator(brokerId: Int, /* Active flag of the coordinator */ private val isActive = new AtomicBoolean(false) - val producerIdGenerator = createProducerIdGenerator() + val producerIdManager = createProducerIdManager() def handleInitProducerId(transactionalId: String, transactionTimeoutMs: Int, @@ -109,7 +109,7 @@ class TransactionCoordinator(brokerId: Int, if (transactionalId == null) { // if the transactional id is null, then 
always blindly accept the request // and return a new producerId from the producerId manager - val producerId = producerIdGenerator.generateProducerId() + val producerId = producerIdManager.generateProducerId() responseCallback(InitProducerIdResult(producerId, producerEpoch = 0, Errors.NONE)) } else if (transactionalId.isEmpty) { // if transactional id is empty then return error as invalid request. This is @@ -121,7 +121,7 @@ class TransactionCoordinator(brokerId: Int, } else { val coordinatorEpochAndMetadata = txnManager.getTransactionState(transactionalId).flatMap { case None => - val producerId = producerIdGenerator.generateProducerId() + val producerId = producerIdManager.generateProducerId() val createdMetadata = new TransactionMetadata(transactionalId = transactionalId, producerId = producerId, lastProducerId = RecordBatch.NO_PRODUCER_ID, @@ -225,7 +225,7 @@ class TransactionCoordinator(brokerId: Int, // If the epoch is exhausted and the expected epoch (if provided) matches it, generate a new producer ID if (txnMetadata.isProducerEpochExhausted && expectedProducerIdAndEpoch.forall(_.epoch == txnMetadata.producerEpoch)) { - val newProducerId = producerIdGenerator.generateProducerId() + val newProducerId = producerIdManager.generateProducerId() Right(txnMetadata.prepareProducerIdRotation(newProducerId, transactionTimeoutMs, time.milliseconds(), expectedProducerIdAndEpoch.isDefined)) } else { @@ -675,7 +675,7 @@ class TransactionCoordinator(brokerId: Int, info("Shutting down.") isActive.set(false) scheduler.shutdown() - producerIdGenerator.shutdown() + producerIdManager.shutdown() txnManager.shutdown() txnMarkerChannelManager.shutdown() info("Shutdown complete.") diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala index c0fbd68e75bf2..bb8e327b1890a 100644 --- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala +++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala @@ -29,6 +29,7 @@ object RequestConvertToJson { request match { case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data, request.version) case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data, request.version) + case req: AllocateProducerIdsRequest => AllocateProducerIdsRequestDataJsonConverter.write(req.data, request.version) case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data, request.version) case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data, request.version) case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data, request.version) @@ -103,6 +104,7 @@ object RequestConvertToJson { response match { case res: AddOffsetsToTxnResponse => AddOffsetsToTxnResponseDataJsonConverter.write(res.data, version) case res: AddPartitionsToTxnResponse => AddPartitionsToTxnResponseDataJsonConverter.write(res.data, version) + case res: AllocateProducerIdsResponse => AllocateProducerIdsResponseDataJsonConverter.write(res.data, version) case res: AlterClientQuotasResponse => AlterClientQuotasResponseDataJsonConverter.write(res.data, version) case res: AlterConfigsResponse => AlterConfigsResponseDataJsonConverter.write(res.data, version) case res: AlterIsrResponse => AlterIsrResponseDataJsonConverter.write(res.data, version) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 
08b4653e1684f..bea0c53c1d78f 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -25,7 +25,7 @@ import java.net.InetAddress import kafka.cluster.Broker.ServerInfo import kafka.coordinator.group.GroupCoordinator -import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinator} +import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator} import kafka.log.LogManager import kafka.metrics.KafkaYammerMetrics import kafka.network.SocketServer @@ -376,7 +376,7 @@ class BrokerServer( } } - class TemporaryProducerIdManager() extends ProducerIdGenerator { + class TemporaryProducerIdManager() extends ProducerIdManager { val maxProducerIdsPerBrokerEpoch = 1000000 var currentOffset = -1 override def generateProducerId(): Long = { @@ -390,7 +390,7 @@ class BrokerServer( } } - def createTemporaryProducerIdManager(): ProducerIdGenerator = { + def createTemporaryProducerIdManager(): ProducerIdManager = { new TemporaryProducerIdManager() } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3f83eb3af3b4e..d75e4ae3651d7 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -216,6 +216,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { @@ -3267,6 +3268,22 @@ class KafkaApis(val requestChannel: RequestChannel, new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs))) } + def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { + val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + + val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest] + + if (!zkSupport.controller.isActive) + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => + allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception)) + else + zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse => + requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => + new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs))) + ) + } + private def updateRecordConversionStats(request: RequestChannel.Request, tp: TopicPartition, conversionStats: RecordConversionStats): Unit = { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7014574eacb0b..fa182c0609daa 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -134,7 +134,7 @@ class KafkaServer( var autoTopicCreationManager: AutoTopicCreationManager = null - var clientToControllerChannelManager: Option[BrokerToControllerChannelManager] = None + var clientToControllerChannelManager: BrokerToControllerChannelManager = null var alterIsrManager: AlterIsrManager = null @@ -257,19 +257,21 @@ class KafkaServer( 
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) + clientToControllerChannelManager = BrokerToControllerChannelManager( + controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache), + time = time, + metrics = metrics, + config = config, + channelName = "forwarding", + threadNamePrefix = threadNamePrefix, + retryTimeoutMs = config.requestTimeoutMs.longValue) + clientToControllerChannelManager.start() + /* start forwarding manager */ + var autoTopicCreationChannel = Option.empty[BrokerToControllerChannelManager] if (enableForwarding) { - val brokerToControllerManager = BrokerToControllerChannelManager( - controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache), - time = time, - metrics = metrics, - config = config, - channelName = "forwarding", - threadNamePrefix = threadNamePrefix, - retryTimeoutMs = config.requestTimeoutMs.longValue) - brokerToControllerManager.start() - this.forwardingManager = Some(ForwardingManager(brokerToControllerManager)) - clientToControllerChannelManager = Some(brokerToControllerManager) + this.forwardingManager = Some(ForwardingManager(clientToControllerChannelManager)) + autoTopicCreationChannel = Some(clientToControllerChannelManager) } val apiVersionManager = ApiVersionManager( @@ -330,10 +332,21 @@ class KafkaServer( groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics) groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions)) + /* create producer ids manager */ + val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) { + ProducerIdManager.rpc( + config.brokerId, + brokerEpochSupplier = () => kafkaController.brokerEpoch, + clientToControllerChannelManager, + config.requestTimeoutMs + ) + } else { + ProducerIdManager.zk(config.brokerId, zkClient) + } /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), - () => new ProducerIdManager(config.brokerId, zkClient), metrics, metadataCache, Time.SYSTEM) + () => producerIdManager, metrics, metadataCache, Time.SYSTEM) transactionCoordinator.startup( () => zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionTopicPartitions)) @@ -342,7 +355,7 @@ class KafkaServer( config, metadataCache, threadNamePrefix, - clientToControllerChannelManager, + autoTopicCreationChannel, Some(adminManager), Some(kafkaController), groupCoordinator, @@ -680,7 +693,8 @@ class KafkaServer( if (alterIsrManager != null) CoreUtils.swallow(alterIsrManager.shutdown(), this) - CoreUtils.swallow(clientToControllerChannelManager.foreach(_.shutdown()), this) + if (clientToControllerChannelManager != null) + CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) if (logManager != null) CoreUtils.swallow(logManager.shutdown(), this) diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 4941ece5163cc..0f6db4a5d3dc1 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ 
-19,7 +19,6 @@ package kafka.zk import java.nio.charset.StandardCharsets.UTF_8 import java.util import java.util.Properties - import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.core.JsonProcessingException import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_2_7_IV0, LeaderAndIsr} @@ -40,6 +39,7 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{SecurityUtils, Time} +import org.apache.kafka.server.common.ProducerIdsBlock import org.apache.zookeeper.ZooDefs import org.apache.zookeeper.data.{ACL, Stat} @@ -766,7 +766,33 @@ object BrokerSequenceIdZNode { } object ProducerIdBlockZNode { + val CurrentVersion: Long = 1L + def path = "/latest_producer_id_block" + + def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = { + Json.encodeAsBytes(Map("version" -> CurrentVersion, + "broker" -> producerIdBlock.brokerId, + "block_start" -> producerIdBlock.producerIdStart.toString, + "block_end" -> producerIdBlock.producerIdEnd.toString).asJava + ) + } + + def parseProducerIdBlockData(jsonData: Array[Byte]): ProducerIdsBlock = { + val jsonDataAsString = jsonData.map(_.toChar).mkString + try { + Json.parseBytes(jsonData).map(_.asJsonObject).flatMap { js => + val brokerId = js("broker").to[Int] + val blockStart = js("block_start").to[String].toLong + val blockEnd = js("block_end").to[String].toLong + Some(new ProducerIdsBlock(brokerId, blockStart, Math.toIntExact(blockEnd - blockStart + 1))) + }.getOrElse(throw new KafkaException(s"Failed to parse the producerId block json $jsonDataAsString")) + } catch { + case e: java.lang.NumberFormatException => + // this should never happen: the written data has exceeded long type limit + throw new KafkaException(s"Read jason data $jsonDataAsString contains producerIds that have exceeded long type limit", e) + } + } } object DelegationTokenAuthZNode { diff --git a/core/src/test/java/kafka/test/ClusterConfig.java b/core/src/test/java/kafka/test/ClusterConfig.java index e7e8bf5a0234a..20b74cf43244b 100644 --- a/core/src/test/java/kafka/test/ClusterConfig.java +++ b/core/src/test/java/kafka/test/ClusterConfig.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import java.io.File; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -40,6 +41,7 @@ public class ClusterConfig { private final SecurityProtocol securityProtocol; private final String listenerName; private final File trustStoreFile; + private final String ibp; private final Properties serverProperties = new Properties(); private final Properties producerProperties = new Properties(); @@ -47,9 +49,11 @@ public class ClusterConfig { private final Properties adminClientProperties = new Properties(); private final Properties saslServerProperties = new Properties(); private final Properties saslClientProperties = new Properties(); + private final Map perBrokerOverrideProperties = new HashMap<>(); ClusterConfig(Type type, int brokers, int controllers, String name, boolean autoStart, - SecurityProtocol securityProtocol, String listenerName, File trustStoreFile) { + SecurityProtocol securityProtocol, String listenerName, File trustStoreFile, + String ibp) { this.type = type; this.brokers = brokers; this.controllers = controllers; @@ -58,6 +62,7 @@ 
public class ClusterConfig { this.securityProtocol = securityProtocol; this.listenerName = listenerName; this.trustStoreFile = trustStoreFile; + this.ibp = ibp; } public Type clusterType() { @@ -116,16 +121,25 @@ public Optional trustStoreFile() { return Optional.ofNullable(trustStoreFile); } + public Optional ibp() { + return Optional.ofNullable(ibp); + } + + public Properties brokerServerProperties(int brokerId) { + return perBrokerOverrideProperties.computeIfAbsent(brokerId, __ -> new Properties()); + } + public Map nameTags() { Map tags = new LinkedHashMap<>(3); name().ifPresent(name -> tags.put("Name", name)); + ibp().ifPresent(ibp -> tags.put("IBP", ibp)); tags.put("Security", securityProtocol.name()); listenerName().ifPresent(listener -> tags.put("Listener", listener)); return tags; } public ClusterConfig copyOf() { - ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile); + ClusterConfig copy = new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp); copy.serverProperties.putAll(serverProperties); copy.producerProperties.putAll(producerProperties); copy.consumerProperties.putAll(consumerProperties); @@ -151,6 +165,7 @@ public static class Builder { private SecurityProtocol securityProtocol; private String listenerName; private File trustStoreFile; + private String ibp; Builder(Type type, int brokers, int controllers, boolean autoStart, SecurityProtocol securityProtocol) { this.type = type; @@ -200,8 +215,13 @@ public Builder trustStoreFile(File trustStoreFile) { return this; } + public Builder ibp(String ibp) { + this.ibp = ibp; + return this; + } + public ClusterConfig build() { - return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile); + return new ClusterConfig(type, brokers, controllers, name, autoStart, securityProtocol, listenerName, trustStoreFile, ibp); } } } diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index cac986fdc317c..021db5a34381e 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -94,4 +94,6 @@ default Admin createAdminClient() { void start(); void stop(); + + void rollingBrokerRestart(); } diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java index 687255c3c4705..11336ab87a15f 100644 --- a/core/src/test/java/kafka/test/annotation/ClusterTest.java +++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java @@ -40,5 +40,6 @@ String name() default ""; SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT; String listener() default ""; + String ibp() default ""; ClusterConfigProperty[] serverProperties() default {}; } diff --git a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java index 94eefbd91cb81..293f00b035ca5 100644 --- a/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java +++ b/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java @@ -183,7 +183,7 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu if (!annot.name().isEmpty()) { builder.name(annot.name()); } else { - builder.name(context.getDisplayName()); + builder.name(context.getRequiredTestMethod().getName()); } if (!annot.listener().isEmpty()) { 
builder.listenerName(annot.listener()); @@ -194,6 +194,10 @@ private void processClusterTest(ExtensionContext context, ClusterTest annot, Clu properties.put(property.key(), property.value()); } + if (!annot.ibp().isEmpty()) { + builder.ibp(annot.ibp()); + } + ClusterConfig config = builder.build(); config.serverProperties().putAll(properties); type.invocationContexts(config, testInvocations); diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index b4e50c59bfe98..fc6b55736c3fc 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -66,8 +66,8 @@ public RaftClusterInvocationContext(ClusterConfig clusterConfig) { @Override public String getDisplayName(int invocationIndex) { String clusterDesc = clusterConfig.nameTags().entrySet().stream() - .map(Object::toString) - .collect(Collectors.joining(", ")); + .map(Object::toString) + .collect(Collectors.joining(", ")); return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc); } @@ -75,10 +75,14 @@ public String getDisplayName(int invocationIndex) { public List getAdditionalExtensions() { return Arrays.asList( (BeforeTestExecutionCallback) context -> { - KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). + TestKitNodes nodes = new TestKitNodes.Builder(). setNumBrokerNodes(clusterConfig.numBrokers()). - setNumControllerNodes(clusterConfig.numControllers()).build()); + setNumControllerNodes(clusterConfig.numControllers()).build(); + nodes.brokerNodes().forEach((brokerId, brokerNode) -> { + clusterConfig.brokerServerProperties(brokerId).forEach( + (key, value) -> brokerNode.propertyOverrides().put(key.toString(), value.toString())); + }); + KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes); // Copy properties into the TestKit builder clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString())); @@ -192,5 +196,10 @@ public void stop() { } } } + + @Override + public void rollingBrokerRestart() { + throw new UnsupportedOperationException("Restarting Raft servers is not yet supported."); + } } -} \ No newline at end of file +} diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java index 8d4660b77382e..cd8cdc11a800d 100644 --- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java @@ -19,6 +19,7 @@ import kafka.api.IntegrationTestHarness; import kafka.network.SocketServer; +import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.TestUtils; import org.apache.kafka.clients.admin.Admin; @@ -32,6 +33,7 @@ import org.junit.jupiter.api.extension.TestTemplateInvocationContext; import scala.Option; import scala.collection.JavaConverters; +import scala.collection.Seq; import scala.compat.java8.OptionConverters; import java.io.File; @@ -89,9 +91,20 @@ public List getAdditionalExtensions() { // This is what tests normally extend from to start a cluster, here we create it anonymously and // configure the cluster using values from ClusterConfig IntegrationTestHarness cluster = new IntegrationTestHarness() { + + @Override + public void modifyConfigs(Seq props) { + super.modifyConfigs(props); + for (int i = 0; i 
< props.length(); i++) { + props.apply(i).putAll(clusterConfig.brokerServerProperties(i)); + } + } + @Override public Properties serverConfig() { - return clusterConfig.serverProperties(); + Properties props = clusterConfig.serverProperties(); + clusterConfig.ibp().ifPresent(ibp -> props.put(KafkaConfig.InterBrokerProtocolVersionProp(), ibp)); + return props; } @Override @@ -248,5 +261,16 @@ public void stop() { clusterReference.get().tearDown(); } } + + @Override + public void rollingBrokerRestart() { + if (!started.get()) { + throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!"); + } + for (int i = 0; i < clusterReference.get().brokerCount(); i++) { + clusterReference.get().killBroker(i); + } + clusterReference.get().restartDeadBrokers(true); + } } } diff --git a/core/src/test/java/kafka/testkit/BrokerNode.java b/core/src/test/java/kafka/testkit/BrokerNode.java index 0b5859404b346..32bd51b4b56dc 100644 --- a/core/src/test/java/kafka/testkit/BrokerNode.java +++ b/core/src/test/java/kafka/testkit/BrokerNode.java @@ -21,7 +21,9 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class BrokerNode implements TestKitNode { public static class Builder { @@ -68,6 +70,7 @@ public BrokerNode build() { private final Uuid incarnationId; private final String metadataDirectory; private final List logDataDirectories; + private final Map propertyOverrides; BrokerNode(int id, Uuid incarnationId, @@ -77,6 +80,7 @@ public BrokerNode build() { this.incarnationId = incarnationId; this.metadataDirectory = metadataDirectory; this.logDataDirectories = new ArrayList<>(logDataDirectories); + this.propertyOverrides = new HashMap<>(); } @Override @@ -96,4 +100,8 @@ public String metadataDirectory() { public List logDataDirectories() { return logDataDirectories; } + + public Map propertyOverrides() { + return propertyOverrides; + } } diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 9c7e4e0b7e089..79d7bda1accbd 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -216,6 +216,7 @@ metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftS // Just like above, we set a placeholder voter list here until we // find out what ports the controllers picked. props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString); + props.putAll(node.propertyOverrides()); KafkaConfig config = new KafkaConfig(props, false, Option.empty()); String threadNamePrefix = String.format("broker%d_", node.id()); diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala new file mode 100644 index 0000000000000..6c7c248e63188 --- /dev/null +++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator.transaction + +import kafka.network.SocketServer +import kafka.server.{IntegrationTestUtils, KafkaConfig} +import kafka.test.annotation.{AutoStart, ClusterTest, ClusterTests, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.test.{ClusterConfig, ClusterInstance} +import org.apache.kafka.common.message.InitProducerIdRequestData +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests.{InitProducerIdRequest, InitProducerIdResponse} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.stream.{Collectors, IntStream} +import scala.jdk.CollectionConverters._ + +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +class ProducerIdsIntegrationTest { + + @BeforeEach + def setup(clusterConfig: ClusterConfig): Unit = { + clusterConfig.serverProperties().put(KafkaConfig.TransactionsTopicPartitionsProp, "1") + clusterConfig.serverProperties().put(KafkaConfig.TransactionsTopicReplicationFactorProp, "3") + } + + @ClusterTests(Array( + new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"), + new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0") + )) + def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = { + verifyUniqueIds(clusterInstance) + } + + @ClusterTest(clusterType = Type.ZK, brokers = 3, autoStart = AutoStart.NO) + def testUniqueProducerIdsBumpIBP(clusterInstance: ClusterInstance): Unit = { + clusterInstance.config().serverProperties().put(KafkaConfig.InterBrokerProtocolVersionProp, "2.8") + clusterInstance.config().brokerServerProperties(0).put(KafkaConfig.InterBrokerProtocolVersionProp, "3.0-IV0") + clusterInstance.start() + verifyUniqueIds(clusterInstance) + clusterInstance.stop() + } + + private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = { + // Request enough PIDs from each broker to ensure each broker generates two PID blocks + val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => { + IntStream.range(0, 1001).parallel().mapToObj( _ => nextProducerId(broker, clusterInstance.clientListener())) + }).collect(Collectors.toList[Long]).asScala.toSeq + + assertEquals(3003, ids.size, "Expected exactly 3003 IDs") + assertEquals(ids.size, ids.distinct.size, "Found duplicate producer IDs") + } + + private def nextProducerId(broker: SocketServer, listener: ListenerName): Long = { + val data = new InitProducerIdRequestData() + .setProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH) + .setProducerId(RecordBatch.NO_PRODUCER_ID) + .setTransactionalId(null) + .setTransactionTimeoutMs(10) + val request = new InitProducerIdRequest.Builder(data).build() + + val response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, + destination = broker, + listenerName = listener) + response.data().producerId() + } +} diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala index be3e8144f6859..9232bf03c0e0e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala @@ -16,23 +16,44 @@ */ package kafka.coordinator.transaction -import kafka.zk.KafkaZkClient +import kafka.server.BrokerToControllerChannelManager +import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.message.AllocateProducerIdsResponseData +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.AllocateProducerIdsResponse +import org.apache.kafka.server.common.ProducerIdsBlock import org.easymock.{Capture, EasyMock} -import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{EnumSource, ValueSource} + +import java.util.stream.IntStream class ProducerIdManagerTest { - private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + var brokerToController: BrokerToControllerChannelManager = EasyMock.niceMock(classOf[BrokerToControllerChannelManager]) + val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) - @AfterEach - def tearDown(): Unit = { - EasyMock.reset(zkClient) + // Mutable test implementation that lets us easily set the idStart and error + class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, var error: Errors = Errors.NONE) + extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) { + + override private[transaction] def sendRequest(): Unit = { + if (error == Errors.NONE) { + handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse( + new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen))) + idStart += idLen + } else { + handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse( + new AllocateProducerIdsResponseData().setErrorCode(error.code))) + } + } } @Test - def testGetProducerId(): Unit = { + def testGetProducerIdZk(): Unit = { var zkVersion: Option[Int] = None var data: Array[Byte] = null EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(() => @@ -53,34 +74,71 @@ class ProducerIdManagerTest { EasyMock.replay(zkClient) - val manager1 = new ProducerIdManager(0, zkClient) - val manager2 = new ProducerIdManager(1, zkClient) + val manager1 = new ZkProducerIdManager(0, zkClient) + val manager2 = new ZkProducerIdManager(1, zkClient) val pid1 = manager1.generateProducerId() val pid2 = manager2.generateProducerId() assertEquals(0, pid1) - assertEquals(ProducerIdManager.PidBlockSize, pid2) + assertEquals(ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, pid2) - for (i <- 1L until ProducerIdManager.PidBlockSize) + for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) assertEquals(pid1 + i, manager1.generateProducerId()) - for (i <- 1L until ProducerIdManager.PidBlockSize) + for (i <- 1L until ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) assertEquals(pid2 + i, manager2.generateProducerId()) - assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.generateProducerId()) - assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.generateProducerId()) + assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, manager1.generateProducerId()) + 
assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, manager2.generateProducerId()) + + EasyMock.reset(zkClient) } @Test - def testExceedProducerIdLimit(): Unit = { + def testExceedProducerIdLimitZk(): Unit = { EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(() => { - val json = ProducerIdManager.generateProducerIdBlockJson( - ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue)) + val json = ProducerIdBlockZNode.generateProducerIdBlockJson( + new ProducerIdsBlock(0, Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)) (Some(json), 0) }).anyTimes() EasyMock.replay(zkClient) - assertThrows(classOf[KafkaException], () => new ProducerIdManager(0, zkClient)) + assertThrows(classOf[KafkaException], () => new ZkProducerIdManager(0, zkClient)) + } + + @ParameterizedTest + @ValueSource(ints = Array(1, 2, 10)) + def testContiguousIds(idBlockLen: Int): Unit = { + val manager = new MockProducerIdManager(0, 0, idBlockLen) + + IntStream.range(0, idBlockLen * 3).forEach { i => + assertEquals(i, manager.generateProducerId()) + } + } + + @ParameterizedTest + @EnumSource(value = classOf[Errors], names = Array("UNKNOWN_SERVER_ERROR", "INVALID_REQUEST")) + def testUnrecoverableErrors(error: Errors): Unit = { + val manager = new MockProducerIdManager(0, 0, 1) + assertEquals(0, manager.generateProducerId()) + + manager.error = error + assertThrows(classOf[Throwable], () => manager.generateProducerId()) + + manager.error = Errors.NONE + assertEquals(1, manager.generateProducerId()) + } + + @Test + def testInvalidRanges(): Unit = { + var manager = new MockProducerIdManager(0, -1, 10) + assertThrows(classOf[KafkaException], () => manager.generateProducerId()) + + manager = new MockProducerIdManager(0, 0, -1) + assertThrows(classOf[KafkaException], () => manager.generateProducerId()) + + manager = new MockProducerIdManager(0, Long.MaxValue-1, 10) + assertThrows(classOf[KafkaException], () => manager.generateProducerId()) } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala index ad337038761b4..e1786d0ee21ff 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala @@ -77,7 +77,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren for (i <- 0 until numPartitions) txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String, TransactionMetadata]()) - val pidGenerator: ProducerIdGenerator = EasyMock.createNiceMock(classOf[ProducerIdGenerator]) + val pidGenerator: ProducerIdManager = EasyMock.createNiceMock(classOf[ProducerIdManager]) EasyMock.expect(pidGenerator.generateProducerId()) .andAnswer(() => if (bumpProducerId) producerId + 1 else producerId) .anyTimes() diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala index 72ccc8de6bf4d..f6b5e54dfe19a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala @@ -34,7 +34,7 @@ class TransactionCoordinatorTest { val time 
= new MockTime() var nextPid: Long = 0L - val pidGenerator: ProducerIdGenerator = EasyMock.createNiceMock(classOf[ProducerIdGenerator]) + val pidGenerator: ProducerIdManager = EasyMock.createNiceMock(classOf[ProducerIdManager]) val transactionManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager]) val transactionMarkerChannelManager: TransactionMarkerChannelManager = EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager]) val capturedTxn: Capture[TransactionMetadata] = EasyMock.newCapture() diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 5369fdd66fae7..6fc18f0d10c68 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -161,8 +161,19 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness { /** * Restart any dead brokers */ - def restartDeadBrokers(): Unit = { + def restartDeadBrokers(reconfigure: Boolean = false): Unit = { + if (reconfigure) { + instanceConfigs = null + } for(i <- servers.indices if !alive(i)) { + if (reconfigure) { + servers(i) = TestUtils.createServer( + configs(i), + time = brokerTime(configs(i).brokerId), + threadNamePrefix = None, + enableForwarding + ) + } servers(i).startup() alive(i) = true } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 86a00b33b035d..757b82c13dbe8 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -131,7 +131,7 @@ class RequestQuotaTest extends BaseRequestTest { @Test def testResponseThrottleTime(): Unit = { - for (apiKey <- RequestQuotaTest.ClientActions) + for (apiKey <- RequestQuotaTest.ClientActions ++ RequestQuotaTest.ClusterActionsWithThrottle) submitTest(apiKey, () => checkRequestThrottleTime(apiKey)) waitAndCheckResults() @@ -160,7 +160,7 @@ class RequestQuotaTest extends BaseRequestTest { @Test def testExemptRequestTime(): Unit = { - for (apiKey <- RequestQuotaTest.ClusterActions) { + for (apiKey <- RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle) { submitTest(apiKey, () => checkExemptRequestMetric(apiKey)) } @@ -642,6 +642,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.LIST_TRANSACTIONS => new ListTransactionsRequest.Builder(new ListTransactionsRequestData()) + case ApiKeys.ALLOCATE_PRODUCER_IDS => + new AllocateProducerIdsRequest.Builder(new AllocateProducerIdsRequestData()) case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -762,6 +764,7 @@ class RequestQuotaTest extends BaseRequestTest { object RequestQuotaTest { val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet + val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS) val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE) val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions diff --git a/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java new file mode 100644 index 0000000000000..8a0fd8469f7f9 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/common/ProducerIdsBlock.java @@ -0,0 +1,80 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.Objects; + +/** + * Holds a range of Producer IDs used for Transactional and EOS producers. + * + * The start and end of the ID block are inclusive. + */ +public class ProducerIdsBlock { + public static final int PRODUCER_ID_BLOCK_SIZE = 1000; + + public static final ProducerIdsBlock EMPTY = new ProducerIdsBlock(-1, 0, 0); + + private final int brokerId; + private final long producerIdStart; + private final int producerIdLen; + + public ProducerIdsBlock(int brokerId, long producerIdStart, int producerIdLen) { + this.brokerId = brokerId; + this.producerIdStart = producerIdStart; + this.producerIdLen = producerIdLen; + } + + public int brokerId() { + return brokerId; + } + + public long producerIdStart() { + return producerIdStart; + } + + public int producerIdLen() { + return producerIdLen; + } + + public long producerIdEnd() { + return producerIdStart + producerIdLen - 1; + } + + + @Override + public String toString() { + return "ProducerIdsBlock{" + + "brokerId=" + brokerId + + ", producerIdStart=" + producerIdStart + + ", producerIdLen=" + producerIdLen + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ProducerIdsBlock that = (ProducerIdsBlock) o; + return brokerId == that.brokerId && producerIdStart == that.producerIdStart && producerIdLen == that.producerIdLen; + } + + @Override + public int hashCode() { + return Objects.hash(brokerId, producerIdStart, producerIdLen); + } +}
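
---

For readers following the new allocation path end to end, below is a minimal, illustrative Java sketch (not part of this patch, and not the RPCProducerIdManager it introduces) of how a broker-side caller could consume a ProducerIdsBlock and detect when a fresh block must be requested from the controller. Only the ProducerIdsBlock API shown in the diff above is assumed to exist; the allocator class, its name, and its structure are hypothetical.

import org.apache.kafka.server.common.ProducerIdsBlock;

/**
 * Illustrative sketch only: shows how a ProducerIdsBlock is consumed
 * sequentially and when the caller must ask for the next block.
 * Class name and structure are hypothetical; the real broker logic lives in
 * the patch's ProducerIdManager implementations.
 */
public class ProducerIdsBlockExample {

    private ProducerIdsBlock currentBlock = ProducerIdsBlock.EMPTY;
    private long nextId;

    /** Install a freshly allocated block, e.g. taken from an AllocateProducerIds response. */
    public synchronized void assignBlock(ProducerIdsBlock block) {
        this.currentBlock = block;
        this.nextId = block.producerIdStart();
    }

    /** True once every ID in the current block (start..end, both inclusive) has been handed out. */
    public synchronized boolean blockExhausted() {
        return currentBlock == ProducerIdsBlock.EMPTY || nextId > currentBlock.producerIdEnd();
    }

    /** Hand out the next ID, or fail if a new block needs to be allocated first. */
    public synchronized long nextProducerId() {
        if (blockExhausted()) {
            throw new IllegalStateException("Producer ID block exhausted; allocate a new block first");
        }
        return nextId++;
    }

    public static void main(String[] args) {
        ProducerIdsBlockExample allocator = new ProducerIdsBlockExample();
        // First block a broker would receive: 1000 IDs starting at 0.
        allocator.assignBlock(new ProducerIdsBlock(0, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
        while (!allocator.blockExhausted()) {
            allocator.nextProducerId();                  // consumes IDs 0..999
        }
        // The next block starts at producerIdEnd() + 1 of the previous one.
        allocator.assignBlock(new ProducerIdsBlock(0, 1000L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE));
        System.out.println(allocator.nextProducerId());  // prints 1000
    }
}

This is also why ProducerIdsIntegrationTest above requests 1001 IDs per broker: with PRODUCER_ID_BLOCK_SIZE fixed at 1000, the 1001st request forces each of the three brokers onto a second block, and all 3 x 1001 = 3003 returned IDs must still be unique across the cluster.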