KAFKA-12620 Allocate Producer IDs in KRaft controller (#10752)
This is part 2 of KIP-730. Part 1 was in #10504.

This PR adds QuorumController support for handling AllocateProducerIDs requests
and managing the state of the latest producer ID block in the controller by committing
this state to the metadata log.
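
In rough terms, the new flow is: a broker that needs producer IDs sends an AllocateProducerIds request (carrying its broker ID and epoch) to the active controller; the controller reserves the next block, commits a ProducerIdsRecord to the metadata log, and replies with the block's start and length. Below is a minimal model of that round trip; the field names mirror the KIP-730 RPC schema, but the types are stand-ins, not the real Kafka classes.

// Minimal model of the KIP-730 allocation round trip. Field names (brokerId,
// brokerEpoch, producerIdStart, producerIdLen) mirror the RPC schema; the class
// itself is a stand-in, not the real controller code.
public class AllocateProducerIdsFlow {
    static final long BLOCK_SIZE = 1000;  // assumption: stands in for ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE
    private long lastProducerId = 0;      // controller state; persisted via ProducerIdsRecord in the real code

    // Controller side: validate the caller's epoch (elided here) and reserve the next block.
    long[] allocate(int brokerId, long brokerEpoch) {
        long producerIdStart = lastProducerId;
        lastProducerId = producerIdStart + BLOCK_SIZE; // committed to the metadata log in the real code
        return new long[]{producerIdStart, BLOCK_SIZE};
    }

    public static void main(String[] args) {
        AllocateProducerIdsFlow controller = new AllocateProducerIdsFlow();
        long[] a = controller.allocate(1, 5L); // broker 1, epoch 5
        long[] b = controller.allocate(2, 7L); // broker 2, epoch 7
        // Blocks never overlap: [0, 1000) then [1000, 2000)
        System.out.printf("broker 1: [%d, %d), broker 2: [%d, %d)%n",
            a[0], a[0] + a[1], b[0], b[0] + b[1]);
    }
}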

Reviewers: Colin P. McCabe <cmccabe@apache.org>
mumrah authored Jun 3, 2021
1 parent 93dca8e commit e97cff2
Showing 18 changed files with 398 additions and 28 deletions.
@@ -108,7 +108,7 @@ public enum ApiKeys {
     UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
     DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
-    ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false);
+    ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);

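One note on the flipped flag: assuming the three-argument ApiKeys constructor takes (messageType, clusterAction, forwardable), a signature not shown in this diff, the second `true` marks AllocateProducerIds as forwardable. That is what lets KafkaApis (further down) route the request through maybeForwardToController instead of handling it on the broker.
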
@@ -16,7 +16,7 @@
 {
   "apiKey": 67,
   "type": "request",
-  "listeners": ["controller", "zkBroker"],
+  "listeners": ["zkBroker", "broker", "controller"],
   "name": "AllocateProducerIdsRequest",
   "validVersions": "0",
   "flexibleVersions": "0+",

@@ -16,7 +16,7 @@
 {
   "apiKey": 22,
   "type": "request",
-  "listeners": ["zkBroker"],
+  "listeners": ["zkBroker", "broker"],
   "name": "InitProducerIdRequest",
   // Version 1 is the same as version 0.
   //

29 changes: 9 additions & 20 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,7 +42,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
-import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
+import org.apache.kafka.common.{ClusterResource, Endpoint}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -244,11 +244,18 @@
     // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
     groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
 
+    val producerIdManagerSupplier = () => ProducerIdManager.rpc(
+      config.brokerId,
+      brokerEpochSupplier = () => lifecycleManager.brokerEpoch(),
+      clientToControllerChannelManager,
+      config.requestTimeoutMs
+    )
+
     // Create transaction coordinator, but don't start it until we've started replica manager.
     // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
     transactionCoordinator = TransactionCoordinator(config, replicaManager,
       new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
-      createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
+      producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
 
     autoTopicCreationManager = new DefaultAutoTopicCreationManager(
       config, Some(clientToControllerChannelManager), None, None,
@@ -376,24 +383,6 @@
     }
   }
 
-  class TemporaryProducerIdManager() extends ProducerIdManager {
-    val maxProducerIdsPerBrokerEpoch = 1000000
-    var currentOffset = -1
-    override def generateProducerId(): Long = {
-      currentOffset = currentOffset + 1
-      if (currentOffset >= maxProducerIdsPerBrokerEpoch) {
-        fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch")
-        throw new KafkaException("Have exhausted all demo/temporary producerIds.")
-      }
-      lifecycleManager.initialCatchUpFuture.get()
-      lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset
-    }
-  }
-
-  def createTemporaryProducerIdManager(): ProducerIdManager = {
-    new TemporaryProducerIdManager()
-  }
-
   def shutdown(): Unit = {
     if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
     try {

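ProducerIdManager.rpc itself landed in part 1 (#10504), so its implementation is not in this diff. As a rough illustration of what an RPC-backed manager does, the sketch below serves IDs from a locally cached block and fetches a fresh block from the controller when the cache runs out; ControllerClient and allocateBlock are hypothetical stand-ins, not the part-1 API.

import java.util.function.LongSupplier;

// Illustrative sketch only: hand out producer IDs from a cached block, and ask
// the controller for a new block when the current one is exhausted.
interface ControllerClient {
    long[] allocateBlock(int brokerId, long brokerEpoch); // returns {start, len}; hypothetical
}

class RpcProducerIdManagerSketch {
    private final int brokerId;
    private final LongSupplier brokerEpochSupplier;
    private final ControllerClient controller;
    private long nextId = 0;
    private long blockEnd = 0; // empty block forces a fetch on first use

    RpcProducerIdManagerSketch(int brokerId, LongSupplier brokerEpochSupplier, ControllerClient controller) {
        this.brokerId = brokerId;
        this.brokerEpochSupplier = brokerEpochSupplier;
        this.controller = controller;
    }

    synchronized long generateProducerId() {
        if (nextId >= blockEnd) { // block exhausted (or never fetched): ask the controller
            long[] block = controller.allocateBlock(brokerId, brokerEpochSupplier.getAsLong());
            nextId = block[0];
            blockEnd = block[0] + block[1];
        }
        return nextId++;
    }

    public static void main(String[] args) {
        long[] next = {0};
        ControllerClient stub = (id, epoch) -> {  // stub controller handing out blocks of 3
            long start = next[0];
            next[0] += 3;
            return new long[]{start, 3};
        };
        RpcProducerIdManagerSketch mgr = new RpcProducerIdManagerSketch(1, () -> 5L, stub);
        for (int i = 0; i < 6; i++) System.out.println(mgr.generateProducerId()); // 0..5 across two blocks
    }
}
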
17 changes: 17 additions & 0 deletions core/src/main/scala/kafka/server/ControllerApis.scala
@@ -100,6 +100,7 @@ class ControllerApis(val requestChannel: RequestChannel,
       case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
       case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
       case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
+      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
       case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request)
       case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
       case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
@@ -767,4 +768,20 @@
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
       new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
+
+  def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
+    val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest]
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    controller.allocateProducerIds(allocatedProducerIdsRequest.data)
+      .whenComplete((results, exception) => {
+        if (exception != null) {
+          requestHelper.handleError(request, exception)
+        } else {
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+            results.setThrottleTimeMs(requestThrottleMs)
+            new AllocateProducerIdsResponse(results)
+          })
+        }
+      })
+  }
 }

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
@@ -216,7 +216,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
       case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
       case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
-      case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
+      case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest)
       case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
     }
   } catch {

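maybeForwardToController is the broker's existing forwarding hook; now that ALLOCATE_PRODUCER_IDS is forwardable, a broker relays the request to the active controller instead of serving it locally. The sketch below models only the shape of that decision; the class, the boolean switch, and the helper names are all hypothetical, not the actual KafkaApis internals.

import java.util.function.Consumer;

// Illustrative model of the broker-side forwarding decision. All names here
// are hypothetical stand-ins.
public class ForwardingSketch {
    private final boolean forwardingEnabled;

    public ForwardingSketch(boolean forwardingEnabled) {
        this.forwardingEnabled = forwardingEnabled;
    }

    void maybeForwardToController(String request, Consumer<String> localHandler) {
        if (forwardingEnabled) {
            System.out.println("forwarding to active controller: " + request);
        } else {
            localHandler.accept(request); // handle locally, as before this change
        }
    }

    public static void main(String[] args) {
        new ForwardingSketch(true).maybeForwardToController("AllocateProducerIds",
            req -> System.out.println("handled locally: " + req));
    }
}
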
@@ -182,6 +182,7 @@ class BrokerMetadataListener(
         case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec)
         case rec: ConfigRecord => handleConfigRecord(rec)
         case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec)
+        case rec: ProducerIdsRecord => handleProducerIdRecord(rec)
         case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType")
       }
     }
@@ -259,6 +260,11 @@ class BrokerMetadataListener(
     clientQuotaManager.handleQuotaRecord(record)
   }
 
+  def handleProducerIdRecord(record: ProducerIdsRecord): Unit = {
+    // This is a no-op since brokers get their producer ID blocks directly from the controller via
+    // AllocateProducerIds RPC response
+  }
+
   class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch)
       extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {

7 changes: 7 additions & 0 deletions core/src/test/java/kafka/test/MockController.java
@@ -21,6 +21,8 @@
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
@@ -301,6 +303,11 @@ public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(AllocateProducerIdsRequestData request) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     synchronized public CompletableFuture<List<CreatePartitionsTopicResult>>
         createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) {

@@ -44,7 +44,8 @@ class ProducerIdsIntegrationTest {
 
   @ClusterTests(Array(
     new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"),
-    new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0")
+    new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"),
+    new ClusterTest(clusterType = Type.RAFT, brokers = 3, ibp = "3.0-IV0")
   ))
   def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = {
     verifyUniqueIds(clusterInstance)

@@ -410,6 +410,14 @@ class ControllerApisTest {
         new AlterPartitionReassignmentsRequestData()).build())))
   }
 
+  @Test
+  def testUnauthorizedHandleAllocateProducerIds(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+        handleAllocateProducerIdsRequest(buildRequest(new AllocateProducerIdsRequest.Builder(
+          new AllocateProducerIdsRequestData()).build())))
+  }
+
   @Test
   def testUnauthorizedHandleListPartitionReassignments(): Unit = {
     assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(

11 changes: 11 additions & 0 deletions metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
@@ -223,6 +225,15 @@ CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
         Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
     );
 
+    /**
+     * Allocate a block of producer IDs for transactional and idempotent producers
+     * @param request   The allocate producer IDs request
+     * @return          A future which yields a new producer ID block as a response
+     */
+    CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+        AllocateProducerIdsRequestData request
+    );
+
     /**
      * Begin writing a controller snapshot. If there was already an ongoing snapshot, it
      * simply returns information about that snapshot rather than starting a new one.

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.ProducerIdsBlock;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineLong;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class ProducerIdControlManager {
+
+    private final ClusterControlManager clusterControlManager;
+    private final TimelineLong lastProducerId;
+
+    ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
+        this.clusterControlManager = clusterControlManager;
+        this.lastProducerId = new TimelineLong(snapshotRegistry, 0L);
+    }
+
+    ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
+        clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
+
+        long producerId = lastProducerId.get();
+
+        if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
+            throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " +
+                "would exceed the long type limit");
+        }
+
+        long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE;
+        ProducerIdsRecord record = new ProducerIdsRecord()
+            .setProducerIdsEnd(nextProducerId)
+            .setBrokerId(brokerId)
+            .setBrokerEpoch(brokerEpoch);
+        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE);
+        return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block);
+    }
+
+    void replay(ProducerIdsRecord record) {
+        long currentProducerId = lastProducerId.get();
+        if (record.producerIdsEnd() <= currentProducerId) {
+            throw new RuntimeException("Producer ID from record is not monotonically increasing");
+        } else {
+            lastProducerId.set(record.producerIdsEnd());
+        }
+    }
+
+    Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
+        List<ApiMessageAndVersion> records = new ArrayList<>(1);
+
+        long producerId = lastProducerId.get(epoch);
+        if (producerId > 0) {
+            records.add(new ApiMessageAndVersion(
+                new ProducerIdsRecord()
+                    .setProducerIdsEnd(producerId)
+                    .setBrokerId(0)
+                    .setBrokerEpoch(0L),
+                (short) 0));
+        }
+        return Collections.singleton(records).iterator();
+    }
+}

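Two details above are easy to miss. The overflow guard rejects an allocation when the next block's end would pass Long.MAX_VALUE, and the snapshot iterator emits a single ProducerIdsRecord that carries only producerIdsEnd (brokerId and brokerEpoch are zeroed) because replay() reads nothing else. Here is a minimal sketch of the replay semantics, with a plain long standing in for TimelineLong:

// Minimal sketch of replay semantics: only producerIdsEnd matters, and it must
// strictly increase. A plain long stands in for the TimelineLong used above.
public class ReplaySketch {
    private long lastProducerId = 0;

    void replay(long producerIdsEnd) {
        if (producerIdsEnd <= lastProducerId) {
            throw new RuntimeException("Producer ID from record is not monotonically increasing");
        }
        lastProducerId = producerIdsEnd;
    }

    public static void main(String[] args) {
        ReplaySketch manager = new ReplaySketch();
        manager.replay(1000L); // first committed block: [0, 1000)
        manager.replay(2000L); // second committed block: [1000, 2000)
        // manager.replay(1500L) would throw: ends must strictly increase
        System.out.println("restored lastProducerId = " + manager.lastProducerId);
    }
}
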
@@ -24,6 +24,8 @@
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
+import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
 import org.apache.kafka.common.message.AlterIsrRequestData;
 import org.apache.kafka.common.message.AlterIsrResponseData;
 import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
@@ -44,6 +46,7 @@
 import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.ProducerIdsRecord;
 import org.apache.kafka.common.metadata.QuotaRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
@@ -359,7 +362,8 @@ void createSnapshotGenerator(long epoch) {
                 new Section("cluster", clusterControl.iterator(epoch)),
                 new Section("replication", replicationControl.iterator(epoch)),
                 new Section("configuration", configurationControl.iterator(epoch)),
-                new Section("clientQuotas", clientQuotaControlManager.iterator(epoch))));
+                new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)),
+                new Section("producerIds", producerIdControlManager.iterator(epoch))));
             reschedule(0);
         }
 
@@ -855,6 +859,9 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long
             case QUOTA_RECORD:
                 clientQuotaControlManager.replay((QuotaRecord) message);
                 break;
+            case PRODUCER_IDS_RECORD:
+                producerIdControlManager.replay((ProducerIdsRecord) message);
+                break;
             default:
                 throw new RuntimeException("Unhandled record type " + type);
         }
@@ -929,6 +936,12 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long
      */
     private final FeatureControlManager featureControl;
 
+    /**
+     * An object which stores the controller's view of the latest producer ID
+     * that has been generated. This must be accessed only by the event queue thread.
+     */
+    private final ProducerIdControlManager producerIdControlManager;
+
     /**
      * An object which stores the controller's view of topics and partitions.
      * This must be accessed only by the event queue thread.
@@ -995,6 +1008,7 @@ private QuorumController(LogContext logContext,
         this.clusterControl = new ClusterControlManager(logContext, time,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
+        this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
         this.replicationControl = new ReplicationControlManager(snapshotRegistry,
             logContext, defaultReplicationFactor, defaultNumPartitions,
@@ -1199,6 +1213,16 @@ public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
         });
     }
 
+    @Override
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+            AllocateProducerIdsRequestData request) {
+        return appendWriteEvent("allocateProducerIds",
+            () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
+            .thenApply(result -> new AllocateProducerIdsResponseData()
+                .setProducerIdStart(result.producerIdStart())
+                .setProducerIdLen(result.producerIdLen()));
+    }
+
     @Override
     public CompletableFuture<List<CreatePartitionsTopicResult>>
         createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) {

@@ -42,6 +42,14 @@ public ResultOrError(T result) {
         this.result = result;
     }
 
+    public static <T> ResultOrError<T> of(T result) {
+        return new ResultOrError<>(result);
+    }
+
+    public static <T> ResultOrError<T> of(ApiError error) {
+        return new ResultOrError<>(error);
+    }
+
     public boolean isError() {
         return error != null;
     }

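A quick illustration of the new factories at a call site; the pairing below is hypothetical, and it assumes ResultOrError is importable from the controller package alongside the existing ApiError and Errors protocol types.

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;

public class ResultOrErrorDemo {
    public static void main(String[] args) {
        // Hypothetical call sites for the new factories:
        ResultOrError<Long> ok  = ResultOrError.of(42L);
        ResultOrError<Long> err = ResultOrError.of(new ApiError(Errors.NOT_CONTROLLER, "not the active controller"));
        System.out.println(ok.isError() + " " + err.isError()); // prints: false true
    }
}
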
@@ -47,8 +47,12 @@ public void mergeFrom(long destinationEpoch, Delta delta) {
     private long value;
 
     public TimelineLong(SnapshotRegistry snapshotRegistry) {
+        this(snapshotRegistry, 0L);
+    }
+
+    public TimelineLong(SnapshotRegistry snapshotRegistry, long value) {
         this.snapshotRegistry = snapshotRegistry;
-        this.value = 0;
+        this.value = value;
     }
 
     public long get() {

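The new constructor is what lets ProducerIdControlManager seed lastProducerId explicitly. Below is a small usage sketch of the timeline semantics; the snapshot API names (SnapshotRegistry taking a LogContext, getOrCreateSnapshot, revertToSnapshot) are assumed from the same package and are not shown in this diff.

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;

public class TimelineLongDemo {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        TimelineLong lastProducerId = new TimelineLong(registry, 0L); // new initial-value constructor

        registry.getOrCreateSnapshot(10);           // snapshot the state as of epoch 10
        lastProducerId.set(1000L);                  // an uncommitted write bumps the value
        System.out.println(lastProducerId.get(10)); // 0: epoch reads see the snapshotted state
        registry.revertToSnapshot(10);              // roll the uncommitted write back
        System.out.println(lastProducerId.get());   // 0 again
    }
}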