Commit

Merge remote-tracking branch 'apache-github/trunk' into gradle-7.0-ready
* apache-github/trunk: (37 commits)
  KAFKA-10357: Extract setup of changelog from Streams partition assignor (apache#10163)
  KAFKA-10251: increase timeout for consuming records (apache#10228)
  KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission (apache#10223)
  MINOR: Disable transactional/idempotent system tests for Raft quorums (apache#10224)
  KAFKA-10766: Unit test cases for RocksDBRangeIterator (apache#9717)
  KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore (apache#10052)
  KAFKA-12268: Implement task idling semantics via currentLag API (apache#10137)
  MINOR: Time and log producer state recovery phases (apache#10241)
  MINOR: correct the error message of validating uint32 (apache#10193)
  MINOR: Format the revoking active log output in `StreamsPartitionAssignor` (apache#10242)
  KAFKA-12323 Follow-up: Refactor the unit test a bit (apache#10205)
  MINOR: Remove stack trace of the lock exception in a debug log4j (apache#10231)
  MINOR: Word count should account for extra whitespaces between words (apache#10229)
  MINOR; Small refactor in `GroupMetadata` (apache#10236)
  KAFKA-10340: Proactively close producer when cancelling source tasks (apache#10016)
  KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist (apache#10141)
  KAFKA-12254: Ensure MM2 creates topics with source topic configs (apache#10217)
  MINOR: fix kafka-metadata-shell.sh (apache#10226)
  KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (apache#10199)
  KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (apache#8812)
  ...
ijuma committed Mar 2, 2021
2 parents 73c9b0f + a848e0c commit 09ffca7
Showing 164 changed files with 4,179 additions and 1,778 deletions.
12 changes: 12 additions & 0 deletions bin/kafka-run-class.sh
@@ -135,6 +135,18 @@ do
   CLASSPATH="$CLASSPATH":"$file"
 done
 
+for file in "$base_dir"/shell/build/libs/kafka-shell*.jar;
+do
+  if should_include_file "$file"; then
+    CLASSPATH="$CLASSPATH":"$file"
+  fi
+done
+
+for dir in "$base_dir"/shell/build/dependant-libs-${SCALA_VERSION}*;
+do
+  CLASSPATH="$CLASSPATH:$dir/*"
+done
+
 for file in "$base_dir"/tools/build/libs/kafka-tools*.jar;
 do
   if should_include_file "$file"; then
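The two new loops follow the same per-module pattern as the existing entries, adding the shell module's jars and its dependent libs to CLASSPATH. As a quick sanity check, a sketch assuming a local checkout, where org.apache.kafka.shell.MetadataShell is the entry point used by bin/kafka-metadata-shell.sh:

    # Build the shell module jars, then confirm kafka-run-class.sh picks them up.
    ./gradlew shell:jar
    bin/kafka-run-class.sh org.apache.kafka.shell.MetadataShell --help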
1 change: 0 additions & 1 deletion build.gradle
@@ -1139,7 +1139,6 @@ project(':clients') {
     testImplementation libs.bcpkix
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
-    testImplementation libs.hamcrest
 
     testRuntimeOnly libs.slf4jlog4j
     testRuntimeOnly libs.jacksonDatabind
5 changes: 1 addition & 4 deletions checkstyle/suppressions.xml
@@ -101,10 +101,7 @@
               files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>
-
-    <suppress checks="CyclomaticComplexity"
-              files="MockConsumer"/>
+              files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
 
     <suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -26,6 +26,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -243,6 +244,11 @@ public interface Consumer<K, V> extends Closeable {
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
+    /**
+     * @see KafkaConsumer#currentLag(TopicPartition)
+     */
+    OptionalLong currentLag(TopicPartition topicPartition);
+
     /**
      * @see KafkaConsumer#groupMetadata()
      */
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -16,13 +16,11 @@
  */
 package org.apache.kafka.clients.consumer;
 
-import org.apache.kafka.clients.consumer.internals.FetchedRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.AbstractIterator;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,99 +32,12 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(
-        Collections.emptyMap(),
-        Collections.emptyMap()
-    );
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
-    private final Map<TopicPartition, Metadata> metadata;
-
-    public static final class Metadata {
-
-        private final long receivedTimestamp;
-        private final Long position;
-        private final Long endOffset;
-
-        public Metadata(final long receivedTimestamp,
-                        final Long position,
-                        final Long endOffset) {
-            this.receivedTimestamp = receivedTimestamp;
-            this.position = position;
-            this.endOffset = endOffset;
-        }
-
-        /**
-         * @return The timestamp of the broker response that contained this metadata
-         */
-        public long receivedTimestamp() {
-            return receivedTimestamp;
-        }
-
-        /**
-         * @return The next position the consumer will fetch, or null if the consumer has no position.
-         */
-        public Long position() {
-            return position;
-        }
-
-        /**
-         * @return The lag between the next position to fetch and the current end of the partition, or
-         * null if the end offset is not known or there is no position.
-         */
-        public Long lag() {
-            return endOffset == null || position == null ? null : endOffset - position;
-        }
-
-        /**
-         * @return The current last offset in the partition. The determination of the "last" offset
-         * depends on the Consumer's isolation level. Under "read_uncommitted," this is the last successfully
-         * replicated offset plus one. Under "read_committed," this is the minimum of the last successfully
-         * replicated offset plus one or the smallest offset of any open transaction. Null if the end offset
-         * is not known.
-         */
-        public Long endOffset() {
-            return endOffset;
-        }
-
-        @Override
-        public String toString() {
-            return "Metadata{" +
-                    "receivedTimestamp=" + receivedTimestamp +
-                    ", position=" + position +
-                    ", endOffset=" + endOffset +
-                    '}';
-        }
-    }
-
-    private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
-        final Map<TopicPartition, Metadata> metadata = new HashMap<>();
-        for (final Map.Entry<TopicPartition, FetchedRecords.FetchMetadata> entry : fetchedRecords.metadata().entrySet()) {
-            metadata.put(
-                entry.getKey(),
-                new Metadata(
-                    entry.getValue().receivedTimestamp(),
-                    entry.getValue().position() == null ? null : entry.getValue().position().offset,
-                    entry.getValue().endOffset()
-                )
-            );
-        }
-        return metadata;
-    }
-
-    public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
-        this.records = records;
-        this.metadata = new HashMap<>();
-    }
-
-    public ConsumerRecords(final Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
-                           final Map<TopicPartition, Metadata> metadata) {
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
         this.records = records;
-        this.metadata = metadata;
     }
 
-    ConsumerRecords(final FetchedRecords<K, V> fetchedRecords) {
-        this(fetchedRecords.records(), extractMetadata(fetchedRecords));
-    }
-
     /**
@@ -142,16 +53,6 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
         return Collections.unmodifiableList(recs);
     }
 
-    /**
-     * Get the updated metadata returned by the brokers along with this record set.
-     * May be empty or partial depending on the responses from the broker during this particular poll.
-     * May also include metadata for additional partitions than the ones for which there are records
-     * in this {@code ConsumerRecords} object.
-     */
-    public Map<TopicPartition, Metadata> metadata() {
-        return Collections.unmodifiableMap(metadata);
-    }
-
     /**
      * Get just the records for the given topic
      */
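With the Metadata inner class and the extra constructors gone, a ConsumerRecords is built from just the per-partition records map. A minimal sketch of constructing one directly, as test code typically does (the topic name and values are illustrative):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumerRecordsSketch {
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("test-topic", 0); // illustrative topic
            Map<TopicPartition, List<ConsumerRecord<String, String>>> data =
                    Collections.singletonMap(tp, Collections.singletonList(
                            new ConsumerRecord<>("test-topic", 0, 0L, "key", "value")));
            // The single remaining constructor takes only the records map.
            ConsumerRecords<String, String> records = new ConsumerRecords<>(data);
            System.out.println(records.count()); // 1
        }
    }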
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -27,7 +27,6 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.clients.consumer.internals.FetchedRecords;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
@@ -74,6 +73,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -578,6 +578,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
     private final ConsumerInterceptors<K, V> interceptors;
+    private final IsolationLevel isolationLevel;
 
     private final Time time;
     private final ConsumerNetworkClient client;
@@ -736,7 +737,7 @@ public KafkaConsumer(Map<String, Object> configs,
 
         FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
         ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
-        IsolationLevel isolationLevel = IsolationLevel.valueOf(
+        this.isolationLevel = IsolationLevel.valueOf(
                 config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
         Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
         int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -849,6 +850,7 @@ public KafkaConsumer(Map<String, Object> configs,
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.fetcher = fetcher;
+        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
         this.interceptors = Objects.requireNonNull(interceptors);
         this.time = time;
         this.client = client;
@@ -1235,7 +1237,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
             }
         }
 
-        final FetchedRecords<K, V> records = pollForFetches(timer);
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
         if (!records.isEmpty()) {
             // before returning the fetched records, we can send off the next round of fetches
             // and avoid block waiting for their responses to enable pipelining while the user
@@ -1269,12 +1271,12 @@ boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    private FetchedRecords<K, V> pollForFetches(Timer timer) {
+    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
         long pollTimeout = coordinator == null ? timer.remainingMs() :
                 Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
 
         // if data is available already, return it immediately
-        final FetchedRecords<K, V> records = fetcher.fetchedRecords();
+        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
         if (!records.isEmpty()) {
             return records;
         }
@@ -2219,6 +2221,30 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         }
     }
 
+    /**
+     * Get the consumer's current lag on the partition. Returns an "empty" {@link OptionalLong} if the lag is not known,
+     * for example if there is no position yet, or if the end offset is not known yet.
+     *
+     * <p>
+     * This method uses locally cached metadata and never makes a remote call.
+     *
+     * @param topicPartition The partition to get the lag for.
+     *
+     * @return This {@code Consumer} instance's current lag for the given partition.
+     *
+     * @throws IllegalStateException if the {@code topicPartition} is not assigned
+     **/
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        acquireAndEnsureOpen();
+        try {
+            final Long lag = subscriptions.partitionLag(topicPartition, isolationLevel);
+            return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
+        } finally {
+            release();
+        }
+    }
+
     /**
      * Return the current group metadata associated with this consumer.
      *
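A minimal sketch of the new API from the caller's side (broker address, group id, and topic are illustrative). Since currentLag reads only locally cached metadata, it returns an empty OptionalLong until the consumer has both a fetch position and a cached end offset for the partition, typically after the first poll:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.OptionalLong;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class CurrentLagSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-demo");                // illustrative group
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("input-topic", 0); // illustrative topic
                consumer.assign(Collections.singleton(tp));
                consumer.poll(Duration.ofMillis(500)); // populates position and end-offset metadata
                OptionalLong lag = consumer.currentLag(tp);
                System.out.println(lag.isPresent() ? "lag=" + lag.getAsLong() : "lag not yet known");
            }
        }
    }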
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -37,13 +37,16 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singleton;
+
 
 /**
  * A mock of the {@link Consumer} interface you can use for testing code that uses Kafka. This class is <i> not
@@ -218,21 +221,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         }
 
         toClear.forEach(p -> this.records.remove(p));
-
-        final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
-        for (final TopicPartition partition : subscriptions.assignedPartitions()) {
-            if (subscriptions.hasValidPosition(partition) && endOffsets.containsKey(partition)) {
-                final SubscriptionState.FetchPosition position = subscriptions.position(partition);
-                final long offset = position.offset;
-                final long endOffset = endOffsets.get(partition);
-                metadata.put(
-                    partition,
-                    new ConsumerRecords.Metadata(System.currentTimeMillis(), offset, endOffset)
-                );
-            }
-        }
-
-        return new ConsumerRecords<>(results, metadata);
+        return new ConsumerRecords<>(results);
     }
 
     public synchronized void addRecord(ConsumerRecord<K, V> record) {
@@ -243,7 +232,6 @@ public synchronized void addRecord(ConsumerRecord<K, V> record) {
             throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
         List<ConsumerRecord<K, V>> recs = this.records.computeIfAbsent(tp, k -> new ArrayList<>());
         recs.add(record);
-        endOffsets.compute(tp, (ignore, offset) -> offset == null ? record.offset() : Math.max(offset, record.offset()));
     }
 
     /**
@@ -318,7 +306,7 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
     @Deprecated
     @Override
     public synchronized OffsetAndMetadata committed(final TopicPartition partition) {
-        return committed(Collections.singleton(partition)).get(partition);
+        return committed(singleton(partition)).get(partition);
     }
 
     @Deprecated
@@ -556,6 +544,16 @@ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         return endOffsets(partitions);
     }
 
+    @Override
+    public OptionalLong currentLag(TopicPartition topicPartition) {
+        if (endOffsets.containsKey(topicPartition)) {
+            return OptionalLong.of(endOffsets.get(topicPartition) - position(topicPartition));
+        } else {
+            // if the test doesn't bother to set an end offset, we assume it wants to model being caught up.
+            return OptionalLong.of(0L);
+        }
+    }
+
     @Override
     public ConsumerGroupMetadata groupMetadata() {
         return new ConsumerGroupMetadata("dummy.group.id", 1, "1", Optional.empty());
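A sketch of how a test might exercise this (topic and offsets are illustrative): with no end offset registered the mock reports zero lag, modeling a caught-up consumer; once an end offset is set, the lag is the end offset minus the current position.

    import java.time.Duration;
    import java.util.Collections;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class MockCurrentLagSketch {
        public static void main(String[] args) {
            MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            TopicPartition tp = new TopicPartition("test-topic", 0); // illustrative topic
            consumer.assign(Collections.singleton(tp));
            consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

            // No end offset registered: the mock models a fully caught-up consumer.
            System.out.println(consumer.currentLag(tp)); // OptionalLong[0]

            // With an end offset of 10 and one record consumed (position 1), lag is 9.
            consumer.updateEndOffsets(Collections.singletonMap(tp, 10L));
            consumer.addRecord(new ConsumerRecord<>("test-topic", 0, 0L, "key", "value"));
            consumer.poll(Duration.ofMillis(0));
            System.out.println(consumer.currentLag(tp)); // OptionalLong[9]
        }
    }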