
Commit

Merge remote-tracking branch 'apache-github/trunk' into reduce-produce-allocations-lz4

* apache-github/trunk: (43 commits)
  KAFKA-12800: Configure generator to fail on trailing JSON tokens (apache#10717)
  MINOR: clarify message ordering with max in-flight requests and idempotent producer (apache#10690)
  MINOR: Add log identifier/prefix printing in Log layer static functions (apache#10742)
  MINOR: update java doc for deprecated methods (apache#10722)
  MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (apache#10703)
  KAFKA-12499: add transaction timeout verification (apache#10482)
  KAFKA-12620 Allocate producer ids on the controller (apache#10504)
  MINOR: Kafka Streams code samples formating unification (apache#10651)
  KAFKA-12808: Remove Deprecated Methods under StreamsMetrics (apache#10724)
  KAFKA-12522: Cast SMT should allow null value records to pass through (apache#10375)
  KAFKA-12820: Upgrade maven-artifact dependency to resolve CVE-2021-26291
  HOTFIX: fix checkstyle issue in KAFKA-12697
  KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (apache#10572)
  KAFKA-12342: Remove MetaLogShim and use RaftClient directly (apache#10705)
  KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId() (apache#10735)
  KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs (apache#10737)
  MINOR: Eliminate redundant functions in LogTest suite (apache#10732)
  MINOR: Remove unused maxProducerIdExpirationMs parameter in Log constructor (apache#10723)
  MINOR: Updating files with release 2.7.1 (apache#10660)
  KAFKA-12809: Remove deprecated methods of Stores factory (apache#10729)
  ...
ijuma committed May 26, 2021
2 parents 00c29d0 + 63b6db3 commit 9daeb3c
Showing 325 changed files with 7,382 additions and 5,376 deletions.
4 changes: 2 additions & 2 deletions LICENSE-binary
@@ -248,9 +248,9 @@ netty-transport-native-unix-common-4.1.59.Final
plexus-utils-3.2.1
rocksdbjni-5.18.4
scala-collection-compat_2.13-2.3.0
- scala-library-2.13.5
+ scala-library-2.13.6
scala-logging_2.13-3.9.2
- scala-reflect-2.13.5
+ scala-reflect-2.13.6
scala-java8-compat_2.13-0.9.1
snappy-java-1.1.8.1
zookeeper-3.5.9
2 changes: 1 addition & 1 deletion bin/kafka-run-class.sh
@@ -48,7 +48,7 @@ should_include_file() {
base_dir=$(dirname $0)/..

if [ -z "$SCALA_VERSION" ]; then
- SCALA_VERSION=2.13.5
+ SCALA_VERSION=2.13.6
if [[ -f "$base_dir/gradle.properties" ]]; then
SCALA_VERSION=`grep "^scalaVersion=" "$base_dir/gradle.properties" | cut -d= -f 2`
fi
2 changes: 1 addition & 1 deletion bin/windows/kafka-run-class.bat
@@ -27,7 +27,7 @@ set BASE_DIR=%CD%
popd

IF ["%SCALA_VERSION%"] EQU [""] (
- set SCALA_VERSION=2.13.5
+ set SCALA_VERSION=2.13.6
)

IF ["%SCALA_BINARY_VERSION%"] EQU [""] (
3 changes: 2 additions & 1 deletion build.gradle
@@ -1069,6 +1069,7 @@ project(':metadata') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
+ implementation project(':raft')
implementation libs.jacksonDatabind
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
@@ -1077,6 +1078,7 @@ project(':metadata') {
testImplementation libs.hamcrest
testImplementation libs.slf4jlog4j
testImplementation project(':clients').sourceSets.test.output
+ testImplementation project(':raft').sourceSets.test.output
}

task processMessages(type:JavaExec) {
@@ -1267,7 +1269,6 @@ project(':raft') {
dependencies {
implementation project(':server-common')
implementation project(':clients')
- implementation project(':metadata')
implementation libs.slf4jApi
implementation libs.jacksonDatabind

10 changes: 8 additions & 2 deletions checkstyle/import-control.xml
@@ -217,12 +217,15 @@
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.controller" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
</subpackage>
@@ -233,6 +236,9 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
@@ -242,6 +248,8 @@
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.raft" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
@@ -291,7 +299,6 @@
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.metadata"/>
<allow pkg="org.apache.kafka.metalog"/>
<allow pkg="org.apache.kafka.queue"/>
<allow pkg="org.apache.kafka.raft"/>
<allow pkg="org.apache.kafka.server.common" />
@@ -417,7 +424,6 @@
<allow pkg="org.apache.kafka.common.record" />
<allow pkg="org.apache.kafka.common.requests" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metalog" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.test"/>
@@ -1089,8 +1089,8 @@ public void handleSuccessfulResponse(RequestHeader requestHeader, long now, Meta
if (!errors.isEmpty())
log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);

- // Don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
- // created which means we will get errors and no nodes until it exists
+ // When talking to the startup phase of a broker, it is possible to receive an empty metadata set, which
+ // we should retry later.
if (response.brokers().isEmpty()) {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
@@ -524,19 +524,13 @@ private TransactionManager configureTransactionState(ProducerConfig config,
final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
- final boolean autoDowngradeTxnCommit = config.getBoolean(ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
- // Only log a warning if being used outside of Streams, which we know includes "StreamThread-" in the client id
- if (autoDowngradeTxnCommit && !clientId.contains("StreamThread-")) {
- log.warn("The configuration parameter `{}` is internal and not intended for public use, it will be " +
- "removed in 4.0", ProducerConfig.AUTO_DOWNGRADE_TXN_COMMIT);
- }
transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
- apiVersions,
- autoDowngradeTxnCommit);
+ apiVersions
+ );

if (transactionManager.isTransactional())
log.info("Instantiated a transactional producer.");
@@ -179,7 +179,7 @@ public void beginTransaction() throws ProducerFencedException {
this.sentOffsets = false;
}

- @SuppressWarnings("deprecation")
+ @Deprecated
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException {
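For callers migrating off the overload deprecated here, the KIP-447 replacement takes a ConsumerGroupMetadata instead of a bare group id string. A minimal sketch (the class and method are illustrative; in a real consume-transform-produce loop you would pass consumer.groupMetadata() rather than building the metadata by hand):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

public final class OffsetCommitExample {
    // Commits consumed offsets within the producer's current transaction using
    // the non-deprecated overload. Prefer consumer.groupMetadata() over a
    // hand-built ConsumerGroupMetadata so group-generation fencing still applies.
    static void commitWithinTransaction(Producer<byte[], byte[]> producer,
                                        Map<TopicPartition, OffsetAndMetadata> offsets,
                                        String groupId) {
        producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(groupId));
    }
}
```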
@@ -24,7 +24,6 @@
/**
* Partitioner Interface
*/
-
public interface Partitioner extends Configurable, Closeable {

/**
@@ -37,13 +36,12 @@ public interface Partitioner extends Configurable, Closeable {
* @param valueBytes The serialized value to partition on or null
* @param cluster The current cluster metadata
*/
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
+ int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

/**
* This is called when partitioner is closed.
*/
- public void close();
-
+ void close();

/**
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
@@ -52,6 +50,6 @@ public interface Partitioner extends Configurable, Closeable {
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new batch
*/
- default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+ default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
}
}
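Since onNewBatch now carries a default no-op body, implementing this interface only requires partition, close, and configure. A minimal sketch of a custom implementation; the class and its keying rule are hypothetical, not part of this change:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical partitioner: route by murmur2 hash of the serialized key,
// sending keyless records to partition 0.
public class KeyHashPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0;
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```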
@@ -200,8 +200,8 @@ public class ProducerConfig extends AbstractConfig {
/** <code>max.in.flight.requests.per.connection</code> */
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
+ " message re-ordering due to retries (i.e., if retries are enabled).";
+ " Note that if this config is set to be greater than 1 and <code>enable.idempotence</code> is set to false, there is a risk of"
+ " message re-ordering after a failed send due to retries (i.e., if retries are enabled).";

/** <code>retries</code> */
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
@@ -246,10 +246,10 @@ public class ProducerConfig extends AbstractConfig {
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5, "
+ "<code>" + RETRIES_CONFIG + "</code> to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
+ "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, "
+ "a <code>ConfigException</code> will be thrown.";
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5 "
+ "(with message ordering preserved for any allowable value), <code>" + RETRIES_CONFIG + "</code> to be greater than 0, and <code>"
+ ACKS_CONFIG + "</code> must be 'all'. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible "
+ "values are set, a <code>ConfigException</code> will be thrown.";

/** <code> transaction.timeout.ms </code> */
public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
@@ -269,21 +269,6 @@ public class ProducerConfig extends AbstractConfig {
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;

- /**
- * <code>internal.auto.downgrade.txn.commit</code>
- * Whether or not the producer should automatically downgrade the transactional commit request when the new group metadata
- * feature is not supported by the broker.
- * <p>
- * The purpose of this flag is to make Kafka Streams being capable of working with old brokers when applying this new API.
- * Non Kafka Streams users who are building their own EOS applications should be careful playing around
- * with config as there is a risk of violating EOS semantics when turning on this flag.
- *
- * <p>
- * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
- *
- */
- static final String AUTO_DOWNGRADE_TXN_COMMIT = "internal.auto.downgrade.txn.commit";
-
private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

static {
@@ -424,11 +409,7 @@ public class ProducerConfig extends AbstractConfig {
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
- TRANSACTIONAL_ID_DOC)
- .defineInternal(AUTO_DOWNGRADE_TXN_COMMIT,
- Type.BOOLEAN,
- false,
- Importance.LOW);
+ TRANSACTIONAL_ID_DOC);
}

@Override
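To make the reworded constraints concrete, a configuration sketch under the documented rules (bootstrap address and topic name are placeholders): idempotence requires acks=all, positive retries, and at most five in-flight requests per connection, with ordering preserved for any allowable in-flight value.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public final class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence requires acks=all, retries > 0, and at most 5 in-flight
        // requests per connection; ordering is kept for any value <= 5.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // placeholder topic
        }
    }
}
```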
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
+ import java.util.stream.Collectors;

/*
* A thread-safe helper class to hold batches that haven't been acknowledged yet (including those
@@ -51,6 +52,12 @@ public Iterable<ProducerBatch> copyAll() {
}
}

+ public Iterable<ProduceRequestResult> requestResults() {
+ synchronized (incomplete) {
+ return incomplete.stream().map(batch -> batch.produceFuture).collect(Collectors.toList());
+ }
+ }

public boolean isEmpty() {
synchronized (incomplete) {
return incomplete.isEmpty();
@@ -710,8 +710,12 @@ private boolean appendsInProgress() {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
- for (ProducerBatch batch : this.incomplete.copyAll())
- batch.produceFuture.await();
+ // Obtain a copy of all of the incomplete ProduceRequestResult(s) at the time of the flush.
+ // We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
+ // collection can occur on the contents.
+ // The sender will remove ProducerBatch(s) from the original incomplete collection.
+ for (ProduceRequestResult result : this.incomplete.requestResults())
+ result.await();
} finally {
this.flushesInProgress.decrementAndGet();
}
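The new comments capture the rationale: a long-running flush() should not pin completed ProducerBatch objects in memory, so only the lightweight ProduceRequestResult futures are snapshotted. Application-visible semantics are unchanged, as in this sketch (the class and topic name are placeholders):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class FlushExample {
    // flush() still blocks until every record handed to send() so far has
    // completed, successfully or not; with this change, batches that finish
    // while the flush is waiting become eligible for garbage collection
    // instead of being held alive by the flushing thread's snapshot.
    static void sendAndFlush(Producer<String, String> producer) {
        producer.send(new ProducerRecord<>("demo-topic", "payload"));
        producer.flush();
    }
}
```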
@@ -97,7 +97,6 @@ public class TransactionManager {
private final String transactionalId;
private final int transactionTimeoutMs;
private final ApiVersions apiVersions;
- private final boolean autoDowngradeTxnCommit;

private static class TopicPartitionBookkeeper {

@@ -304,8 +303,7 @@ public TransactionManager(final LogContext logContext,
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
- final ApiVersions apiVersions,
- final boolean autoDowngradeTxnCommit) {
+ final ApiVersions apiVersions) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
@@ -322,7 +320,6 @@ public TransactionManager(final LogContext logContext,
this.retryBackoffMs = retryBackoffMs;
this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
this.apiVersions = apiVersions;
- this.autoDowngradeTxnCommit = autoDowngradeTxnCommit;
}

public synchronized TransactionalRequestResult initializeTransactions() {
@@ -1179,8 +1176,7 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult
pendingTxnOffsetCommits,
groupMetadata.memberId(),
groupMetadata.generationId(),
- groupMetadata.groupInstanceId(),
- autoDowngradeTxnCommit
+ groupMetadata.groupInstanceId()
);
return new TxnOffsetCommitHandler(result, builder);
}
@@ -802,8 +802,12 @@ else if (value instanceof Class)
* The config types
*/
public enum Type {
- BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
+ BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD;
+
+ public boolean isSensitive() {
+ return this == PASSWORD;
+ }
}

/**
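A plausible use of the new helper, hypothetical rather than taken from this commit, is masking sensitive values before logging a parsed configuration:

```java
import org.apache.kafka.common.config.ConfigDef;

public final class ConfigLogging {
    // Renders one config entry, hiding any value whose declared type is
    // sensitive (currently only PASSWORD).
    static String render(String name, ConfigDef.Type type, Object value) {
        return name + "=" + (type.isSensitive() ? "[hidden]" : String.valueOf(value));
    }
}
```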
@@ -107,7 +107,8 @@ public enum ApiKeys {
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
- LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS);
+ LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
+ ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, false);

private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
@@ -301,6 +301,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return DescribeTransactionsRequest.parse(buffer, apiVersion);
case LIST_TRANSACTIONS:
return ListTransactionsRequest.parse(buffer, apiVersion);
+ case ALLOCATE_PRODUCER_IDS:
+ return AllocateProducerIdsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
@@ -245,6 +245,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return DescribeTransactionsResponse.parse(responseBuffer, version);
case LIST_TRANSACTIONS:
return ListTransactionsResponse.parse(responseBuffer, version);
+ case ALLOCATE_PRODUCER_IDS:
+ return AllocateProducerIdsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));