Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-12620 Allocate producer ids on the controller #10504

Merged
merged 23 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2d4be26
Add new AllocateProducerIds RPC and support for ZK-mode
mumrah Apr 7, 2021
f8ef8a4
Fix ApiKeysTest throttling test
mumrah Apr 8, 2021
2c80252
Actually use throttling in KafkaApis for AllocateProducerIds RPC
mumrah Apr 8, 2021
8243d9c
Rely on request timeouts
mumrah Apr 8, 2021
fa8b4b2
Merge remote-tracking branch 'apache-github/trunk' into kafka-12620-p…
mumrah May 5, 2021
ed5df4a
Merge remote-tracking branch 'apache-github/trunk' into kafka-12620-p…
mumrah May 11, 2021
16cc8bf
Move data class to common module.
mumrah May 11, 2021
e89c4bb
Use new ProducerIdsBlock as callback type in controller
mumrah May 11, 2021
1861526
Clean up a few things
mumrah May 11, 2021
aede4fc
Remove unwanted whitespace
mumrah May 11, 2021
1ff2398
Add IBP 3.0-IV0 for gating this new RPC
mumrah May 12, 2021
1260dac
Merge remote-tracking branch 'apache-github/trunk' into kafka-12620-p…
mumrah May 12, 2021
1915452
Ensure the broker to controller channel is shutdown
mumrah May 12, 2021
d5ceeab
Remove redundant test case
mumrah May 12, 2021
e72431b
Add per-broker override to testkit
mumrah May 12, 2021
726ab08
Fix an integration test
mumrah May 12, 2021
86d7d8d
Fix dependencies in KafkaServer
mumrah May 14, 2021
e88a37a
Fixed throttle integration test
mumrah May 14, 2021
8afade4
Don't eaglery fetch first ID block when using the RPC
mumrah May 14, 2021
e37571b
More PR feedback
mumrah May 18, 2021
437ee4b
Clean up ZK impl
mumrah May 20, 2021
4c83cd7
Make the naming consistent
mumrah May 20, 2021
3262171
Loosen test case
mumrah May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public enum ApiKeys {
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS);
LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return DescribeTransactionsRequest.parse(buffer, apiVersion);
case LIST_TRANSACTIONS:
return ListTransactionsRequest.parse(buffer, apiVersion);
case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return DescribeTransactionsResponse.parse(responseBuffer, version);
case LIST_TRANSACTIONS:
return ListTransactionsResponse.parse(responseBuffer, version);
case ALLOCATE_PRODUCER_IDS:
return AllocateProducerIdsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;

public class AllocateProducerIdsRequest extends AbstractRequest {
private final AllocateProducerIdsRequestData data;

public AllocateProducerIdsRequest(AllocateProducerIdsRequestData data, short version) {
super(ApiKeys.ALLOCATE_PRODUCER_IDS, version);
this.data = data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code()));
}

@Override
public AllocateProducerIdsRequestData data() {
return data;
}

public static class Builder extends AbstractRequest.Builder<AllocateProducerIdsRequest> {

private final AllocateProducerIdsRequestData data;

public Builder(AllocateProducerIdsRequestData data) {
super(ApiKeys.ALLOCATE_PRODUCER_IDS);
this.data = data;
}

@Override
public AllocateProducerIdsRequest build(short version) {
return new AllocateProducerIdsRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

public static AllocateProducerIdsRequest parse(ByteBuffer buffer, short version) {
return new AllocateProducerIdsRequest(new AllocateProducerIdsRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

public class AllocateProducerIdsResponse extends AbstractResponse {

private final AllocateProducerIdsResponseData data;

public AllocateProducerIdsResponse(AllocateProducerIdsResponseData data) {
super(ApiKeys.ALLOCATE_PRODUCER_IDS);
this.data = data;
}

@Override
public AllocateProducerIdsResponseData data() {
return data;
}

/**
* The number of each type of error in the response, including {@link Errors#NONE} and top-level errors as well as
* more specifically scoped errors (such as topic or partition-level errors).
*
* @return A count of errors.
*/
@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

public static AllocateProducerIdsResponse parse(ByteBuffer buffer, short version) {
return new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData(
new ByteBufferAccessor(buffer), version));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implie
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 67,
"type": "request",
"listeners": ["controller", "zkBroker"],
"name": "AllocateProducerIdsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The ID of the requesting broker" },
{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1",
"about": "The epoch of the requesting broker" }
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 67,
"type": "response",
"name": "AllocateProducerIdsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The top level response error code" },
{ "name": "ProducerIdStart", "type": "int64", "versions": "0+", "entityType": "producerId",
"about": "The first producer ID in this range, inclusive"},
{ "name": "ProducerIdLen", "type": "int32", "versions": "0+",
"about": "The number of producer IDs in this range"}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testAlterIsrIsClusterAction() {
public void testResponseThrottleTime() {
Set<ApiKeys> authenticationKeys = EnumSet.of(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE);
// Newer protocol apis include throttle time ms even for cluster actions
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR);
Set<ApiKeys> clusterActionsWithThrottleTimeMs = EnumSet.of(ApiKeys.ALTER_ISR, ApiKeys.ALLOCATE_PRODUCER_IDS);
for (ApiKeys apiKey: ApiKeys.zkBrokerApis()) {
Schema responseSchema = apiKey.messageType.responseSchemas()[apiKey.latestVersion()];
BoundField throttleTimeField = responseSchema.get("throttle_time_ms");
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/kafka/api/ApiVersion.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ object ApiVersion {
// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
KAFKA_2_8_IV0,
// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
KAFKA_2_8_IV1
KAFKA_2_8_IV1,
// Introduce AllocateProducerIds (KIP-730)
KAFKA_3_0_IV0
)

// Map keys are the union of the short and full versions
Expand Down Expand Up @@ -197,6 +199,8 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {

def isAlterIsrSupported: Boolean = this >= KAFKA_2_7_IV2

def isAllocateProducerIdsSupported: Boolean = this >= KAFKA_3_0_IV0

override def compare(that: ApiVersion): Int =
ApiVersion.orderingByVersion.compare(this, that)

Expand Down Expand Up @@ -447,6 +451,13 @@ case object KAFKA_2_8_IV1 extends DefaultApiVersion {
val id: Int = 32
}

case object KAFKA_3_0_IV0 extends DefaultApiVersion {
val shortVersion: String = "3.0"
val subVersion = "IV0"
val recordVersion = RecordVersion.V2
val id: Int = 33
}

object ApiVersionValidator extends Validator {

override def ensureValid(name: String, value: Any): Unit = {
Expand Down
64 changes: 60 additions & 4 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package kafka.controller

import java.util
import java.util.concurrent.TimeUnit

import kafka.admin.AdminOperationException
import kafka.api._
import kafka.common._
import kafka.controller.KafkaController.AlterIsrCallback
import kafka.cluster.Broker
import kafka.controller.KafkaController.{AlterReassignmentsCallback, ElectLeadersCallback, ListReassignmentsCallback, UpdateFeaturesCallback}
import kafka.coordinator.transaction.ZkProducerIdManager
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server._
import kafka.utils._
Expand All @@ -37,20 +37,20 @@ import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
import org.apache.kafka.common.message.{AlterIsrRequestData, AlterIsrResponseData}
import org.apache.kafka.common.message.{AllocateProducerIdsRequestData, AllocateProducerIdsResponseData, AlterIsrRequestData, AlterIsrResponseData, UpdateFeaturesRequestData}
import org.apache.kafka.common.feature.{Features, FinalizedVersionRange}
import org.apache.kafka.common.message.UpdateFeaturesRequestData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiError, LeaderAndIsrResponse, UpdateFeaturesRequest, UpdateMetadataResponse}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.common.ProducerIdsBlock
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code

import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Try}
import scala.util.{Failure, Success, Try}

sealed trait ElectionTrigger
final case object AutoTriggered extends ElectionTrigger
Expand Down Expand Up @@ -2376,6 +2376,54 @@ class KafkaController(val config: KafkaConfig,
}
}

def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
callback: AllocateProducerIdsResponseData => Unit): Unit = {

def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
results match {
case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
case Right(pidBlock) => callback.apply(
new AllocateProducerIdsResponseData()
.setProducerIdStart(pidBlock.producerIdStart())
.setProducerIdLen(pidBlock.producerIdLen()))
}
}
eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
}

def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
// Handle a few short-circuits
if (!isActive) {
callback.apply(Left(Errors.NOT_CONTROLLER))
return
}

val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
if (brokerEpochOpt.isEmpty) {
warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
callback.apply(Left(Errors.BROKER_ID_NOT_REGISTERED))
return
}

if (!brokerEpochOpt.contains(brokerEpoch)) {
warn(s"Ignoring AllocateProducerIds due to stale broker epoch $brokerEpoch for broker $brokerId")
callback.apply(Left(Errors.STALE_BROKER_EPOCH))
return
}

val maybeNewProducerIdsBlock = try {
Try(ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this))
} catch {
case ke: KafkaException => Failure(ke)
}

maybeNewProducerIdsBlock match {
case Failure(exception) => callback.apply(Left(Errors.forException(exception)))
case Success(newProducerIdBlock) => callback.apply(Right(newProducerIdBlock))
}
}

private def processControllerChange(): Unit = {
maybeResign()
}
Expand Down Expand Up @@ -2454,6 +2502,8 @@ class KafkaController(val config: KafkaConfig,
processIsrChangeNotification()
case AlterIsrReceived(brokerId, brokerEpoch, isrsToAlter, callback) =>
processAlterIsr(brokerId, brokerEpoch, isrsToAlter, callback)
case AllocateProducerIds(brokerId, brokerEpoch, callback) =>
processAllocateProducerIds(brokerId, brokerEpoch, callback)
case Startup =>
processStartup()
}
Expand Down Expand Up @@ -2747,6 +2797,12 @@ case class UpdateFeatures(request: UpdateFeaturesRequest,
override def preempt(): Unit = {}
}

case class AllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit)
extends ControllerEvent {
override def state: ControllerState = ControllerState.Idle
override def preempt(): Unit = {}
}


// Used only in test cases
abstract class MockEvent(val state: ControllerState) extends ControllerEvent {
Expand Down
Loading