[DO NOT SQUASH]: revert KIP-695 #10119

Closed
wants to merge 2 commits
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -16,13 +16,11 @@
*/
package org.apache.kafka.clients.consumer;

import org.apache.kafka.clients.consumer.internals.FetchedRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,99 +32,12 @@
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
Collections.emptyMap(),
Collections.emptyMap()
);
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());

private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, Metadata> metadata;

public static final class Metadata {

private final long receivedTimestamp;
private final Long position;
private final Long endOffset;

public Metadata(final long receivedTimestamp,
final Long position,
final Long endOffset) {
this.receivedTimestamp = receivedTimestamp;
this.position = position;
this.endOffset = endOffset;
}

/**
* @return The timestamp of the broker response that contained this metadata
*/
public long receivedTimestamp() {
return receivedTimestamp;
}

/**
* @return The next position the consumer will fetch, or null if the consumer has no position.
*/
public Long position() {
return position;
}

/**
* @return The lag between the next position to fetch and the current end of the partition, or
* null if the end offset is not known or there is no position.
*/
public Long lag() {
return endOffset == null || position == null ? null : endOffset - position;
}

/**
* @return The current last offset in the partition. The determination of the "last" offset
* depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully
* replicated offset plus one. Under "read_committed," this is the minimum of the last successfully
* replicated offset plus one or the smallest offset of any open transaction. Null if the end offset
* is not known.
*/
public Long endOffset() {
return endOffset;
}

@Override
public String toString() {
return "Metadata{" +
"receivedTimestamp=" + receivedTimestamp +
", position=" + position +
", endOffset=" + endOffset +
'}';
}
}

private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
final Map<TopicPartition, Metadata> metadata = new HashMap<>();
for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
metadata.put(
entry.getKey(),
new Metadata(
entry.getValue().receivedTimestamp(),
entry.getValue().position() == null ? null : entry.getValue().position().offset,
entry.getValue().endOffset()
)
);
}
return metadata;
}

public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
this.records = records;
this.metadata = new HashMap<>();
}

public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
final Map<TopicPartition, Metadata> metadata) {
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
this.records = records;
this.metadata = metadata;
}

ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
this(fetchedRecords.records(), extractMetadata(fetchedRecords));
}

/**
Expand All @@ -142,16 +53,6 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
return Collections.unmodifiableList(recs);
}

/**
* Get the updated metadata returned by the brokers along with this record set.
* May be empty or partial depending on the responses from the broker during this particular poll.
* May also include metadata for additional partitions than the ones for which there are records
* in this {@code ConsumerRecords} object.
*/
public Map<TopicPartition, Metadata> metadata() {
return Collections.unmodifiableMap(metadata);
}

/**
* Get just the records for the given topic
*/
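For context, the deleted Metadata class and metadata() accessor above were the public surface of KIP-695. A minimal, hypothetical sketch (mine, not part of this PR; the class name and output format are made up) of how caller code could read per-partition lag from a poll result before this revert:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class LagProbe {
    // Compiles only against the pre-revert client: ConsumerRecords.metadata()
    // and the nested Metadata type are exactly what this diff deletes.
    static <K, V> void printLag(ConsumerRecords<K, V> records) {
        for (Map.Entry<TopicPartition, ConsumerRecords.Metadata> e : records.metadata().entrySet()) {
            // lag() is endOffset - position, or null when either side is unknown
            System.out.printf("%s lag=%s%n", e.getKey(), e.getValue().lag());
        }
    }
}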
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -27,7 +27,6 @@
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.FetchedRecords;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -1235,7 +1234,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
}
}

final FetchedRecords<K, V> records = pollForFetches(timer);
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
// and avoid block waiting for their responses to enable pipelining while the user
@@ -1269,12 +1268,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
/**
* @throws KafkaException if the rebalance callback throws exception
*/
private FetchedRecords<K, V> pollForFetches(Timer timer) {
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

// if data is available already, return it immediately
final FetchedRecords<K, V> records = fetcher.fetchedRecords();
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
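After the revert, pollForFetches again hands back a plain Map<TopicPartition, List<ConsumerRecord<K, V>>>, and poll(Duration) exposes only the records. A minimal consumer loop against the reverted API (a sketch with placeholder broker address, group id, and topic; not code from this PR):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "poll-loop");                // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // no records.metadata() here: the loop sees records only
                records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
            }
        }
    }
}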
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -218,21 +218,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
}

toClear.forEach(p -> this.records.remove(p));

final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
for (final TopicPartition partition : subscriptions.assignedPartitions()) {
if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) {
final SubscriptionState.FetchPosition position = subscriptions.position(partition);
final long offset = position.offset;
final long endOffset = endOffsets.get(partition);
metadata.put(
partition,
new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset)
);
}
}

return new ConsumerRecords<>(results, metadata);
return new ConsumerRecords<>(results);
}

public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -243,7 +229,6 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
recs.add(record);
endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));
}

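With the endOffsets bookkeeping removed from addRecord and the metadata map removed from poll, MockConsumer-based tests only observe records. A self-contained sketch of the post-revert behavior (assumed standard MockConsumer usage; topic and values are made up):

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("events", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L)); // gives the mock a valid position
        consumer.addRecord(new ConsumerRecord<>("events", 0, 0L, "key", "value"));

        ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
        // after this revert, ConsumerRecords carries no Metadata map to inspect
        records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
    }
}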

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchedRecords.java

This file was deleted.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -318,7 +318,7 @@ public void onSuccess(ClientResponse resp) {
short responseVersion = resp.requestHeader().apiVersion();

completedFetches.add(new CompletedFetch(partition, partitionData,
metricAggregator, batches, fetchOffset, responseVersion, resp.receivedTimeMs()));
metricAggregator, batches, fetchOffset, responseVersion));
}
}

@@ -597,8 +597,8 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, Timer timer) {
* the defaultResetPolicy is NONE
* @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
*/
public FetchedRecords<K, V> fetchedRecords() {
FetchedRecords<K, V> fetched = new FetchedRecords<>();
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
int recordsRemaining = maxPollRecords;

@@ -636,28 +636,20 @@ public FetchedRecords<K, V> fetchedRecords() {
} else {
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining);

TopicPartition partition = nextInLineFetch.partition;

// This can be false when a rebalance happened before fetched records
// are returned to the consumer's poll call
if (subscriptions.isAssigned(partition)) {

// initializeCompletedFetch, above, has already persisted the metadata from the fetch in the
// SubscriptionState, so we can just read it out, which in particular lets us re-use the logic
// for determining the end offset
final long receivedTimestamp = nextInLineFetch.receivedTimestamp;
final Long endOffset = subscriptions.logEndOffset(partition, isolationLevel);
final FetchPosition fetchPosition = subscriptions.position(partition);

final FetchedRecords.FetchMetadata metadata =
new FetchedRecords.FetchMetadata(receivedTimestamp, fetchPosition, endOffset);

fetched.addMetadata(partition, metadata);

}

if (!records.isEmpty()) {
fetched.addRecords(partition, records);
TopicPartition partition = nextInLineFetch.partition;
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
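The copy-on-merge branch above exists because the list already stored for a partition may be immutable. The same merge can also be phrased with Map.merge; a standalone sketch (mine, not the PR's code) where Arrays.asList stands in for an immutable fetch batch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeSketch {
    static <K, V> void addRecords(Map<K, List<V>> fetched, K partition, List<V> records) {
        fetched.merge(partition, records, (current, incoming) -> {
            // copy into a fresh list: the stored one may not support addAll
            List<V> merged = new ArrayList<>(current.size() + incoming.size());
            merged.addAll(current);  // earlier records keep their order
            merged.addAll(incoming); // newly fetched records are appended
            return merged;
        });
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> fetched = new HashMap<>();
        addRecords(fetched, "events-0", Arrays.asList(1, 2));
        addRecords(fetched, "events-0", Arrays.asList(3));
        System.out.println(fetched); // prints {events-0=[1, 2, 3]}
    }
}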
@@ -1466,7 +1458,6 @@ private class CompletedFetch {
private final FetchResponse.PartitionData<Records> partitionData;
private final FetchResponseMetricAggregator metricAggregator;
private final short responseVersion;
private final long receivedTimestamp;

private int recordsRead;
private int bytesRead;
@@ -1485,15 +1476,13 @@ private CompletedFetch(TopicPartition partition,
FetchResponseMetricAggregator metricAggregator,
Iterator<? extends RecordBatch> batches,
Long fetchOffset,
short responseVersion,
final long receivedTimestamp) {
short responseVersion) {
this.partition = partition;
this.partitionData = partitionData;
this.metricAggregator = metricAggregator;
this.batches = batches;
this.nextFetchOffset = fetchOffset;
this.responseVersion = responseVersion;
this.receivedTimestamp = receivedTimestamp;
this.lastEpoch = Optional.empty();
this.abortedProducerIds = new HashSet<>();
this.abortedTransactions = abortedTransactions(partitionData);