KAFKA-10866: Add metadata to ConsumerRecords (#9836)
Expose fetched metadata via the ConsumerRecords
object as described in KIP-695.

Reviewers: Guozhang Wang <guozhang@apache.org>
vvcephei authored Jan 28, 2021
1 parent f4983f4 commit fdcf8fb
Showing 12 changed files with 473 additions and 81 deletions.
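For orientation, a sketch (not part of this diff) of how an application might use the new ConsumerRecords#metadata() accessor; the topic name, consumer configuration, and class name below are illustrative assumptions:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MetadataPollSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "metadata-sketch");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            // New in this commit: per-partition fetch metadata returned with the records.
            // The map may be empty or partial for any given poll.
            for (final Map.Entry<TopicPartition, ConsumerRecords.Metadata> entry : records.metadata().entrySet()) {
                final Long lag = entry.getValue().lag(); // null if position or end offset is unknown
                System.out.println(entry.getKey() + " lag=" + lag);
            }

            for (final ConsumerRecord<String, String> record : records) {
                // application processing goes here
            }
        }
    }
}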
1 change: 1 addition & 0 deletions build.gradle
@@ -1118,6 +1118,7 @@ project(':clients') {
testCompile libs.bcpkix
testCompile libs.junitJupiter
testCompile libs.mockitoCore
+ testCompile libs.hamcrest

testRuntime libs.slf4jlog4j
testRuntime libs.jacksonDatabind
Expand Down
5 changes: 4 additions & 1 deletion checkstyle/suppressions.xml
@@ -100,7 +100,10 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>

<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>

<suppress checks="CyclomaticComplexity"
files="MockConsumer"/>

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -16,11 +16,13 @@
*/
package org.apache.kafka.clients.consumer;

+ import org.apache.kafka.clients.consumer.internals.FetchedRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;

import java.util.ArrayList;
import java.util.Collections;
+ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -32,14 +34,99 @@
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {

@SuppressWarnings("unchecked")
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.EMPTY_MAP);
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
Collections.emptyMap(),
Collections.emptyMap()
);

private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, Metadata> metadata;

public static final class Metadata {

private final long receivedTimestamp;
private final Long position;
private final Long endOffset;

public Metadata(final long receivedTimestamp,
final Long position,
final Long endOffset) {
this.receivedTimestamp = receivedTimestamp;
this.position = position;
this.endOffset = endOffset;
}

/**
* @return The timestamp of the broker response that contained this metadata
*/
public long receivedTimestamp() {
return receivedTimestamp;
}

/**
* @return The next position the consumer will fetch, or null if the consumer has no position.
*/
public Long position() {
return position;
}

/**
* @return The lag between the next position to fetch and the current end of the partition, or
* null if the end offset is not known or there is no position.
*/
public Long lag() {
return endOffset == null || position == null ? null : endOffset - position;
}

/**
* @return The current last offset in the partition. The determination of the "last" offset
* depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully
* replicated offset plus one. Under "read_committed," this is the minimum of the last successfully
* replicated offset plus one and the smallest offset of any open transaction. Null if the end offset
* is not known.
*/
public Long endOffset() {
return endOffset;
}

@Override
public String toString() {
return "Metadata{" +
"receivedTimestamp=" + receivedTimestamp +
", position=" + position +
", endOffset=" + endOffset +
'}';
}
}

private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
final Map<TopicPartition, Metadata> metadata = new HashMap<>();
for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
metadata.put(
entry.getKey(),
new Metadata(
entry.getValue().receivedTimestamp(),
entry.getValue().position() == null ? null : entry.getValue().position().offset,
entry.getValue().endOffset()
)
);
}
return metadata;
}

- public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+ public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
this.records = records;
this.metadata = new HashMap<>();
}

public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
final Map<TopicPartition, Metadata> metadata) {
this.records = records;
this.metadata = metadata;
}

public ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
this(fetchedRecords.records(), extractMetadata(fetchedRecords));
}

/**
@@ -55,6 +142,16 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
return Collections.unmodifiableList(recs);
}

/**
* Get the updated metadata returned by the brokers along with this record set.
* May be empty or partial depending on the responses from the broker during this particular poll.
* May also include metadata for partitions other than those for which there are records
* in this {@code ConsumerRecords} object.
*/
public Map<TopicPartition, Metadata> metadata() {
return Collections.unmodifiableMap(metadata);
}

/**
* Get just the records for the given topic
*/
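To make the lag() contract above concrete, a test-flavored sketch using constructed Metadata values (the values and test class are illustrative, not from this commit; hamcrest is the assertion library this commit adds to the clients test classpath):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.jupiter.api.Test;

public class ConsumerRecordsMetadataContractTest {
    @Test
    public void lagIsEndOffsetMinusPosition() {
        // Illustrative values: next position to fetch is 50, end offset is 53.
        final ConsumerRecords.Metadata md = new ConsumerRecords.Metadata(0L, 50L, 53L);
        assertThat(md.lag(), is(3L)); // endOffset - position
        assertThat(md.position(), is(50L));
        assertThat(md.endOffset(), is(53L));
    }

    @Test
    public void lagIsNullWhenPositionOrEndOffsetUnknown() {
        assertThat(new ConsumerRecords.Metadata(0L, null, 53L).lag(), is(nullValue()));
        assertThat(new ConsumerRecords.Metadata(0L, 50L, null).lag(), is(nullValue()));
    }
}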
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -27,6 +27,7 @@
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+ import org.apache.kafka.clients.consumer.internals.FetchedRecords;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -1234,7 +1235,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
}
}

- final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
+ final FetchedRecords<K, V> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
@@ -1268,12 +1269,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
+ private FetchedRecords<K, V> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

// if data is available already, return it immediately
- final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
+ final FetchedRecords<K, V> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -218,7 +218,21 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
}

toClear.forEach(p -> this.records.remove(p));
- return new ConsumerRecords<>(results);

final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
for (final TopicPartition partition : subscriptions.assignedPartitions()) {
if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) {
final SubscriptionState.FetchPosition position = subscriptions.position(partition);
final long offset = position.offset;
final long endOffset = endOffsets.get(partition);
metadata.put(
partition,
new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset)
);
}
}

return new ConsumerRecords<>(results, metadata);
}

public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -229,6 +243,7 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
recs.add(record);
+ endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));
}

/**
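Given the MockConsumer changes above, a test can now observe positions and lag straight from poll results; a sketch under assumed topic and partition names:

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerMetadataSketch {
    public static void main(final String[] args) {
        final MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        final TopicPartition tp = new TopicPartition("example-topic", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        consumer.updateEndOffsets(Collections.singletonMap(tp, 1L));

        // addRecord now also advances the mock's tracked end offset.
        consumer.addRecord(new ConsumerRecord<>("example-topic", 0, 0L, "key", "value"));

        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        final ConsumerRecords.Metadata metadata = records.metadata().get(tp);
        // Expected with these values: position=1 (the next fetch), lag=0.
        System.out.println("position=" + metadata.position() + " lag=" + metadata.lag());
    }
}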
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FetchedRecords<K, V> {
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, FetchMetadata> metadata;

public static final class FetchMetadata {

private final long receivedTimestamp;
private final SubscriptionState.FetchPosition position;
private final Long endOffset;

public FetchMetadata(final long receivedTimestamp,
final SubscriptionState.FetchPosition position,
final Long endOffset) {
this.receivedTimestamp = receivedTimestamp;
this.position = position;
this.endOffset = endOffset;
}

public long receivedTimestamp() {
return receivedTimestamp;
}

public SubscriptionState.FetchPosition position() {
return position;
}

public Long endOffset() {
return endOffset;
}

@Override
public String toString() {
return "FetchMetadata{" +
"receivedTimestamp=" + receivedTimestamp +
", position=" + position +
", endOffset=" + endOffset +
'}';
}
}

public FetchedRecords() {
records = new HashMap<>();
metadata = new HashMap<>();
}

public void addRecords(final TopicPartition topicPartition, final List<ConsumerRecord<K, V>> records) {
if (this.records.containsKey(topicPartition)) {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
final List<ConsumerRecord<K, V>> currentRecords = this.records.get(topicPartition);
final List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
this.records.put(topicPartition, newRecords);
} else {
this.records.put(topicPartition, records);
}
}

public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
return records;
}

public void addMetadata(final TopicPartition partition, final FetchMetadata fetchMetadata) {
metadata.put(partition, fetchMetadata);
}

public Map<TopicPartition, FetchMetadata> metadata() {
return metadata;
}

public boolean isEmpty() {
return records.isEmpty() && metadata.isEmpty();
}
}
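And a sketch of the addRecords merge path documented in the comment above, with illustrative records; the copy is needed because the incoming list may be immutable, as the singletonList here is:

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.FetchedRecords;
import org.apache.kafka.common.TopicPartition;

public class FetchedRecordsMergeSketch {
    public static void main(final String[] args) {
        final FetchedRecords<String, String> fetched = new FetchedRecords<>();
        final TopicPartition tp = new TopicPartition("example-topic", 0);

        // Two adds for the same partition (the rare case noted above): the second
        // batch is appended to a fresh list rather than mutating the first.
        fetched.addRecords(tp, Collections.singletonList(new ConsumerRecord<>("example-topic", 0, 0L, "k0", "v0")));
        fetched.addRecords(tp, Collections.singletonList(new ConsumerRecord<>("example-topic", 0, 1L, "k1", "v1")));

        System.out.println(fetched.records().get(tp).size()); // prints 2
    }
}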