KAFKA-12268: Implement task idling semantics via currentLag API #10137

Merged 3 commits on Mar 2, 2021
1 change: 0 additions & 1 deletion build.gradle
@@ -1123,7 +1123,6 @@ project(':clients') {
     testCompile libs.bcpkix
     testCompile libs.junitJupiter
     testCompile libs.mockitoCore
-    testCompile libs.hamcrest

     testRuntime libs.slf4jlog4j
     testRuntime libs.jacksonDatabind
5 changes: 1 addition & 4 deletions checkstyle/suppressions.xml
@@ -101,10 +101,7 @@
               files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>

     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>
-
-    <suppress checks="CyclomaticComplexity"
-              files="MockConsumer"/>
+              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>

     <suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java

@@ -26,6 +26,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;

@@ -243,6 +244,11 @@ public interface Consumer<K, V> extends Closeable {
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);

+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);
+
     /**
      * @see KafkaConsumer#groupMetadata()
      */
chia7712 (Contributor):
Pardon me, KIP-695 does not include this change. It seems KIP-695 is still based on metadata? Please correct me if I misunderstand anything :)

vvcephei (Contributor, Author):
Woah, you are fast, @chia7712!

I just sent a message to the vote thread. I wanted to submit this PR first so that the vote thread message can have the full context available.

Do you mind reading over what I said there? If it sounds good to you, then I'll update the KIP, and we can maybe put this whole mess to bed.

ijuma (Contributor):
Quick question, should the API take a Collection<TopicPartition> like other APIs?

vvcephei (Contributor, Author), Feb 22, 2021:
Thanks @ijuma, I considered it, but decided on the current API because:

  1. This is a very quick, local, in-memory lookup, so there's no reason to batch multiple requests into one.
  2. It complicates the return type. We'd have to return either a Map<TP, Long> with mappings missing for unknown lags (which creates unfortunate null semantics for users), or a Map<TP, OptionalLong>, which creates a complex-to-understand two-hop lookup (lag := result.get(tp).get()); see the sketch below. Or else we could return a more complex domain object, like @chia7712 proposed on the mailing list. All of these seem like unnecessary complexity for this particular API, given the first point.

WDYT?
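To make the return-type concern concrete, here is a minimal sketch contrasting the two shapes. The batch variant and the `lagOrZero` helpers are illustrative only, not part of this PR:

```java
import java.util.Map;
import java.util.OptionalLong;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class LagLookupSketch {

    // Chosen API: one partition in, one OptionalLong out. "Unknown lag" is
    // explicit in the return type, and there is a single lookup step.
    static long lagOrZero(Consumer<?, ?> consumer, TopicPartition tp) {
        return consumer.currentLag(tp).orElse(0L);
    }

    // Hypothetical batch variant returning Map<TopicPartition, OptionalLong>:
    // the caller faces the two-hop lookup lag := result.get(tp).get(), plus a
    // null check in case tp was not part of the queried collection.
    static long lagOrZero(Map<TopicPartition, OptionalLong> lags, TopicPartition tp) {
        OptionalLong lag = lags.get(tp);          // hop 1: map lookup, may be null
        return lag == null ? 0L : lag.orElse(0L); // hop 2: unwrap the OptionalLong
    }
}
```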

ijuma (Contributor):
If there is no batching benefit, then the simpler API makes sense. @hachikuji Any reason why batching could be useful here?

hachikuji (Contributor):
For API calls that may incur a broker round trip, batching the partitions makes sense. For this API, I think a single-partition lookup is good enough.

guozhangwang (Contributor):
If we're concerned that users may call this function too frequently while looping over a large number of partitions, with each call synchronizing on the subscription state, then maybe we could make it a batch-mode API.

vvcephei (Contributor, Author):
That's a good point, @guozhangwang. Entering the synchronized block will have some overhead each time it's called.

I think we can just reason about the use cases here. My guess is that people would either tend to spot-check specific lags, as we are doing here, or else periodically check all lags. In the former case, I'd hazard that the current API is fine. In the latter case, we'd face more overhead. I'm sure this is motivated reasoning, but perhaps we can lump the latter case in with @chia7712's suggestion to expose more metadata, and defer it to the future.
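For the spot-check case, a minimal sketch of how a caller (Streams-style task idling, say) might use the new API; `readyToProcess` is an illustrative helper, not part of this PR:

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class TaskIdlingSketch {

    // Spot-check one partition's lag before deciding whether to process:
    // this is the cheap, local, in-memory path the API is designed for.
    static boolean readyToProcess(Consumer<?, ?> consumer, TopicPartition tp) {
        // An empty OptionalLong means the lag simply isn't known yet
        // (no position, or no end offset seen for this partition), so the
        // caller should keep idling rather than assume it is caught up.
        return consumer.currentLag(tp).orElse(Long.MAX_VALUE) == 0L;
    }
}
```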


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java

@@ -16,13 +16,11 @@
  */
 package org.apache.kafka.clients.consumer;

-import org.apache.kafka.clients.consumer.internals.FetchedRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.AbstractIterator;

 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,99 +32,12 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
-        Collections.emptyMap(),
-        Collections.emptyMap()
-    );
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());

     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private final Map<TopicPartition, Metadata> metadata;
-
-    public static final class Metadata {
-
-        private final long receivedTimestamp;
-        private final Long position;
-        private final Long endOffset;
-
-        public Metadata(final long receivedTimestamp,
-                        final Long position,
-                        final Long endOffset) {
-            this.receivedTimestamp = receivedTimestamp;
-            this.position = position;
-            this.endOffset = endOffset;
-        }
-
-        /**
-         * @return The timestamp of the broker response that contained this metadata
-         */
-        public long receivedTimestamp() {
-            return receivedTimestamp;
-        }
-
-        /**
-         * @return The next position the consumer will fetch, or null if the consumer has no position.
-         */
-        public Long position() {
-            return position;
-        }
-
-        /**
-         * @return The lag between the next position to fetch and the current end of the partition, or
-         * null if the end offset is not known or there is no position.
-         */
-        public Long lag() {
-            return endOffset == null || position == null ? null : endOffset - position;
-        }
-
-        /**
-         * @return The current last offset in the partition. The determination of the "last" offset
-         * depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully
-         * replicated offset plus one. Under "read_committed," this is the minimum of the last successfully
-         * replicated offset plus one or the smallest offset of any open transaction. Null if the end offset
-         * is not known.
-         */
-        public Long endOffset() {
-            return endOffset;
-        }
-
-        @Override
-        public String toString() {
-            return "Metadata{" +
-                    "receivedTimestamp=" + receivedTimestamp +
-                    ", position=" + position +
-                    ", endOffset=" + endOffset +
-                    '}';
-        }
-    }
-
-    private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
-        final Map<TopicPartition, Metadata> metadata = new HashMap<>();
-        for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
-            metadata.put(
-                entry.getKey(),
-                new Metadata(
-                    entry.getValue().receivedTimestamp(),
-                    entry.getValue().position() == null ? null : entry.getValue().position().offset,
-                    entry.getValue().endOffset()
-                )
-            );
-        }
-        return metadata;
-    }
-
-    public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
-        this.records = records;
-        this.metadata = new HashMap<>();
-    }
-
-    public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
-                           final Map<TopicPartition, Metadata> metadata) {
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
         this.records = records;
-        this.metadata = metadata;
     }
-
-    ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
-        this(fetchedRecords.records(), extractMetadata(fetchedRecords));
-    }

     /**
@@ -142,16 +53,6 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
         return Collections.unmodifiableList(recs);
     }

-    /**
-     * Get the updated metadata returned by the brokers along with this record set.
-     * May be empty or partial depending on the responses from the broker during this particular poll.
-     * May also include metadata for additional partitions than the ones for which there are records
-     * in this {@code ConsumerRecords} object.
-     */
-    public Map<TopicPartition, Metadata> metadata() {
-        return Collections.unmodifiableMap(metadata);
-    }

     /**
      * Get just the records for the given topic
      */
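Since this removes the short-lived ConsumerRecords.Metadata surface, code that computed lag from poll results would move to the consumer itself. A hypothetical before/after sketch:

```java
import java.time.Duration;
import java.util.OptionalLong;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class LagMigrationSketch {

    static OptionalLong pollAndCheckLag(Consumer<String, String> consumer, TopicPartition tp) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        processAll(records);

        // Before this PR, lag rode along with the poll result:
        //   Long lag = records.metadata().get(tp).lag();  // nullable at both hops
        // After this PR, ask the consumer directly; unknown lag is an
        // empty OptionalLong instead of a null.
        return consumer.currentLag(tp);
    }

    static void processAll(ConsumerRecords<String, String> records) {
        records.forEach(record -> System.out.println(record.value()));
    }
}
```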
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -27,7 +27,6 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.FetchedRecords;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;

@@ -74,6 +73,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

@@ -578,6 +578,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
     private final ConsumerInterceptors<K, V> interceptors;
+    private final IsolationLevel isolationLevel;

     private final Time time;
     private final ConsumerNetworkClient client;

@@ -736,7 +737,7 @@ public KafkaConsumer(Map<String, Object> configs,

             FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
-            IsolationLevel isolationLevel = IsolationLevel.valueOf(
+            this.isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);

@@ -849,6 +850,7 @@ public KafkaConsumer(Map<String, Object> configs,
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.fetcher = fetcher;
+        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
         this.interceptors = Objects.requireNonNull(interceptors);
         this.time = time;
         this.client = client;

@@ -1235,7 +1237,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
             }
         }

-        final FetchedRecords<K, V> records = pollForFetches(timer);
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
         if (!records.isEmpty()) {
             // before returning the fetched records, we can send off the next round of fetches
             // and avoid block waiting for their responses to enable pipelining while the user

@@ -1269,12 +1271,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitFo
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    private FetchedRecords<K, V> pollForFetches(Timer timer) {
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
         long pollTimeout = coordinator == null ? timer.remainingMs() :
                 Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

         // if data is available already, return it immediately
-        final FetchedRecords<K, V> records = fetcher.fetchedRecords();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
         if (!records.isEmpty()) {
             return records;
         }

@@ -2219,6 +2221,30 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
         }
     }

+    /**
+     * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
+     * for example if there is no position yet, or if the end offset is not known yet.
+     *
+     * <p>
+     * This method uses locally cached metadata and never makes a remote call.
+     *
+     * @param topicPartition The partition to get the lag for.
+     *
+     * @return This {@code Consumer} instance's current lag for the given partition.
+     *
+     * @throws IllegalStateException if the {@code topicPartition} is not assigned
+     **/
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        acquireAndEnsureOpen();
+        try {
+            final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
+            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+        } finally {
+            release();
+        }
+    }

     /**
      * Return the current group metadata associated with this consumer.
      *
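To pin down what the cached lookup returns, here is a simplified model of the computation behind subscriptions.partitionLag, assuming (per the removed Metadata#endOffset javadoc above) that the end offset is the high watermark under read_uncommitted and the last stable offset under read_committed; the real logic lives in SubscriptionState:

```java
import java.util.OptionalLong;

public class PartitionLagModel {

    // Simplified model: lag = endOffset - position, where the "end" depends on
    // the isolation level, and either side being unknown makes the lag unknown.
    static OptionalLong currentLag(Long position, Long highWatermark,
                                   Long lastStableOffset, boolean readCommitted) {
        if (position == null) {
            return OptionalLong.empty();           // no position yet
        }
        Long end = readCommitted ? lastStableOffset : highWatermark;
        return end == null
            ? OptionalLong.empty()                 // end offset not known yet
            : OptionalLong.of(end - position);
    }
}
```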
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

@@ -37,13 +37,16 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;

+import static java.util.Collections.singleton;
+

 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not

@@ -218,21 +221,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         }

         toClear.forEach(p -> this.records.remove(p));

-        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
-        for (final TopicPartition partition : subscriptions.assignedPartitions()) {
-            if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) {
-                final SubscriptionState.FetchPosition position = subscriptions.position(partition);
-                final long offset = position.offset;
-                final long endOffset = endOffsets.get(partition);
-                metadata.put(
-                    partition,
-                    new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset)
-                );
-            }
-        }
-
-        return new ConsumerRecords<>(results, metadata);
+        return new ConsumerRecords<>(results);
     }

     public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -243,7 +232,6 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
         recs.add(record);
-        endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));
     }

     /**
@@ -318,7 +306,7 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
     @Deprecated
     @Override
     public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
-        return committed(Collections.singleton(partition)).get(partition);
+        return committed(singleton(partition)).get(partition);
     }

     @Deprecated
@@ -556,6 +544,16 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partition
         return endOffsets(partitions);
     }

+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        if (endOffsets.containsKey(topicPartition)) {
+            return OptionalLong.of(endOffsets.get(topicPartition) - position(topicPartition));
+        } else {
+            // if the test doesn't bother to set an end offset, we assume it wants to model being caught up.
+            return OptionalLong.of(0L);
+        }
+    }

     @Override
     public ConsumerGroupMetadata groupMetadata() {
         return new ConsumerGroupMetadata("dummy.group.id", 1, "1", Optional.empty());
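A hypothetical test sketch of these MockConsumer semantics (topic name and offsets are arbitrary):

```java
import java.util.Collections;
import java.util.OptionalLong;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockCurrentLagExample {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("input", 0);

        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        // No end offset registered: the mock models a caught-up consumer.
        assert consumer.currentLag(tp).equals(OptionalLong.of(0L));

        // Once the test registers an end offset, lag = endOffset - position.
        consumer.updateEndOffsets(Collections.singletonMap(tp, 5L));
        assert consumer.currentLag(tp).equals(OptionalLong.of(5L));
    }
}
```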