From e97cff2702b6ba836c7925caa36ab18066a7c95d Mon Sep 17 00:00:00 2001 From: David Arthur Date: Thu, 3 Jun 2021 19:23:32 -0400 Subject: [PATCH] KAFKA-12620 Allocate Producer IDs in KRaft controller (#10752) This is part 2 of KIP-730. Part 1 was in #10504. This PR adds QuorumController support for handling AllocateProducerIds requests and managing the state of the latest producer ID block in the controller by committing this state to the metadata log. Reviewers: Colin P. McCabe --- .../apache/kafka/common/protocol/ApiKeys.java | 2 +- .../message/AllocateProducerIdsRequest.json | 2 +- .../common/message/InitProducerIdRequest.json | 2 +- .../scala/kafka/server/BrokerServer.scala | 29 +-- .../scala/kafka/server/ControllerApis.scala | 17 ++ .../main/scala/kafka/server/KafkaApis.scala | 2 +- .../metadata/BrokerMetadataListener.scala | 6 + .../test/java/kafka/test/MockController.java | 7 + .../ProducerIdsIntegrationTest.scala | 3 +- .../kafka/server/ControllerApisTest.scala | 8 + .../apache/kafka/controller/Controller.java | 11 ++ .../controller/ProducerIdControlManager.java | 85 +++++++++ .../kafka/controller/QuorumController.java | 26 ++- .../kafka/controller/ResultOrError.java | 8 + .../apache/kafka/timeline/TimelineLong.java | 6 +- .../common/metadata/ProducerIdsRecord.json | 29 +++ .../ProducerIdControlManagerTest.java | 173 ++++++++++++++++++ .../controller/QuorumControllerTest.java | 10 +- 18 files changed, 398 insertions(+), 28 deletions(-) create mode 100644 metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java create mode 100644 metadata/src/main/resources/common/metadata/ProducerIdsRecord.json create mode 100644 metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index bd28aa26aa977..3f42ee9d6889f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -108,7 +108,7 @@ public enum ApiKeys { UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), - ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false); + ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true); private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json index 0cfa494291a36..6f37313c3a6e4 100644 --- a/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json +++ b/clients/src/main/resources/common/message/AllocateProducerIdsRequest.json @@ -16,7 +16,7 @@ { "apiKey": 67, "type": "request", - "listeners": ["controller", "zkBroker"], + "listeners": ["zkBroker", "broker", "controller"], "name": "AllocateProducerIdsRequest", "validVersions": "0", "flexibleVersions": "0+", diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json index 5537aa95d3db8..4e75352db6f33 100644 --- a/clients/src/main/resources/common/message/InitProducerIdRequest.json +++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json @@ -16,7 +16,7
@@ { "apiKey": 22, "type": "request", - "listeners": ["zkBroker"], + "listeners": ["zkBroker", "broker"], "name": "InitProducerIdRequest", // Version 1 is the same as version 0. // diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index bea0c53c1d78f..4c76903173ab3 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramMechanism import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils} -import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException} +import org.apache.kafka.common.{ClusterResource, Endpoint} import org.apache.kafka.metadata.{BrokerState, VersionRange} import org.apache.kafka.raft.RaftConfig import org.apache.kafka.raft.RaftConfig.AddressSpec @@ -244,11 +244,18 @@ class BrokerServer( // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics) + val producerIdManagerSupplier = () => ProducerIdManager.rpc( + config.brokerId, + brokerEpochSupplier = () => lifecycleManager.brokerEpoch(), + clientToControllerChannelManager, + config.requestTimeoutMs + ) + // Create transaction coordinator, but don't start it until we've started replica manager. // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), - createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM) + producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM) autoTopicCreationManager = new DefaultAutoTopicCreationManager( config, Some(clientToControllerChannelManager), None, None, @@ -376,24 +383,6 @@ class BrokerServer( } } - class TemporaryProducerIdManager() extends ProducerIdManager { - val maxProducerIdsPerBrokerEpoch = 1000000 - var currentOffset = -1 - override def generateProducerId(): Long = { - currentOffset = currentOffset + 1 - if (currentOffset >= maxProducerIdsPerBrokerEpoch) { - fatal(s"Exhausted all demo/temporary producerIds as the next one will has extend past the block size of $maxProducerIdsPerBrokerEpoch") - throw new KafkaException("Have exhausted all demo/temporary producerIds.") - } - lifecycleManager.initialCatchUpFuture.get() - lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset - } - } - - def createTemporaryProducerIdManager(): ProducerIdManager = { - new TemporaryProducerIdManager() - } - def shutdown(): Unit = { if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return try { diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index fa0dc79465af3..96822e8e8810c 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -100,6 +100,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.SASL_AUTHENTICATE => 
handleSaslAuthenticateRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.CREATE_PARTITIONS => handleCreatePartitions(request) case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request) @@ -767,4 +768,20 @@ class ControllerApis(val requestChannel: RequestChannel, requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => new ListPartitionReassignmentsResponse(response.setThrottleTimeMs(requestThrottleMs))) } + + def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { + val allocatedProducerIdsRequest = request.body[AllocateProducerIdsRequest] + authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + controller.allocateProducerIds(allocatedProducerIdsRequest.data) + .whenComplete((results, exception) => { + if (exception != null) { + requestHelper.handleError(request, exception) + } else { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + results.setThrottleTimeMs(requestThrottleMs) + new AllocateProducerIdsResponse(results) + }) + } + }) + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c5e140494b31e..2870bce6faa94 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -216,7 +216,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) - case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) + case ApiKeys.ALLOCATE_PRODUCER_IDS => maybeForwardToController(request, handleAllocateProducerIdsRequest) case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}") } } catch { diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala index 70e44c89e10ea..4b4d15ef48e6c 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala @@ -182,6 +182,7 @@ class BrokerMetadataListener( case rec: RemoveTopicRecord => handleRemoveTopicRecord(imageBuilder, rec) case rec: ConfigRecord => handleConfigRecord(rec) case rec: QuotaRecord => handleQuotaRecord(imageBuilder, rec) + case rec: ProducerIdsRecord => handleProducerIdRecord(rec) case _ => throw new RuntimeException(s"Unhandled record $record with type $recordType") } } @@ -259,6 +260,11 @@ class BrokerMetadataListener( clientQuotaManager.handleQuotaRecord(record) } + def handleProducerIdRecord(record: ProducerIdsRecord): Unit = { + // This is a no-op since brokers get their producer ID blocks directly from the controller via + // AllocateProducerIds RPC response + } + class HandleNewLeaderEvent(leaderAndEpoch: LeaderAndEpoch) extends EventQueue.FailureLoggingEvent(log) { override def run(): Unit = { diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 1fba2952165b0..68e73fd331bd7 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.Uuid; 
import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -301,6 +303,11 @@ public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(AllocateProducerIdsRequestData request) { + throw new UnsupportedOperationException(); + } + @Override synchronized public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(long deadlineNs, List<CreatePartitionsTopic> topicList) { diff --git a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala index 6c7c248e63188..d5e082a0a8e57 100644 --- a/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIdsIntegrationTest.scala @@ -44,7 +44,8 @@ class ProducerIdsIntegrationTest { @ClusterTests(Array( new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "2.8"), - new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0") + new ClusterTest(clusterType = Type.ZK, brokers = 3, ibp = "3.0-IV0"), + new ClusterTest(clusterType = Type.RAFT, brokers = 3, ibp = "3.0-IV0") )) def testUniqueProducerIds(clusterInstance: ClusterInstance): Unit = { verifyUniqueIds(clusterInstance) diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala index 418ee31e3ae03..89fbd0526e690 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala @@ -410,6 +410,14 @@ class ControllerApisTest { new AlterPartitionReassignmentsRequestData()).build()))) } + @Test + def testUnauthorizedHandleAllocateProducerIds(): Unit = { + assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( + Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+ handleAllocateProducerIdsRequest(buildRequest(new AllocateProducerIdsRequest.Builder( + new AllocateProducerIdsRequestData()).build()))) + } + @Test def testUnauthorizedHandleListPartitionReassignments(): Unit = { assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis( diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index a34b084ea1302..3cb0d26ee6b60 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -223,6 +225,15 @@ CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas( Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly ); + /** + * Allocate a block of producer IDs for transactional and idempotent producers. + * @param request The allocate producer IDs request + * @return A future which yields a new producer ID block as a response + */ + CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds( + AllocateProducerIdsRequestData request + ); + /** * Begin writing a controller snapshot. If there was already an ongoing snapshot, it * simply returns information about that snapshot rather than starting a new one. diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java new file mode 100644 index 0000000000000..924605c6d910f --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineLong; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + + +public class ProducerIdControlManager { + + private final ClusterControlManager clusterControlManager; + private final TimelineLong lastProducerId; + + ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) { + this.clusterControlManager = clusterControlManager; + this.lastProducerId = new TimelineLong(snapshotRegistry, 0L); + } + + ControllerResult generateNextProducerId(int brokerId, long brokerEpoch) { + clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch); + + long producerId = lastProducerId.get(); + + if (producerId > Long.MAX_VALUE - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) { + throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId " + + "is will has exceeded long type limit"); + } + + long nextProducerId = producerId + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE; + ProducerIdsRecord record = new ProducerIdsRecord() + .setProducerIdsEnd(nextProducerId) + .setBrokerId(brokerId) + .setBrokerEpoch(brokerEpoch); + ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE); + return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block); + } + + void replay(ProducerIdsRecord record) { + long currentProducerId = lastProducerId.get(); + if (record.producerIdsEnd() <= currentProducerId) { + throw new RuntimeException("Producer ID from record is not monotonically increasing"); + } else { + lastProducerId.set(record.producerIdsEnd()); + } + } + + Iterator> iterator(long epoch) { + List records = new ArrayList<>(1); + + long producerId = lastProducerId.get(epoch); + if (producerId > 0) { + records.add(new ApiMessageAndVersion( + new ProducerIdsRecord() + .setProducerIdsEnd(producerId) + .setBrokerId(0) + .setBrokerEpoch(0L), + (short) 0)); + } + return Collections.singleton(records).iterator(); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 746d9068488d9..bee0b69eff4e3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.NotControllerException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; +import org.apache.kafka.common.message.AllocateProducerIdsResponseData; import org.apache.kafka.common.message.AlterIsrRequestData; import org.apache.kafka.common.message.AlterIsrResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; @@ -44,6 +46,7 @@ import org.apache.kafka.common.metadata.MetadataRecordType; import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; +import 
org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.QuotaRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; @@ -359,7 +362,8 @@ void createSnapshotGenerator(long epoch) { new Section("cluster", clusterControl.iterator(epoch)), new Section("replication", replicationControl.iterator(epoch)), new Section("configuration", configurationControl.iterator(epoch)), - new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)))); + new Section("clientQuotas", clientQuotaControlManager.iterator(epoch)), + new Section("producerIds", producerIdControlManager.iterator(epoch)))); reschedule(0); } @@ -855,6 +859,9 @@ private void replay(ApiMessage message, Optional snapshotId, lon case QUOTA_RECORD: clientQuotaControlManager.replay((QuotaRecord) message); break; + case PRODUCER_IDS_RECORD: + producerIdControlManager.replay((ProducerIdsRecord) message); + break; default: throw new RuntimeException("Unhandled record type " + type); } @@ -929,6 +936,12 @@ private void replay(ApiMessage message, Optional snapshotId, lon */ private final FeatureControlManager featureControl; + /** + * An object which stores the controller's view of the latest producer ID + * that has been generated. This must be accessed only by the event queue thread. + */ + private final ProducerIdControlManager producerIdControlManager; + /** * An object which stores the controller's view of topics and partitions. * This must be accessed only by the event queue thread. @@ -995,6 +1008,7 @@ private QuorumController(LogContext logContext, this.clusterControl = new ClusterControlManager(logContext, time, snapshotRegistry, sessionTimeoutNs, replicaPlacer); this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry); + this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); this.replicationControl = new ReplicationControlManager(snapshotRegistry, logContext, defaultReplicationFactor, defaultNumPartitions, @@ -1199,6 +1213,16 @@ public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas( }); } + @Override + public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds( + AllocateProducerIdsRequestData request) { + return appendWriteEvent("allocateProducerIds", + () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch())) + .thenApply(result -> new AllocateProducerIdsResponseData() + .setProducerIdStart(result.producerIdStart()) + .setProducerIdLen(result.producerIdLen())); + } + @Override public CompletableFuture<List<CreatePartitionsTopicResult>> createPartitions(long deadlineNs, List<CreatePartitionsTopic> topics) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java index 2fedacdb5e1c8..6a548c4e40255 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java @@ -42,6 +42,14 @@ public ResultOrError(T result) { this.result = result; } + public static <T> ResultOrError<T> of(T result) { + return new ResultOrError<>(result); + } + + public static <T> ResultOrError<T> of(ApiError error) { + return new ResultOrError<>(error); + } + public boolean isError() { return error != null; } diff --git a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java index
e057391c4bcd3..36a300ff94998 100644 --- a/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java +++ b/metadata/src/main/java/org/apache/kafka/timeline/TimelineLong.java @@ -47,8 +47,12 @@ public void mergeFrom(long destinationEpoch, Delta delta) { private long value; public TimelineLong(SnapshotRegistry snapshotRegistry) { + this(snapshotRegistry, 0L); + } + + public TimelineLong(SnapshotRegistry snapshotRegistry, long value) { this.snapshotRegistry = snapshotRegistry; - this.value = 0; + this.value = value; } public long get() { diff --git a/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json new file mode 100644 index 0000000000000..09e6b536129e2 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/ProducerIdsRecord.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 15, + "type": "metadata", + "name": "ProducerIdsRecord", + "validVersions": "0", + "fields": [ + { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, + { "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", + "about": "The epoch of the requesting broker" }, + { "name": "ProducerIdsEnd", "type": "int64", "versions": "0+", + "about": "The highest producer ID that has been generated"} + ] +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java new file mode 100644 index 0000000000000..f96510ddae11e --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.ProducerIdsBlock; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +public class ProducerIdControlManagerTest { + + private SnapshotRegistry snapshotRegistry; + private ClusterControlManager clusterControl; + private ProducerIdControlManager producerIdControlManager; + + @BeforeEach + public void setUp() { + final LogContext logContext = new LogContext(); + final MockTime time = new MockTime(); + final Random random = new Random(); + snapshotRegistry = new SnapshotRegistry(logContext); + clusterControl = new ClusterControlManager( + logContext, time, snapshotRegistry, 1000, + new StripedReplicaPlacer(random)); + + clusterControl.activate(); + for (int i = 0; i < 4; i++) { + RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(i); + brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint(). + setSecurityProtocol(SecurityProtocol.PLAINTEXT.id). + setPort((short) 9092). + setName("PLAINTEXT"). + setHost(String.format("broker-%02d.example.org", i))); + clusterControl.replay(brokerRecord); + } + + this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); + } + + @Test + public void testInitialResult() { + ControllerResult result = + producerIdControlManager.generateNextProducerId(1, 100); + assertEquals(0, result.response().producerIdStart()); + assertEquals(1000, result.response().producerIdLen()); + ProducerIdsRecord record = (ProducerIdsRecord) result.records().get(0).message(); + assertEquals(1000, record.producerIdsEnd()); + } + + @Test + public void testMonotonic() { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(42)); + + ProducerIdsBlock range = + producerIdControlManager.generateNextProducerId(1, 100).response(); + assertEquals(42, range.producerIdStart()); + + // Can't go backwards in Producer IDs + assertThrows(RuntimeException.class, () -> { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(40)); + }, "Producer ID range must only increase"); + range = producerIdControlManager.generateNextProducerId(1, 100).response(); + assertEquals(42, range.producerIdStart()); + + // Gaps in the ID range are okay. 
+ producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(50)); + range = producerIdControlManager.generateNextProducerId(1, 100).response(); + assertEquals(50, range.producerIdStart()); + } + + @Test + public void testUnknownBrokerOrEpoch() { + ControllerResult<ProducerIdsBlock> result; + + assertThrows(StaleBrokerEpochException.class, () -> + producerIdControlManager.generateNextProducerId(99, 0)); + + assertThrows(StaleBrokerEpochException.class, () -> + producerIdControlManager.generateNextProducerId(1, 99)); + } + + @Test + public void testMaxValue() { + producerIdControlManager.replay( + new ProducerIdsRecord() + .setBrokerId(1) + .setBrokerEpoch(100) + .setProducerIdsEnd(Long.MAX_VALUE - 1)); + + assertThrows(UnknownServerException.class, () -> + producerIdControlManager.generateNextProducerId(1, 100)); + } + + @Test + public void testSnapshotIterator() { + ProducerIdsBlock range = null; + for (int i = 0; i < 100; i++) { + range = generateProducerIds(producerIdControlManager, i % 4, 100); + } + + Iterator<List<ApiMessageAndVersion>> snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE); + assertTrue(snapshotIterator.hasNext()); + List<ApiMessageAndVersion> batch = snapshotIterator.next(); + assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record"); + assertEquals(range.producerIdStart() + range.producerIdLen(), ((ProducerIdsRecord) batch.get(0).message()).producerIdsEnd()); + assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch"); + + ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); + snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE); + while (snapshotIterator.hasNext()) { + snapshotIterator.next().forEach(message -> newProducerIdManager.replay((ProducerIdsRecord) message.message())); + } + + // Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs + long lastProducerID = range.producerIdStart() + range.producerIdLen() - 1; + range = generateProducerIds(producerIdControlManager, 1, 100); + assertTrue(range.producerIdStart() > lastProducerID); + } + + static ProducerIdsBlock generateProducerIds( + ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) { + ControllerResult<ProducerIdsBlock> result = + producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch); + result.records().forEach(apiMessageAndVersion -> + producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message())); + return result.response(); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index c6114dee0891b..5a39f82c98cfc 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.message.AllocateProducerIdsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; @@ -54,6 +55,7 @@ import
org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.ProducerIdsRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -267,6 +269,8 @@ public void testSnapshotSaveAndLoad() throws Throwable { setBrokerIds(Arrays.asList(1, 2, 0))). iterator()))).iterator()))).get(); fooId = fooData.topics().find("foo").topicId(); + active.allocateProducerIds( + new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get(); long snapshotEpoch = active.beginWritingSnapshot().get(); writer = snapshotWriterBuilder.writers.takeFirst(); assertEquals(snapshotEpoch, writer.epoch()); @@ -338,7 +342,11 @@ private void checkSnapshotContents(Uuid fooId, setEndPoints(new BrokerEndpointCollection(Arrays.asList( new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost"). setPort(9095).setSecurityProtocol((short) 0)).iterator())). - setRack(null), (short) 0))), + setRack(null), (short) 0)), + Arrays.asList(new ApiMessageAndVersion(new ProducerIdsRecord(). + setBrokerId(0). + setBrokerEpoch(brokerEpochs.get(0)). + setProducerIdsEnd(1000), (short) 0))), iterator); }
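
This patch covers the controller half of the allocation protocol. The broker half (the ProducerIdManager.rpc supplier wired into the transaction coordinator in BrokerServer.scala above) simply drains each returned block and asks the controller for another when it runs out. Below is a minimal sketch of that contract, not the actual kafka.coordinator.transaction.ProducerIdManager; the Block class and fetchBlock supplier here are hypothetical stand-ins for AllocateProducerIdsResponseData and the AllocateProducerIds round trip.

    import java.util.function.Supplier;

    final class ProducerIdBlockClient {
        /** Mirrors the producerIdStart/producerIdLen fields of AllocateProducerIdsResponseData. */
        static final class Block {
            final long start; // first producer ID in the block, inclusive
            final int len;    // block length; the controller hands out PRODUCER_ID_BLOCK_SIZE (1000) IDs per request
            Block(long start, int len) { this.start = start; this.len = len; }
        }

        private final Supplier<Block> fetchBlock; // hypothetical stand-in for the controller RPC
        private Block current = null;
        private long nextId = 0;

        ProducerIdBlockClient(Supplier<Block> fetchBlock) {
            this.fetchBlock = fetchBlock;
        }

        /** Hands out the next ID, fetching a fresh block when the current one is exhausted. */
        synchronized long generateProducerId() {
            if (current == null || nextId >= current.start + current.len) {
                current = fetchBlock.get();
                nextId = current.start;
            }
            return nextId++;
        }
    }

Uniqueness then follows from block disjointness: the controller commits each ProducerIdsRecord to the metadata log before responding, and replay() rejects any ProducerIdsEnd that does not advance, so successive blocks never overlap, whether they go to different brokers or to the same broker across a controller failover. That is the invariant ProducerIdsIntegrationTest.testUniqueProducerIds exercises on the new RAFT cluster type.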