KAFKA-12620 Allocate producer ids on the controller (#10504)
Introduce new AllocateProducerIds RPC and IBP 3.0-IV0 as part of KIP-730.

This change adds a new AllocateProducerIds RPC, which a broker uses to request a block of
producer IDs from the controller. The new IBP determines whether the broker talks directly to
ZooKeeper (IBP < 3.0) or uses the new RPC to talk to the controller (IBP >= 3.0).

Per-broker property overrides for ClusterTests were also added, in order to test mixed IBPs in a cluster.
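
For context, a minimal broker-side sketch of the IBP gate described above (not part of this commit). Only isAllocateProducerIdsSupported and ProducerIdsBlock appear in the diff below; interBrokerProtocolVersion is the existing KafkaConfig accessor for the IBP, and the two strategy parameters are illustrative placeholders for the RPC and ZooKeeper paths, whose real wiring is not shown in this excerpt.

    import kafka.server.KafkaConfig
    import org.apache.kafka.server.common.ProducerIdsBlock

    object ProducerIdAllocationGate {
      // Sketch: choose the producer-id allocation path from the inter-broker protocol version.
      // The concrete strategies are passed in as functions because their wiring lives outside this excerpt.
      def nextProducerIdBlock(config: KafkaConfig,
                              allocateViaRpc: () => ProducerIdsBlock, // AllocateProducerIds RPC to the controller (IBP >= 3.0)
                              allocateViaZk: () => ProducerIdsBlock   // direct ZooKeeper allocation (IBP < 3.0)
                             ): ProducerIdsBlock = {
        if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) allocateViaRpc()
        else allocateViaZk()
      }
    }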

Reviewers: Colin P. McCabe <cmccabe@apache.org>
mumrah authored May 21, 2021
1 parent aa25176 commit 72d1082
Showing 32 changed files with 868 additions and 154 deletions.
@@ -107,7 +107,8 @@ public enum ApiKeys {
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS);
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
@@ -301,6 +301,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return DescribeTransactionsRequest.parse(buffer, apiVersion);
case LIST_TRANSACTIONS:
return ListTransactionsRequest.parse(buffer, apiVersion);
case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
@@ -245,6 +245,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return DescribeTransactionsResponse.parse(responseBuffer, version);
case LIST_TRANSACTIONS:
return ListTransactionsResponse.parse(responseBuffer, version);
case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;

public class AllocateProducerIdsRequest extends AbstractRequest {
private final AllocateProducerIdsRequestData data;

public AllocateProducerIdsRequest(AllocateProducerIdsRequestData data, short version) {
super(ApiKeys.ALLOCATE_PRODUCER_IDS, version);
this.data = data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code()));
}

@Override
public AllocateProducerIdsRequestData data() {
return data;
}

public static class Builder extends AbstractRequest.Builder<AllocateProducerIdsRequest> {

private final AllocateProducerIdsRequestData data;

public Builder(AllocateProducerIdsRequestData data) {
super(ApiKeys.ALLOCATE_PRODUCER_IDS);
this.data = data;
}

@Override
public AllocateProducerIdsRequest build(short version) {
return new AllocateProducerIdsRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

public static AllocateProducerIdsRequest parse(ByteBuffer buffer, short version) {
return new AllocateProducerIdsRequest(new AllocateProducerIdsRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

public class AllocateProducerIdsResponse extends AbstractResponse {

private final AllocateProducerIdsResponseData data;

public AllocateProducerIdsResponse(AllocateProducerIdsResponseData data) {
super(ApiKeys.ALLOCATE_PRODUCER_IDS);
this.data = data;
}

@Override
public AllocateProducerIdsResponseData data() {
return data;
}

/**
* The number of each type of error in the response, including {@link Errors#NONE} and top-level errors as well as
* more specifically scoped errors (such as topic or partition-level errors).
*
* @return A count of errors.
*/
@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) {
return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData(
new ByteBufferAccessor(buffer), version));
}
}
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 67,
"type": "request",
"listeners": ["controller", "zkBroker"],
"name": "AllocateProducerIdsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" }
]
}
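
As a usage note (not part of the commit), the two fields above can be populated through the generated AllocateProducerIdsRequestData class and the Builder added in this commit; the broker id, epoch, and version values here are purely illustrative.

    import org.apache.kafka.common.message.AllocateProducerIdsRequestData
    import org.apache.kafka.common.requests.AllocateProducerIdsRequest

    object AllocateProducerIdsRequestExample {
      // Build a v0 AllocateProducerIdsRequest for broker 1 with broker epoch 42.
      val data = new AllocateProducerIdsRequestData()
        .setBrokerId(1)
        .setBrokerEpoch(42L)
      val request = new AllocateProducerIdsRequest.Builder(data).build(0.toShort)
    }
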
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 67,
"type": "response",
"name": "AllocateProducerIdsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "ProducerIdStart", "type": "int64", "versions": "0+", "entityType": "producerId",
"about": "The first producer ID in this range, inclusive"},
{ "name": "ProducerIdLen", "type": "int32", "versions": "0+",
"about": "The number of producer IDs in this range"}
]
}
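
To illustrate how a caller interprets this response (again, not part of the commit): on success the controller grants the contiguous ID range [ProducerIdStart, ProducerIdStart + ProducerIdLen). A small hypothetical helper:

    import org.apache.kafka.common.message.AllocateProducerIdsResponseData
    import org.apache.kafka.common.protocol.Errors

    object AllocateProducerIdsResponseExample {
      // Return the inclusive bounds of the allocated block, or None if the response carries an error.
      def allocatedRange(response: AllocateProducerIdsResponseData): Option[(Long, Long)] =
        if (response.errorCode() == Errors.NONE.code())
          Some((response.producerIdStart(), response.producerIdStart() + response.producerIdLen() - 1))
        else
          None
    }
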
@@ -62,7 +62,7 @@ public void testAlterIsrIsClusterAction() {
public void testResponseThrottleTime() {
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR);
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR, ApiKeys.ALLOCATE_PRODUCER_IDS);
for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
BoundField throttleTimeField = responseSchema.get("throttle_time_ms");
13 changes: 12 additions & 1 deletion core/src/main/scala/kafka/api/ApiVersion.scala
@@ -112,7 +112,9 @@ object ApiVersion {
// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
KAFKA_2_8_IV0,
// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
KAFKA_2_8_IV1
KAFKA_2_8_IV1,
// Introduce AllocateProducerIds (KIP-730)
KAFKA_3_0_IV0
)

// Map keys are the union of the short and full versions
@@ -197,6 +199,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {

def isAlterIsrSupported: Boolean = this >= KAFKA_2_7_IV2

def isAllocateProducerIdsSupported: Boolean = this >= KAFKA_3_0_IV0

override def compare(that: ApiVersion): Int =
ApiVersion.orderingByVersion.compare(this, that)

@@ -447,6 +451,13 @@ case object KAFKA_2_8_IV1 extends DefaultApiVersion {
val id: Int = 32
}

case object KAFKA_3_0_IV0 extends DefaultApiVersion {
val shortVersion: String = "3.0"
val subVersion = "IV0"
val recordVersion = RecordVersion.V2
val id: Int = 33
}

object ApiVersionValidator extends Validator {

override def ensureValid(name: String, value: Any): Unit = {
64 changes: 60 additions & 4 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -18,13 +18,13 @@ package kafka.controller

import java.util
import java.util.concurrent.TimeUnit

import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.controller.KafkaController.AlterIsrCallback
import kafka.cluster.Broker
import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
import kafka.utils._
@@ -37,20 +37,20 @@ import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterIsrRequestData, AlterIsrResponseData, UpdateFeaturesRequestData}
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.common.message.UpdateFeaturesRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.ProducerIdsBlock
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code

import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Try}
import scala.util.{Failure, Success, Try}

sealed trait ElectionTrigger
final case object AutoTriggered extends ElectionTrigger
@@ -2376,6 +2376,54 @@ class KafkaController(val config: KafkaConfig,
}
}

def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
callback: AllocateProducerIdsResponseData => Unit): Unit = {

def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
results match {
case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
case Right(pidBlock) => callback.apply(
new AllocateProducerIdsResponseData()
.setProducerIdStart(pidBlock.producerIdStart())
.setProducerIdLen(pidBlock.producerIdLen()))
}
}
eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
}

def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
// Handle a few short-circuits
if (!isActive) {
callback.apply(Left(Errors.NOT_CONTROLLER))
return
}

val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
if (brokerEpochOpt.isEmpty) {
warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
callback.apply(Left(Errors.BROKER_ID_NOT_REGISTERED))
return
}

if (!brokerEpochOpt.contains(brokerEpoch)) {
warn(s"Ignoring AllocateProducerIds due to stale broker epoch $brokerEpoch for broker $brokerId")
callback.apply(Left(Errors.STALE_BROKER_EPOCH))
return
}

val maybeNewProducerIdsBlock = try {
Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this))
} catch {
case ke: KafkaException => Failure(ke)
}

maybeNewProducerIdsBlock match {
case Failure(exception) => callback.apply(Left(Errors.forException(exception)))
case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock))
}
}

private def processControllerChange(): Unit = {
maybeResign()
}
@@ -2454,6 +2502,8 @@
processIsrChangeNotification()
case AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback) =>
processAlterIsr(brokerId, brokerEpoch, isrsToAlter, callback)
case AllocateProducerIds(brokerId, brokerEpoch, callback) =>
processAllocateProducerIds(brokerId, brokerEpoch, callback)
case Startup =>
processStartup()
}
@@ -2747,6 +2797,12 @@ case class UpdateFeatures(request: UpdateFeaturesRequest,
override def preempt(): Unit = {}
}

case class AllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit)
extends ControllerEvent {
override def state: ControllerState = ControllerState.Idle
override def preempt(): Unit = {}
}


// Used only in test cases
abstract class MockEvent(val state: ControllerState) extends ControllerEvent {