From a92b986c855592d630fbabf49d1e9a160ad5b230 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 2 Mar 2021 08:20:47 -0600 Subject: [PATCH] KAFKA-12268: Implement task idling semantics via currentLag API (#10137) Implements KIP-695 Reverts a previous behavior change to Consumer.poll and replaces it with a new Consumer.currentLag API, which returns the client's currently cached lag. Uses this new API to implement the desired task idling semantics improvement from KIP-695. Reverts fdcf8fbf72bee9e672d0790cdbe5539846f7dc8e / KAFKA-10866: Add metadata to ConsumerRecords (#9836) Reviewers: Chia-Ping Tsai , Guozhang Wang --- build.gradle | 1 - checkstyle/suppressions.xml | 5 +- .../kafka/clients/consumer/Consumer.java | 6 + .../clients/consumer/ConsumerRecords.java | 103 +---------- .../kafka/clients/consumer/KafkaConsumer.java | 36 +++- .../kafka/clients/consumer/MockConsumer.java | 32 ++-- .../consumer/internals/FetchedRecords.java | 102 ----------- .../clients/consumer/internals/Fetcher.java | 50 +++--- .../consumer/internals/SubscriptionState.java | 14 +- .../clients/consumer/KafkaConsumerTest.java | 163 +++++------------- .../consumer/internals/FetcherTest.java | 28 +-- .../kafka/api/PlaintextConsumerTest.scala | 36 ++-- .../processor/internals/PartitionGroup.java | 99 +++++------ .../processor/internals/StandbyTask.java | 6 - .../processor/internals/StreamTask.java | 12 +- .../processor/internals/StreamThread.java | 9 +- .../streams/processor/internals/Task.java | 6 - .../processor/internals/TaskManager.java | 3 +- .../internals/ActiveTaskCreatorTest.java | 2 +- .../internals/PartitionGroupTest.java | 29 ++-- .../processor/internals/StreamTaskTest.java | 10 -- .../processor/internals/TaskManagerTest.java | 6 - .../kafka/streams/TopologyTestDriver.java | 16 -- 23 files changed, 203 insertions(+), 571 deletions(-) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java diff --git a/build.gradle b/build.gradle index 05f77e054f04a..4838a35bf7c03 100644 --- a/build.gradle +++ b/build.gradle @@ -1125,7 +1125,6 @@ project(':clients') { testCompile libs.bcpkix testCompile libs.junitJupiter testCompile libs.mockitoCore - testCompile libs.hamcrest testRuntime libs.slf4jlog4j testRuntime libs.jacksonDatabind diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 46fb97b3e197e..33273fef15c85 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -101,10 +101,7 @@ files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/> - - + files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index ee4a70770af97..b324773093aad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -243,6 +244,11 @@ public interface Consumer extends Closeable { */ Map endOffsets(Collection partitions, Duration timeout); + /** + * @see KafkaConsumer#currentLag(TopicPartition) + */ + OptionalLong currentLag(TopicPartition topicPartition); + /** * @see KafkaConsumer#groupMetadata() */ diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java index cc5317030bfa4..92390e91907e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java @@ -16,13 +16,11 @@ */ package org.apache.kafka.clients.consumer; -import org.apache.kafka.clients.consumer.internals.FetchedRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.AbstractIterator; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -34,99 +32,12 @@ * partition returned by a {@link Consumer#poll(java.time.Duration)} operation. */ public class ConsumerRecords implements Iterable> { - public static final ConsumerRecords EMPTY = new ConsumerRecords<>( - Collections.emptyMap(), - Collections.emptyMap() - ); + public static final ConsumerRecords EMPTY = new ConsumerRecords<>(Collections.emptyMap()); private final Map>> records; - private final Map metadata; - public static final class Metadata { - - private final long receivedTimestamp; - private final Long position; - private final Long endOffset; - - public Metadata(final long receivedTimestamp, - final Long position, - final Long endOffset) { - this.receivedTimestamp = receivedTimestamp; - this.position = position; - this.endOffset = endOffset; - } - - /** - * @return The timestamp of the broker response that contained this metadata - */ - public long receivedTimestamp() { - return receivedTimestamp; - } - - /** - * @return The next position the consumer will fetch, or null if the consumer has no position. - */ - public Long position() { - return position; - } - - /** - * @return The lag between the next position to fetch and the current end of the partition, or - * null if the end offset is not known or there is no position. - */ - public Long lag() { - return endOffset == null || position == null ? null : endOffset - position; - } - - /** - * @return The current last offset in the partition. The determination of the "last" offset - * depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully - * replicated offset plus one. Under "read_committed," this is the minimum of the last successfully - * replicated offset plus one or the smallest offset of any open transaction. Null if the end offset - * is not known. - */ - public Long endOffset() { - return endOffset; - } - - @Override - public String toString() { - return "Metadata{" + - "receivedTimestamp=" + receivedTimestamp + - ", position=" + position + - ", endOffset=" + endOffset + - '}'; - } - } - - private static Map extractMetadata(final FetchedRecords fetchedRecords) { - final Map metadata = new HashMap<>(); - for (final Map.Entry entry : fetchedRecords.metadata().entrySet()) { - metadata.put( - entry.getKey(), - new Metadata( - entry.getValue().receivedTimestamp(), - entry.getValue().position() == null ? 
null : entry.getValue().position().offset, - entry.getValue().endOffset() - ) - ); - } - return metadata; - } - - public ConsumerRecords(final Map>> records) { - this.records = records; - this.metadata = new HashMap<>(); - } - - public ConsumerRecords(final Map>> records, - final Map metadata) { + public ConsumerRecords(Map>> records) { this.records = records; - this.metadata = metadata; - } - - ConsumerRecords(final FetchedRecords fetchedRecords) { - this(fetchedRecords.records(), extractMetadata(fetchedRecords)); } /** @@ -142,16 +53,6 @@ public List> records(TopicPartition partition) { return Collections.unmodifiableList(recs); } - /** - * Get the updated metadata returned by the brokers along with this record set. - * May be empty or partial depending on the responses from the broker during this particular poll. - * May also include metadata for additional partitions than the ones for which there are records - * in this {@code ConsumerRecords} object. - */ - public Map metadata() { - return Collections.unmodifiableMap(metadata); - } - /** * Get just the records for the given topic */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index f7760bbe890a4..082d0eecd120d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; -import org.apache.kafka.clients.consumer.internals.FetchedRecords; import org.apache.kafka.clients.consumer.internals.Fetcher; import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry; import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics; @@ -74,6 +73,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -578,6 +578,7 @@ public class KafkaConsumer implements Consumer { private final Deserializer valueDeserializer; private final Fetcher fetcher; private final ConsumerInterceptors interceptors; + private final IsolationLevel isolationLevel; private final Time time; private final ConsumerNetworkClient client; @@ -736,7 +737,7 @@ public KafkaConsumer(Map configs, FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext); - IsolationLevel isolationLevel = IsolationLevel.valueOf( + this.isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry); int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG); @@ -849,6 +850,7 @@ public KafkaConsumer(Map configs, this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; this.fetcher = fetcher; + this.isolationLevel = IsolationLevel.READ_UNCOMMITTED; this.interceptors = Objects.requireNonNull(interceptors); this.time = time; this.client = client; @@ -1235,7 +1237,7 @@ private ConsumerRecords poll(final Timer timer, final boolean 
includeMetad } } - final FetchedRecords records = pollForFetches(timer); + final Map>> records = pollForFetches(timer); if (!records.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches // and avoid block waiting for their responses to enable pipelining while the user @@ -1269,12 +1271,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo /** * @throws KafkaException if the rebalance callback throws exception */ - private FetchedRecords pollForFetches(Timer timer) { + private Map>> pollForFetches(Timer timer) { long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // if data is available already, return it immediately - final FetchedRecords records = fetcher.fetchedRecords(); + final Map>> records = fetcher.fetchedRecords(); if (!records.isEmpty()) { return records; } @@ -2219,6 +2221,30 @@ public Map endOffsets(Collection partition } } + /** + * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known, + * for example if there is no position yet, or if the end offset is not known yet. + * + *

+ * This method uses locally cached metadata and never makes a remote call. + * + * @param topicPartition The partition to get the lag for. + * + * @return This {@code Consumer} instance's current lag for the given partition. + * + * @throws IllegalStateException if the {@code topicPartition} is not assigned + **/ + @Override + public OptionalLong currentLag(TopicPartition topicPartition) { + acquireAndEnsureOpen(); + try { + final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel); + return lag == null ? OptionalLong.empty() : OptionalLong.of(lag); + } finally { + release(); + } + } + /** * Return the current group metadata associated with this consumer. * diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 7ddda76e416b0..b5e0c1d93fc70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -44,6 +45,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; +import static java.util.Collections.singleton; + /** * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is not @@ -218,21 +221,7 @@ public synchronized ConsumerRecords poll(final Duration timeout) { } toClear.forEach(p -> this.records.remove(p)); - - final Map metadata = new HashMap<>(); - for (final TopicPartition partition : subscriptions.assignedPartitions()) { - if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) { - final SubscriptionState.FetchPosition position = subscriptions.position(partition); - final long offset = position.offset; - final long endOffset = endOffsets.get(partition); - metadata.put( - partition, - new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset) - ); - } - } - - return new ConsumerRecords<>(results, metadata); + return new ConsumerRecords<>(results); } public synchronized void addRecord(ConsumerRecord record) { @@ -243,7 +232,6 @@ public synchronized void addRecord(ConsumerRecord record) { throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer"); List> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>()); recs.add(record); - endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset())); } /** @@ -318,7 +306,7 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) @Deprecated @Override public synchronized OffsetAndMetadata committed(final TopicPartition partition) { - return committed(Collections.singleton(partition)).get(partition); + return committed(singleton(partition)).get(partition); } @Deprecated @@ -556,6 +544,16 @@ public Map endOffsets(Collection partition return endOffsets(partitions); } + @Override + public OptionalLong currentLag(TopicPartition topicPartition) { + if (endOffsets.containsKey(topicPartition)) { + return OptionalLong.of(endOffsets.get(topicPartition) - position(topicPartition)); + } else { + // if the test doesn't bother to set an end offset, we assume it wants to model being caught up. 
+ return OptionalLong.of(0L); + } + } + @Override public ConsumerGroupMetadata groupMetadata() { return new ConsumerGroupMetadata("dummy.group.id", 1, "1", Optional.empty()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java deleted file mode 100644 index d8ef92bd6e48a..0000000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class FetchedRecords { - private final Map>> records; - private final Map metadata; - - public static final class FetchMetadata { - - private final long receivedTimestamp; - private final SubscriptionState.FetchPosition position; - private final Long endOffset; - - public FetchMetadata(final long receivedTimestamp, - final SubscriptionState.FetchPosition position, - final Long endOffset) { - this.receivedTimestamp = receivedTimestamp; - this.position = position; - this.endOffset = endOffset; - } - - public long receivedTimestamp() { - return receivedTimestamp; - } - - public SubscriptionState.FetchPosition position() { - return position; - } - - public Long endOffset() { - return endOffset; - } - - @Override - public String toString() { - return "FetchMetadata{" + - "receivedTimestamp=" + receivedTimestamp + - ", position=" + position + - ", endOffset=" + endOffset + - '}'; - } - } - - public FetchedRecords() { - records = new HashMap<>(); - metadata = new HashMap<>(); - } - - public void addRecords(final TopicPartition topicPartition, final List> records) { - if (this.records.containsKey(topicPartition)) { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). 
- // we have to copy to a new list because the old one may be immutable - final List> currentRecords = this.records.get(topicPartition); - final List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - this.records.put(topicPartition, newRecords); - } else { - this.records.put(topicPartition, records); - } - } - - public Map>> records() { - return records; - } - - public void addMetadata(final TopicPartition partition, final FetchMetadata fetchMetadata) { - metadata.put(partition, fetchMetadata); - } - - public Map metadata() { - return metadata; - } - - public boolean isEmpty() { - return records.isEmpty() && metadata.isEmpty(); - } -} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2b9198fa1e757..5e8ad3c2454e2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -262,8 +262,9 @@ public synchronized int sendFetches() { .toForget(data.toForget()) .rackId(clientRackId); - log.debug("Sending {} {} to broker {}", isolationLevel, data, fetchTarget); - + if (log.isDebugEnabled()) { + log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget); + } RequestFuture future = client.send(fetchTarget, request); // We add the node to the set of nodes with pending fetch requests before adding the // listener because the future may have been fulfilled on another thread (e.g. during a @@ -318,7 +319,7 @@ public void onSuccess(ClientResponse resp) { short responseVersion = resp.requestHeader().apiVersion(); completedFetches.add(new CompletedFetch(partition, partitionData, - metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs())); + metricAggregator, batches, fetchOffset, responseVersion)); } } @@ -597,8 +598,8 @@ private Map beginningOrEndOffset(Collection fetchedRecords() { - FetchedRecords fetched = new FetchedRecords<>(); + public Map>> fetchedRecords() { + Map>> fetched = new HashMap<>(); Queue pausedCompletedFetches = new ArrayDeque<>(); int recordsRemaining = maxPollRecords; @@ -636,28 +637,20 @@ public FetchedRecords fetchedRecords() { } else { List> records = fetchRecords(nextInLineFetch, recordsRemaining); - TopicPartition partition = nextInLineFetch.partition; - - // This can be false when a rebalance happened before fetched records - // are returned to the consumer's poll call - if (subscriptions.isAssigned(partition)) { - - // initializeCompletedFetch, above, has already persisted the metadata from the fetch in the - // SubscriptionState, so we can just read it out, which in particular lets us re-use the logic - // for determining the end offset - final long receivedTimestamp = nextInLineFetch.receivedTimestamp; - final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel); - final FetchPosition fetchPosition = subscriptions.position(partition); - - final FetchedRecords.FetchMetadata metadata = - new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, endOffset); - - fetched.addMetadata(partition, metadata); - - } - if (!records.isEmpty()) { - fetched.addRecords(partition, records); + TopicPartition partition = nextInLineFetch.partition; + List> currentRecords = fetched.get(partition); + if (currentRecords == null) { + fetched.put(partition, records); + } else { + // this case 
shouldn't usually happen because we only send one fetch at a time per partition, + // but it might conceivably happen in some rare cases (such as partition leader changes). + // we have to copy to a new list because the old one may be immutable + List> newRecords = new ArrayList<>(records.size() + currentRecords.size()); + newRecords.addAll(currentRecords); + newRecords.addAll(records); + fetched.put(partition, newRecords); + } recordsRemaining -= records.size(); } } @@ -1466,7 +1459,6 @@ private class CompletedFetch { private final FetchResponse.PartitionData partitionData; private final FetchResponseMetricAggregator metricAggregator; private final short responseVersion; - private final long receivedTimestamp; private int recordsRead; private int bytesRead; @@ -1485,15 +1477,13 @@ private CompletedFetch(TopicPartition partition, FetchResponseMetricAggregator metricAggregator, Iterator batches, Long fetchOffset, - short responseVersion, - final long receivedTimestamp) { + short responseVersion) { this.partition = partition; this.partitionData = partitionData; this.metricAggregator = metricAggregator; this.batches = batches; this.nextFetchOffset = fetchOffset; this.responseVersion = responseVersion; - this.receivedTimestamp = receivedTimestamp; this.lastEpoch = Optional.empty(); this.abortedProducerIds = new HashSet<>(); this.abortedTransactions = abortedTransactions(partitionData); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index b2a5e51aaae8a..7b971a162c319 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -537,7 +537,7 @@ public synchronized FetchPosition position(TopicPartition tp) { return assignedState(tp).position; } - synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { + public synchronized Long partitionLag(TopicPartition tp, IsolationLevel isolationLevel) { TopicPartitionState topicPartitionState = assignedState(tp); if (isolationLevel == IsolationLevel.READ_COMMITTED) return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset - topicPartitionState.position.offset; @@ -562,18 +562,6 @@ synchronized void updateLastStableOffset(TopicPartition tp, long lastStableOffse assignedState(tp).lastStableOffset(lastStableOffset); } - synchronized Long logStartOffset(TopicPartition tp) { - return assignedState(tp).logStartOffset; - } - - synchronized Long logEndOffset(TopicPartition tp, IsolationLevel isolationLevel) { - TopicPartitionState topicPartitionState = assignedState(tp); - if (isolationLevel == IsolationLevel.READ_COMMITTED) - return topicPartitionState.lastStableOffset == null ? null : topicPartitionState.lastStableOffset; - else - return topicPartitionState.highWatermark == null ? null : topicPartitionState.highWatermark; - } - /** * Set the preferred read replica with a lease timeout. After this time, the replica will no longer be valid and * {@link #preferredReadReplica(TopicPartition, long)} will return an empty result. 
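
Note: the SubscriptionState change above only widens partitionLag to public; the lag itself is still derived purely from offsets the fetcher has already cached. A minimal sketch of that derivation, assuming cached fields analogous to highWatermark, lastStableOffset, and position (the names here are illustrative, not the actual internal fields):

    import java.util.OptionalLong;

    final class CachedPartitionState {
        Long highWatermark;     // cached end offset under read_uncommitted, if known
        Long lastStableOffset;  // cached end offset under read_committed, if known
        Long position;          // next offset this consumer will fetch, if any

        // Mirrors the semantics of SubscriptionState#partitionLag / KafkaConsumer#currentLag:
        // the result is empty whenever either side of the subtraction is not yet known locally.
        OptionalLong currentLag(boolean readCommitted) {
            final Long endOffset = readCommitted ? lastStableOffset : highWatermark;
            if (endOffset == null || position == null) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(endOffset - position);
        }
    }
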
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index e92e684af0c1c..cfdc2bf29de9f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -45,19 +45,18 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData; -import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; import org.apache.kafka.common.message.ListOffsetsResponseData; -import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -92,6 +91,8 @@ import org.apache.kafka.test.MockConsumerInterceptor; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.TestUtils; +import org.apache.kafka.common.metrics.stats.Avg; + import org.junit.jupiter.api.Test; import javax.management.MBeanServer; @@ -100,6 +101,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -110,6 +112,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Properties; import java.util.Queue; import java.util.Set; @@ -126,18 +129,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Arrays.asList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -607,7 +602,7 @@ public void 
testFetchProgressWithMissingPartitionPosition() { initMetadata(client, Collections.singletonMap(topic, 2)); KafkaConsumer consumer = newConsumerNoAutoCommit(time, client, subscription, metadata); - consumer.assign(asList(tp0, tp1)); + consumer.assign(Arrays.asList(tp0, tp1)); consumer.seekToEnd(singleton(tp0)); consumer.seekToBeginning(singleton(tp1)); @@ -767,7 +762,7 @@ public void testCommitsFetchedDuringAssign() { client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator); assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); - consumer.assign(asList(tp0, tp1)); + consumer.assign(Arrays.asList(tp0, tp1)); // fetch offset for two topics Map offsets = new HashMap<>(); @@ -839,14 +834,14 @@ public void testNoCommittedOffsets() { ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - consumer.assign(asList(tp0, tp1)); + consumer.assign(Arrays.asList(tp0, tp1)); // lookup coordinator client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, node), node); Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()); // fetch offset for one topic - client.prepareResponseFrom(offsetResponse(mkMap(mkEntry(tp0, offset1), mkEntry(tp1, -1L)), Errors.NONE), coordinator); + client.prepareResponseFrom(offsetResponse(Utils.mkMap(Utils.mkEntry(tp0, offset1), Utils.mkEntry(tp1, -1L)), Errors.NONE), coordinator); final Map committed = consumer.committed(Utils.mkSet(tp0, tp1)); assertEquals(2, committed.size()); assertEquals(offset1, committed.get(tp0).offset()); @@ -1056,7 +1051,6 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() { ConsumerRecords records = consumer.poll(Duration.ZERO); assertEquals(0, records.count()); - assertThat(records.metadata(), equalTo(emptyMap())); consumer.close(Duration.ofMillis(0)); } @@ -1086,7 +1080,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); // initial subscription - consumer.subscribe(asList(topic, topic2), getConsumerRebalanceListener(consumer)); + consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); // verify that subscription has changed but assignment is still unchanged assertEquals(2, consumer.subscription().size()); @@ -1094,7 +1088,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { assertTrue(consumer.assignment().isEmpty()); // mock rebalance responses - Node coordinator = prepareRebalance(client, node, assignor, asList(tp0, t2p0), null); + Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); consumer.poll(Duration.ZERO); @@ -1123,12 +1117,10 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { // verify that the fetch occurred as expected assertEquals(11, records.count()); assertEquals(1L, consumer.position(tp0)); - assertEquals(1L, (long) records.metadata().get(tp0).position()); assertEquals(10L, consumer.position(t2p0)); - assertEquals(10L, (long) records.metadata().get(t2p0).position()); // subscription change - consumer.subscribe(asList(topic, topic3), getConsumerRebalanceListener(consumer)); + consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); // verify that 
subscription has changed but assignment is still unchanged assertEquals(2, consumer.subscription().size()); @@ -1143,7 +1135,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { AtomicBoolean commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets1); // mock rebalance responses - prepareRebalance(client, node, assignor, asList(tp0, t3p0), coordinator); + prepareRebalance(client, node, assignor, Arrays.asList(tp0, t3p0), coordinator); // mock a response to the next fetch from the new assignment Map fetches2 = new HashMap<>(); @@ -1156,9 +1148,7 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { // verify that the fetch occurred as expected assertEquals(101, records.count()); assertEquals(2L, consumer.position(tp0)); - assertEquals(2L, (long) records.metadata().get(tp0).position()); assertEquals(100L, consumer.position(t3p0)); - assertEquals(100L, (long) records.metadata().get(t3p0).position()); // verify that the offset commits occurred as expected assertTrue(commitReceived.get()); @@ -1501,7 +1491,7 @@ public void testGracefulClose() throws Exception { response.put(tp0, Errors.NONE); OffsetCommitResponse commitResponse = offsetCommitResponse(response); LeaveGroupResponse leaveGroupResponse = new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())); - consumerCloseTest(5000, asList(commitResponse, leaveGroupResponse), 0, false); + consumerCloseTest(5000, Arrays.asList(commitResponse, leaveGroupResponse), 0, false); } @Test @@ -1872,12 +1862,12 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - initMetadata(client, mkMap(mkEntry(topic, 1), mkEntry(topic2, 1), mkEntry(topic3, 1))); + initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); - consumer.subscribe(asList(topic, topic2), getConsumerRebalanceListener(consumer)); + consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); Node node = metadata.fetch().nodes().get(0); - Node coordinator = prepareRebalance(client, node, assignor, asList(tp0, t2p0), null); + Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); // a poll with non-zero milliseconds would complete three round-trips (discover, join, sync) TestUtils.waitForCondition(() -> { @@ -1908,7 +1898,7 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { client.respondFrom(fetchResponse(fetches1), node); // subscription change - consumer.subscribe(asList(topic, topic3), getConsumerRebalanceListener(consumer)); + consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); // verify that subscription has changed but assignment is still unchanged assertEquals(Utils.mkSet(topic, topic3), consumer.subscription()); @@ -1954,7 +1944,7 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { client.respondFrom(fetchResponse(fetches1), node); // now complete the rebalance - client.respondFrom(syncGroupResponse(asList(tp0, t3p0), Errors.NONE), coordinator); + client.respondFrom(syncGroupResponse(Arrays.asList(tp0, t3p0), Errors.NONE), coordinator); AtomicInteger count = new AtomicInteger(0); TestUtils.waitForCondition(() -> { @@ -2053,7 +2043,7 @@ public void testInvalidGroupMetadata() throws 
InterruptedException { } @Test - public void testPollMetadata() { + public void testCurrentLag() { final Time time = new MockTime(); final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); @@ -2065,101 +2055,28 @@ public void testPollMetadata() { final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - consumer.assign(singleton(tp0)); - consumer.seek(tp0, 50L); - - final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); - client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo))); - - final ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); - assertEquals(5, records.count()); - assertEquals(55L, consumer.position(tp0)); - - // verify that the consumer computes the correct metadata based on the fetch response - final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0); - assertEquals(100L, (long) actualMetadata.endOffset()); - assertEquals(55L, (long) actualMetadata.position()); - assertEquals(45L, (long) actualMetadata.lag()); - consumer.close(Duration.ZERO); - } - - - @Test - public void testPollMetadataWithExtraPartitions() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); - final ConsumerMetadata metadata = createMetadata(subscription); - final MockClient client = new MockClient(time, metadata); + // throws for unassigned partition + assertThrows(IllegalStateException.class, () -> consumer.currentLag(tp0)); - initMetadata(client, singletonMap(topic, 2)); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); + consumer.assign(singleton(tp0)); - final KafkaConsumer consumer = - newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + // no error for no current position + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); - consumer.assign(asList(tp0, tp1)); consumer.seek(tp0, 50L); - consumer.seek(tp1, 10L); - client.prepareResponse( - fetchResponse( - mkMap( - mkEntry(tp0, new FetchInfo(1L, 99L, 50L, 5)), - mkEntry(tp1, new FetchInfo(0L, 29L, 10L, 0)) - ) - ) - ); - - final ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); - assertEquals(5, records.count()); - assertEquals(55L, consumer.position(tp0)); - - assertEquals(5, records.records(tp0).size()); - final ConsumerRecords.Metadata tp0Metadata = records.metadata().get(tp0); - assertEquals(100L, (long) tp0Metadata.endOffset()); - assertEquals(55L, (long) tp0Metadata.position()); - assertEquals(45L, (long) tp0Metadata.lag()); - - // we may get back metadata for other assigned partitions even if we don't get records for them - assertEquals(0, records.records(tp1).size()); - final ConsumerRecords.Metadata tp1Metadata = records.metadata().get(tp1); - assertEquals(30L, (long) tp1Metadata.endOffset()); - assertEquals(10L, (long) tp1Metadata.position()); - assertEquals(20L, (long) tp1Metadata.lag()); - - consumer.close(Duration.ZERO); - } - - @Test - public void testPollMetadataWithNoRecords() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); - final ConsumerMetadata metadata = createMetadata(subscription); - final MockClient client = new MockClient(time, metadata); - - initMetadata(client, singletonMap(topic, 1)); - final ConsumerPartitionAssignor assignor = 
new RoundRobinAssignor(); - - final KafkaConsumer consumer = - newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); - - consumer.assign(singleton(tp0)); - consumer.seek(tp0, 50L); + // no error for no end offset (so unknown lag) + assertEquals(OptionalLong.empty(), consumer.currentLag(tp0)); - final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 0); + final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5); client.prepareResponse(fetchResponse(singletonMap(tp0, fetchInfo))); final ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); + assertEquals(5, records.count()); + assertEquals(55L, consumer.position(tp0)); - // we got no records back ... - assertEquals(0, records.count()); - assertEquals(50L, consumer.position(tp0)); - - // ... but we can still get metadata that was in the fetch response - final ConsumerRecords.Metadata actualMetadata = records.metadata().get(tp0); - assertEquals(100L, (long) actualMetadata.endOffset()); - assertEquals(50L, (long) actualMetadata.position()); - assertEquals(50L, (long) actualMetadata.lag()); + // correct lag result + assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0)); consumer.close(Duration.ZERO); } @@ -2329,7 +2246,7 @@ private OffsetFetchResponse offsetResponse(Map offsets, Er } private ListOffsetsResponse listOffsetsResponse(Map offsets) { - return listOffsetsResponse(offsets, emptyMap()); + return listOffsetsResponse(offsets, Collections.emptyMap()); } private ListOffsetsResponse listOffsetsResponse(Map partitionOffsets, @@ -2514,8 +2431,6 @@ private static class FetchInfo { } FetchInfo(long logFirstOffset, long logLastOffset, long offset, int count) { - assertThat(logFirstOffset, lessThanOrEqualTo(offset)); - assertThat(logLastOffset, greaterThanOrEqualTo(offset + count)); this.logFirstOffset = logFirstOffset; this.logLastOffset = logLastOffset; this.offset = offset; @@ -2656,7 +2571,7 @@ public void testPollIdleRatio() { } private static boolean consumerMetricPresent(KafkaConsumer consumer, String name) { - MetricName metricName = new MetricName(name, "consumer-metrics", "", emptyMap()); + MetricName metricName = new MetricName(name, "consumer-metrics", "", Collections.emptyMap()); return consumer.metrics.metrics().containsKey(metricName); } @@ -2696,11 +2611,11 @@ public void testEnforceRebalanceTriggersRebalanceOnNextPoll() { ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); MockRebalanceListener countingRebalanceListener = new MockRebalanceListener(); - initMetadata(client, mkMap(mkEntry(topic, 1), mkEntry(topic2, 1), mkEntry(topic3, 1))); + initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); - consumer.subscribe(asList(topic, topic2), countingRebalanceListener); + consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener); Node node = metadata.fetch().nodes().get(0); - prepareRebalance(client, node, assignor, asList(tp0, t2p0), null); + prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null); // a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group consumer.poll(Duration.ZERO); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 2c13864fb0589..0a7d5cfde49b4 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -522,7 +522,7 @@ public void testParseCorruptedRecord() throws Exception { consumerClient.poll(time.timer(0)); // the first fetchedRecords() should return the first valid message - assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size()); + assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); assertEquals(1, subscriptions.position(tp0).offset); ensureBlockOnRecord(1L); @@ -926,7 +926,7 @@ public void testInFlightFetchOnPausedPartition() { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); - assertNull(fetcher.fetchedRecords().records().get(tp0)); + assertNull(fetcher.fetchedRecords().get(tp0)); } @Test @@ -1115,7 +1115,7 @@ public void testFetchNotLeaderOrFollower() { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); } @@ -1128,7 +1128,7 @@ public void testFetchUnknownTopicOrPartition() { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); } @@ -1142,7 +1142,7 @@ public void testFetchFencedLeaderEpoch() { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.FENCED_LEADER_EPOCH, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size(), "Should not return any records"); + assertEquals(0, fetcher.fetchedRecords().size(), "Should not return any records"); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()), "Should have requested metadata update"); } @@ -1156,7 +1156,7 @@ public void testFetchUnknownLeaderEpoch() { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.UNKNOWN_LEADER_EPOCH, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size(), "Should not return any records"); + assertEquals(0, fetcher.fetchedRecords().size(), "Should not return any records"); assertNotEquals(0L, metadata.timeToNextUpdate(time.milliseconds()), "Should not have requested metadata update"); } @@ -1198,7 +1198,7 @@ public void testFetchOffsetOutOfRange() { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); assertTrue(subscriptions.isOffsetResetNeeded(tp0)); assertNull(subscriptions.validPosition(tp0)); assertNull(subscriptions.position(tp0)); @@ -1216,7 +1216,7 @@ public void testStaleOutOfRangeError() { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); subscriptions.seek(tp0, 1); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); 
assertFalse(subscriptions.isOffsetResetNeeded(tp0)); assertEquals(1, subscriptions.position(tp0).offset); } @@ -1234,7 +1234,7 @@ public void testFetchedRecordsAfterSeek() { consumerClient.poll(time.timer(0)); assertFalse(subscriptions.isOffsetResetNeeded(tp0)); subscriptions.seek(tp0, 2); - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); } @Test @@ -1390,7 +1390,7 @@ public void testSeekBeforeException() { client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(time.timer(0)); - assertEquals(2, fetcher.fetchedRecords().records().get(tp0).size()); + assertEquals(2, fetcher.fetchedRecords().get(tp0).size()); subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); subscriptions.seekUnvalidated(tp1, new SubscriptionState.FetchPosition(1, Optional.empty(), metadata.currentLeader(tp1))); @@ -1401,11 +1401,11 @@ public void testSeekBeforeException() { FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, Optional.empty(), null, MemoryRecords.EMPTY)); client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID)); consumerClient.poll(time.timer(0)); - assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size()); + assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); subscriptions.seek(tp1, 10); // Should not throw OffsetOutOfRangeException after the seek - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); } @Test @@ -1418,7 +1418,7 @@ public void testFetchDisconnected() { assertEquals(1, fetcher.sendFetches()); client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0), true); consumerClient.poll(time.timer(0)); - assertEquals(0, fetcher.fetchedRecords().records().size()); + assertEquals(0, fetcher.fetchedRecords().size()); // disconnects should have no affect on subscription state assertFalse(subscriptions.isOffsetResetNeeded(tp0)); @@ -4521,7 +4521,7 @@ private MetadataResponse newMetadataResponse(String topic, Errors error) { @SuppressWarnings("unchecked") private Map>> fetchedRecords() { - return (Map) fetcher.fetchedRecords().records(); + return (Map) fetcher.fetchedRecords(); } private void buildFetcher(int maxPollRecords) { diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index d0b9084227c49..d4c8492e9a45c 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -583,7 +583,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer.seekToEnd(List(tp).asJava) assertEquals(totalRecords, consumer.position(tp)) - assertTrue(pollForRecord(consumer, Duration.ofMillis(50)).isEmpty) + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty) consumer.seekToBeginning(List(tp).asJava) assertEquals(0L, consumer.position(tp)) @@ -601,7 +601,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer.seekToEnd(List(tp2).asJava) assertEquals(totalRecords, consumer.position(tp2)) - assertTrue(pollForRecord(consumer, Duration.ofMillis(50)).isEmpty) + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty) consumer.seekToBeginning(List(tp2).asJava) assertEquals(0L, consumer.position(tp2)) @@ -670,7 +670,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer.pause(partitions) 
startingTimestamp = System.currentTimeMillis() sendRecords(producer, numRecords = 5, tp, startingTimestamp = startingTimestamp) - assertTrue(pollForRecord(consumer, Duration.ofMillis(100)).isEmpty) + assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty) consumer.resume(partitions) consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 5, startingTimestamp = startingTimestamp) } @@ -718,8 +718,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // consuming a record that is too large should succeed since KIP-74 consumer.assign(List(tp).asJava) - val duration = Duration.ofMillis(20000) - val records = pollForRecord(consumer, duration) + val records = consumer.poll(Duration.ofMillis(20000)) assertEquals(1, records.count) val consumerRecord = records.iterator().next() assertEquals(0L, consumerRecord.offset) @@ -751,7 +750,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // we should only get the small record in the first `poll` consumer.assign(List(tp).asJava) - val records = pollForRecord(consumer, Duration.ofMillis(20000)) + val records = consumer.poll(Duration.ofMillis(20000)) assertEquals(1, records.count) val consumerRecord = records.iterator().next() assertEquals(0L, consumerRecord.offset) @@ -1805,12 +1804,12 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer3.assign(asList(tp)) consumer3.seek(tp, 1) - val numRecords1 = pollForRecord(consumer1, Duration.ofMillis(5000)).count() + val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count() assertThrows(classOf[InvalidGroupIdException], () => consumer1.commitSync()) assertThrows(classOf[InvalidGroupIdException], () => consumer2.committed(Set(tp).asJava)) - val numRecords2 = pollForRecord(consumer2, Duration.ofMillis(5000)).count() - val numRecords3 = pollForRecord(consumer3, Duration.ofMillis(5000)).count() + val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count() + val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count() consumer1.unsubscribe() consumer2.unsubscribe() @@ -1859,10 +1858,10 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer1.assign(asList(tp)) consumer2.assign(asList(tp)) - val records1 = pollForRecord(consumer1, Duration.ofMillis(5000)) + val records1 = consumer1.poll(Duration.ofMillis(5000)) consumer1.commitSync() - val records2 = pollForRecord(consumer2, Duration.ofMillis(5000)) + val records2 = consumer2.poll(Duration.ofMillis(5000)) consumer2.commitSync() consumer1.close() @@ -1873,19 +1872,4 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1, "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1") } - - /** - * Consumer#poll returns early if there is metadata to return even if there are no records, - * so when we intend to wait for records, we can't just rely on long polling in the Consumer. 
- */ - private def pollForRecord(consumer: KafkaConsumer[Array[Byte], Array[Byte]], duration: Duration) = { - val deadline = System.currentTimeMillis() + duration.toMillis - var durationRemaining = deadline - System.currentTimeMillis() - var result = consumer.poll(Duration.ofMillis(durationRemaining)) - while (result.count() == 0 && durationRemaining > 0) { - result = consumer.poll(Duration.ofMillis(durationRemaining)) - durationRemaining = deadline - System.currentTimeMillis() - } - result - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java index 46b3429f226e6..199bc0e6456cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; @@ -30,6 +29,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.OptionalLong; import java.util.PriorityQueue; import java.util.Set; import java.util.function.Function; @@ -60,6 +60,7 @@ public class PartitionGroup { private final Logger logger; private final Map partitionQueues; + private final Function lagProvider; private final Sensor enforcedProcessingSensor; private final long maxTaskIdleMs; private final Sensor recordLatenessSensor; @@ -68,7 +69,6 @@ public class PartitionGroup { private long streamTime; private int totalBuffered; private boolean allBuffered; - private final Map fetchedLags = new HashMap<>(); private final Map idlePartitionDeadlines = new HashMap<>(); static class RecordInfo { @@ -89,12 +89,14 @@ RecordQueue queue() { PartitionGroup(final LogContext logContext, final Map partitionQueues, + final Function lagProvider, final Sensor recordLatenessSensor, final Sensor enforcedProcessingSensor, final long maxTaskIdleMs) { this.logger = logContext.logger(PartitionGroup.class); nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp)); this.partitionQueues = partitionQueues; + this.lagProvider = lagProvider; this.enforcedProcessingSensor = enforcedProcessingSensor; this.maxTaskIdleMs = maxTaskIdleMs; this.recordLatenessSensor = recordLatenessSensor; @@ -103,31 +105,7 @@ RecordQueue queue() { streamTime = RecordQueue.UNKNOWN; } - public void addFetchedMetadata(final TopicPartition partition, final ConsumerRecords.Metadata metadata) { - final Long lag = metadata.lag(); - if (lag != null) { - logger.trace("added fetched lag {}: {}", partition, lag); - fetchedLags.put(partition, lag); - } - } - public boolean readyToProcess(final long wallClockTime) { - if (logger.isTraceEnabled()) { - for (final Map.Entry entry : partitionQueues.entrySet()) { - logger.trace( - "buffered/lag {}: {}/{}", - entry.getKey(), - entry.getValue().size(), - fetchedLags.get(entry.getKey()) - ); - } - } - // Log-level strategy: - // TRACE for messages that don't wait for fetches - // TRACE when we waited for a fetch and decided to wait some more, as configured - // TRACE when we are ready for processing and didn't have to enforce processing - // INFO when we enforce 
processing, since this has to wait for fetches AND may result in disorder - if (maxTaskIdleMs == StreamsConfig.MAX_TASK_IDLE_MS_DISABLED) { if (logger.isTraceEnabled() && !allBuffered && totalBuffered > 0) { final Set bufferedPartitions = new HashSet<>(); @@ -156,50 +134,53 @@ public boolean readyToProcess(final long wallClockTime) { final TopicPartition partition = entry.getKey(); final RecordQueue queue = entry.getValue(); - final Long nullableFetchedLag = fetchedLags.get(partition); if (!queue.isEmpty()) { // this partition is ready for processing idlePartitionDeadlines.remove(partition); queued.add(partition); - } else if (nullableFetchedLag == null) { - // must wait to fetch metadata for the partition - idlePartitionDeadlines.remove(partition); - logger.trace("Waiting to fetch data for {}", partition); - return false; - } else if (nullableFetchedLag > 0L) { - // must wait to poll the data we know to be on the broker - idlePartitionDeadlines.remove(partition); - logger.trace( - "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", - partition, - nullableFetchedLag - ); - return false; } else { - // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up. - // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs - // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero - // lag instead of when we happen to run this method, but realistically it's probably a small difference - // and using wall clock time seems more intuitive for users, - // since the log message will be as of wallClockTime. - idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs); - final long deadline = idlePartitionDeadlines.get(partition); - if (wallClockTime < deadline) { + final OptionalLong fetchedLag = lagProvider.apply(partition); + + if (!fetchedLag.isPresent()) { + // must wait to fetch metadata for the partition + idlePartitionDeadlines.remove(partition); + logger.trace("Waiting to fetch data for {}", partition); + return false; + } else if (fetchedLag.getAsLong() > 0L) { + // must wait to poll the data we know to be on the broker + idlePartitionDeadlines.remove(partition); logger.trace( - "Lag for {} is currently 0 and current time is {}. Waiting for new data to be produced for configured idle time {} (deadline is {}).", + "Lag for {} is currently {}, but no data is buffered locally. Waiting to buffer some records.", partition, - wallClockTime, - maxTaskIdleMs, - deadline + fetchedLag.getAsLong() ); return false; } else { - // this partition is ready for processing due to the task idling deadline passing - if (enforced == null) { - enforced = new HashMap<>(); + // p is known to have zero lag. wait for maxTaskIdleMs to see if more data shows up. + // One alternative would be to set the deadline to nullableMetadata.receivedTimestamp + maxTaskIdleMs + // instead. That way, we would start the idling timer as of the freshness of our knowledge about the zero + // lag instead of when we happen to run this method, but realistically it's probably a small difference + // and using wall clock time seems more intuitive for users, + // since the log message will be as of wallClockTime. + idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + maxTaskIdleMs); + final long deadline = idlePartitionDeadlines.get(partition); + if (wallClockTime < deadline) { + logger.trace( + "Lag for {} is currently 0 and current time is {}. 
Waiting for new data to be produced for configured idle time {} (deadline is {}).", + partition, + wallClockTime, + maxTaskIdleMs, + deadline + ); + return false; + } else { + // this partition is ready for processing due to the task idling deadline passing + if (enforced == null) { + enforced = new HashMap<>(); + } + enforced.put(partition, deadline); } - enforced.put(partition, deadline); } } } @@ -211,7 +192,7 @@ public boolean readyToProcess(final long wallClockTime) { return false; } else { enforcedProcessingSensor.record(1.0d, wallClockTime); - logger.info("Continuing to process although some partition timestamps were not buffered locally." + + logger.trace("Continuing to process although some partitions are empty on the broker." + "\n\tThere may be out-of-order processing for this task as a result." + "\n\tPartitions with local data: {}." + "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}." + diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index d866954668618..4efb10eb566cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; @@ -287,11 +286,6 @@ public void addRecords(final TopicPartition partition, final Iterable> records); - /** - * Add to this task any metadata returned from the poll. 
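// Illustrative restatement (not from the patch; names are hypothetical) of the empty-partition
// branch in the new readyToProcess() above. The OptionalLong comes from the injected lagProvider,
// which KIP-695 backs by the consumer's cached lag instead of metadata pushed in via addFetchedMetadata:
static boolean emptyPartitionIsReady(final java.util.OptionalLong cachedLag,
                                     final long wallClockTime,
                                     final long idleDeadline) {
    if (!cachedLag.isPresent()) {
        return false;                          // lag unknown yet: wait for a fetch response
    } else if (cachedLag.getAsLong() > 0L) {
        return false;                          // records are on the broker: wait to poll them
    } else {
        return wallClockTime >= idleDeadline;  // lag == 0: enforce only once max.task.idle.ms has passed
    }
}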
- */ - void addFetchedMetadata(TopicPartition partition, ConsumerRecords.Metadata metadata); - default boolean process(final long wallClockTime) { return false; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 5b9ebadb5da3b..209ebb243168f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -968,7 +968,7 @@ int commitAll() { * @param records Records, can be null */ void addRecordsToTasks(final ConsumerRecords records) { - for (final TopicPartition partition : union(HashSet::new, records.partitions(), records.metadata().keySet())) { + for (final TopicPartition partition : records.partitions()) { final Task activeTask = tasks.activeTasksForInputPartition(partition); if (activeTask == null) { @@ -978,7 +978,6 @@ void addRecordsToTasks(final ConsumerRecords records) { } activeTask.addRecords(partition, records.records(partition)); - activeTask.addFetchedMetadata(partition, records.metadata().get(partition)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index d0e8b0127f58c..e8147a17b5b66 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -470,7 +470,7 @@ private void createTasks() { assertThat( activeTaskCreator.createTasks( - null, + mockClientSupplier.consumer, mkMap( mkEntry(task00, Collections.singleton(new TopicPartition("topic", 0))), mkEntry(task01, Collections.singleton(new TopicPartition("topic", 1))) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java index 09558d23854e8..40602b5edf1c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; @@ -41,7 +40,9 @@ import org.junit.Test; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.OptionalLong; import java.util.UUID; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -481,6 +482,7 @@ public void shouldUpdatePartitionQueuesExpand() { final PartitionGroup group = new PartitionGroup( logContext, mkMap(mkEntry(partition1, queue1)), + tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, maxTaskIdleMs @@ -513,6 +515,7 @@ public void shouldUpdatePartitionQueuesShrinkAndExpand() { final PartitionGroup group = new PartitionGroup( logContext, mkMap(mkEntry(partition1, queue1)), + tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, maxTaskIdleMs @@ -547,6 +550,7 @@ public void shouldNeverWaitIfIdlingIsDisabled() { 
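// Illustrative pattern, consolidated from the updated tests below (partition2 and the provider
// lambda come from the test class; the rest is an assumption): lag is now injected through a
// provider rather than pushed via addFetchedMetadata, so a test can simulate broker state by
// mutating a map that backs the lambda passed to the PartitionGroup constructor:
final HashMap<TopicPartition, OptionalLong> lags = new HashMap<>();
// passed to the constructor as: tp -> lags.getOrDefault(tp, OptionalLong.empty())
lags.put(partition2, OptionalLong.of(1L));   // one record still on the broker -> readyToProcess() stays false
lags.put(partition2, OptionalLong.of(0L));   // caught up -> ready, or idle up to max.task.idle.ms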
mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), + tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, StreamsConfig.MAX_TASK_IDLE_MS_DISABLED @@ -584,6 +588,7 @@ public void shouldBeReadyIfAllPartitionsAreBuffered() { mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), + tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, 0L @@ -615,12 +620,14 @@ public void shouldBeReadyIfAllPartitionsAreBuffered() { @Test public void shouldWaitForFetchesWhenMetadataIsIncomplete() { + final HashMap lags = new HashMap<>(); final PartitionGroup group = new PartitionGroup( logContext, mkMap( mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), + tp -> lags.getOrDefault(tp, OptionalLong.empty()), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, 0L @@ -643,18 +650,20 @@ public void shouldWaitForFetchesWhenMetadataIsIncomplete() { )) ); } - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); + lags.put(partition2, OptionalLong.of(0L)); assertThat(group.readyToProcess(0L), is(true)); } @Test public void shouldWaitForPollWhenLagIsNonzero() { + final HashMap lags = new HashMap<>(); final PartitionGroup group = new PartitionGroup( logContext, mkMap( mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), + tp -> lags.getOrDefault(tp, OptionalLong.empty()), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, 0L @@ -664,7 +673,8 @@ public void shouldWaitForPollWhenLagIsNonzero() { new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); group.addRawRecords(partition1, list1); - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 1L)); + + lags.put(partition2, OptionalLong.of(1L)); assertThat(group.allPartitionsBufferedLocally(), is(false)); @@ -689,6 +699,7 @@ public void shouldIdleAsSpecifiedWhenLagIsZero() { mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), + tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, 1L @@ -698,7 +709,6 @@ public void shouldIdleAsSpecifiedWhenLagIsZero() { new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue), new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue)); group.addRawRecords(partition1, list1); - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); assertThat(group.allPartitionsBufferedLocally(), is(false)); @@ -714,16 +724,15 @@ public void shouldIdleAsSpecifiedWhenLagIsZero() { ); } - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(1L), is(true)); assertThat( appender.getEvents(), hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("INFO")), + Matchers.hasProperty("level", equalTo("TRACE")), Matchers.hasProperty("message", equalTo( - "[test] Continuing to process although some partition timestamps were not buffered locally.\n" + + "[test] Continuing to process although some partitions are empty on the broker.\n" + "\tThere may be out-of-order processing for this task as a result.\n" + "\tPartitions with local data: [topic-1].\n" + "\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n" + @@ -734,16 +743,15 @@ public void 
shouldIdleAsSpecifiedWhenLagIsZero() { ); } - group.addFetchedMetadata(partition2, new ConsumerRecords.Metadata(0L, 0L, 0L)); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) { LogCaptureAppender.setClassLoggerToTrace(PartitionGroup.class); assertThat(group.readyToProcess(2L), is(true)); assertThat( appender.getEvents(), hasItem(Matchers.allOf( - Matchers.hasProperty("level", equalTo("INFO")), + Matchers.hasProperty("level", equalTo("TRACE")), Matchers.hasProperty("message", equalTo( - "[test] Continuing to process although some partition timestamps were not buffered locally.\n" + + "[test] Continuing to process although some partitions are empty on the broker.\n" + "\tThere may be out-of-order processing for this task as a result.\n" + "\tPartitions with local data: [topic-1].\n" + "\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}.\n" + @@ -762,6 +770,7 @@ private PartitionGroup getBasicGroup() { mkEntry(partition1, queue1), mkEntry(partition2, queue2) ), + tp -> OptionalLong.of(0L), getValueSensor(metrics, lastLatenessValue), enforcedProcessingSensor, maxTaskIdleMs diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index ea3fbdd91e1cf..6913a45c5f003 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -424,9 +423,6 @@ public void shouldProcessInOrder() { assertEquals(asList(101, 102, 103), source1.values); assertEquals(singletonList(201), source2.values); - // tell the task that it doesn't need to wait for more records on partition1 - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 30L, 30L)); - assertTrue(task.process(0L)); assertEquals(1, task.numBuffered()); assertEquals(3, source1.numReceived); @@ -434,8 +430,6 @@ public void shouldProcessInOrder() { assertEquals(asList(101, 102, 103), source1.values); assertEquals(asList(201, 202), source2.values); - // tell the task that it doesn't need to wait for more records on partition1 - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 30L, 30L)); assertTrue(task.process(0L)); assertEquals(0, task.numBuffered()); assertEquals(3, source1.numReceived); @@ -967,9 +961,6 @@ public void shouldPunctuateOnceStreamTimeAfterGap() { assertEquals(3, source2.numReceived); assertTrue(task.maybePunctuateStreamTime()); - // tell the task that it doesn't need to wait for new data on partition1 - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 160L, 160L)); - // st: 161 assertTrue(task.process(0L)); assertEquals(0, task.numBuffered()); @@ -1560,7 +1551,6 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { task.addRecords(repartition, singletonList(getConsumerRecordWithOffsetAsTimestamp(repartition, 10L))); assertTrue(task.process(0L)); - task.addFetchedMetadata(partition1, new ConsumerRecords.Metadata(0L, 5L, 5L)); assertTrue(task.process(0L)); 
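// The addFetchedMetadata(...) calls removed above are obsolete: a task now obtains lag on demand
// from its consumer rather than from pushed fetch metadata. Illustrative call only ('mainConsumer'
// is an assumed variable, not a line from this patch; partition1 is from the test class):
final java.util.OptionalLong lag = mainConsumer.currentLag(partition1);
if (lag.isPresent() && lag.getAsLong() == 0L) {
    // the client's cached metadata says partition1 is fully caught up
}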
task.prepareCommit(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 2ca734de5a0ed..501dbe051996c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; @@ -3126,11 +3125,6 @@ public void addRecords(final TopicPartition partition, final Iterable entry : startOffsets.entrySet()) { - final ConsumerRecords.Metadata metadata = new ConsumerRecords.Metadata( - mockWallClockTime.milliseconds(), - 0L, - 0L - ); - task.addFetchedMetadata(entry.getKey(), metadata); - } } else { task = null; } @@ -616,12 +606,6 @@ private void enqueueTaskRecord(final String inputTopic, value, headers)) ); - final ConsumerRecords.Metadata metadata = new ConsumerRecords.Metadata( - mockWallClockTime.milliseconds(), - offset, - offset - ); - task.addFetchedMetadata(topicOrPatternPartition, metadata); } private void completeAllProcessableWork() {
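Taken together, the client-facing surface of KIP-695 is small: poll() behaves as it did before the reverted change, and the new KafkaConsumer#currentLag returns the lag the client has already cached from its fetch responses, as an OptionalLong that is empty whenever the position or end offset is not yet known. Below is a minimal, illustrative sketch of the API; the topic name, group id, bootstrap address, and timeout are arbitrary and not taken from this patch.

import java.time.Duration;
import java.util.Collections;
import java.util.OptionalLong;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CurrentLagExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-demo");
        try (final KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            final TopicPartition tp = new TopicPartition("input", 0);
            consumer.assign(Collections.singleton(tp));
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            // currentLag never blocks: it only reports what the client has already cached
            final OptionalLong lag = consumer.currentLag(tp);
            if (records.isEmpty() && lag.isPresent() && lag.getAsLong() == 0L) {
                System.out.println("Nothing buffered and cached lag is zero: caught up on " + tp);
            }
        }
    }
}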