From aa25176e77c7643d8c7ca5c6ded797d9fe315aee Mon Sep 17 00:00:00 2001
From: Josep Prat

Step 1: Run the application reset tool

Invoke the application reset tool from the command line:

<path-to-kafka>/bin/kafka-streams-application-reset

Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run it once with --dry-run to preview your changes before making them.

The tool accepts the following parameters:
Option (* = required)                 Description
---------------------                 -----------
* --application-id <String: id>       The Kafka Streams application ID
                                         (application.id).
--bootstrap-servers <String: urls>    Comma-separated list of broker urls with
                                         format: HOST1:PORT1,HOST2:PORT2
                                         (default: localhost:9092)
--by-duration <String: urls>          Reset offsets to offset by duration from
                                         current timestamp. Format: 'PnDTnHnMnS'
--config-file <String: file name>     Property file containing configs to be
                                         passed to admin clients and embedded
                                         consumer.
--dry-run                             Display the actions that would be
                                         performed without executing the reset
                                         commands.
--from-file <String: urls>            Reset offsets to values defined in CSV
                                         file.
--input-topics <String: list>         Comma-separated list of user input
                                         topics. For these topics, the tool will
                                         reset the offset to the earliest
                                         available offset.
--intermediate-topics <String: list>  Comma-separated list of intermediate user
                                         topics (topics used in the through()
                                         method). For these topics, the tool
                                         will skip to the end.
--internal-topics <String: list>      Comma-separated list of internal topics
                                         to delete. Must be a subset of the
                                         internal topics marked for deletion by
                                         the default behaviour (do a dry-run
                                         without this option to view these
                                         topics).
--shift-by <Long: number-of-offsets>  Reset offsets shifting current offset by
                                         'n', where 'n' can be positive or
                                         negative.
--to-datetime <String>                Reset offsets to offset from datetime.
                                         Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                         Reset offsets to earliest offset.
--to-latest                           Reset offsets to latest offset.
--to-offset <Long>                    Reset offsets to a specific offset.
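Set the parameters. For example, a dry run against a hypothetical application and input topic (the application ID, topic name, and broker address below are placeholders) might look like this:

<path-to-kafka>/bin/kafka-streams-application-reset --application-id my-first-streams-application \
                                                    --input-topics my-input-topic \
                                                    --bootstrap-servers kafka-broker1:9092 \
                                                    --dry-run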
Create a java.util.Properties instance.
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Any further settings
settings.put(... , ...);
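These settings are then typically passed to the KafkaStreams constructor together with a topology. A minimal sketch, assuming a hypothetical pass-through topology from "input-topic" to "output-topic" (both names are placeholders):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

StreamsBuilder builder = new StreamsBuilder();
// Hypothetical topology: copy every record from "input-topic" to "output-topic"
builder.stream("input-topic").to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), settings);
streams.start();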
public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {

        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);

        // null partition lets the producer pick one; timestamp, key, value and headers are forwarded as-is
        dlqProducer.send(new ProducerRecord<>(dlqTopic, null, record.timestamp(), record.key(), record.value(), record.headers())).get();

        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}
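To activate a handler like the one above, register it in the Streams configuration. A minimal sketch (the handler class name is the example defined above):

Properties settings = new Properties();
// Register the custom deserialization exception handler
settings.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
             SendToDeadLetterQueueExceptionHandler.class);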
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;

public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}

    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

Properties settings = new Properties();

// other various kafka streams settings, e.g. bootstrap servers, application id, etc

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);
If the extractor cannot derive a valid timestamp from a record, it can fall back to the previousTimestamp argument (i.e., a Kafka Streams timestamp estimation). Here is an example of a custom TimestampExtractor implementation:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Extracts the embedded timestamp of a record (giving you "event-time" semantics).
public class MyEventTimeExtractor implements TimestampExtractor {

  @Override
  public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
    // `Foo` is your own custom class, which we assume has a method that returns
    // the embedded timestamp (milliseconds since midnight, January 1, 1970 UTC).
    long timestamp = -1;
    final Foo myPojo = (Foo) record.value();
    if (myPojo != null) {
      timestamp = myPojo.getTimestampInMillis();
    }
    if (timestamp < 0) {
      // Invalid timestamp! Attempt to estimate a new timestamp,
      // otherwise fall back to wall-clock time (processing-time).
      if (previousTimestamp >= 0) {
        return previousTimestamp;
      } else {
        return System.currentTimeMillis();
      }
    }
    return timestamp;
  }

}
You would then define the custom timestamp extractor in your Streams configuration as follows:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
RocksDB configuration. Kafka Streams uses RocksDB as the default storage engine for persistent stores. To change the default configuration for RocksDB, you can implement RocksDBConfigSetter and provide your custom class via rocksdb.config.setter. Here is an example that adjusts the memory size consumed by RocksDB:
public static class CustomRocksDBConfig implements RocksDBConfigSetter {
    // This object should be a member variable so it can be closed in RocksDBConfigSetter#close.
    private org.rocksdb.Cache cache = new org.rocksdb.LRUCache(16 * 1024L * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // See #1 below.
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        // See #2 below.
        tableConfig.setBlockSize(16 * 1024L);
        // See #3 below.
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // See #4 below.
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // See #5 below.
        cache.close();
    }
}

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
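The numbered comments in the example refer to explanatory notes in the upstream documentation. A short, paraphrased summary (treat this as a reminder rather than normative tuning advice):
#1: Reuse the BlockBasedTableConfig returned by options.tableFormatConfig() instead of creating a new one, so existing defaults (such as the Bloom filter) are preserved.
#2: Be deliberate when lowering the block size; smaller blocks mean a larger index and therefore more index memory.
#3: Caching index and filter blocks inside the block cache keeps their memory usage bounded by the cache size.
#4: Limiting the number of write buffers (memtables) bounds memtable memory usage.
#5: The LRUCache allocates native (off-heap) memory, so it must be closed in close() to avoid a memory leak.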
Kafka consumers, producer and admin client configuration parameters

Apart from Kafka Streams' own configuration parameters, you can also specify parameters for the Kafka consumers, producer, and admin client that are used internally. The consumer, producer, and admin client settings are defined by specifying parameters in a StreamsConfig instance. In this example, the Kafka consumer session timeout is configured to be 60000 milliseconds in the Streams settings:
Properties streamsSettings = new Properties();
// Example of a "normal" setting for Kafka Streams
streamsSettings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-01:9092");
// Customize the Kafka consumer settings of your Streams application
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
request.timeout.ms and retry.backoff.ms control retries for client requests, while retries configures how many retries are allowed when handling retriable errors from broker request responses.
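For instance, these retry-related parameters can be set on the same Properties object as any other Streams setting. A minimal sketch; the values are illustrative, not recommendations:

Properties streamsSettings = new Properties();
// Give client requests more time before they are considered failed (illustrative value)
streamsSettings.put("request.timeout.ms", 40000);
// Wait a bit longer between retries of failed requests (illustrative value)
streamsSettings.put("retry.backoff.ms", 500L);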
You can avoid duplicate names by prefixing parameter names with consumer., producer., or admin. (e.g., consumer.send.buffer.bytes and producer.send.buffer.bytes).
Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");
You could further separate consumer configuration by adding different prefixes:
main.consumer. for the main consumer, which is the default consumer of stream sources.
restore.consumer. for the restore consumer, which is in charge of state store recovery.
global.consumer. for the global consumer, which is used in global KTable construction.

For example, if you only want to set the restore consumer config without touching other consumers' settings, you could simply use restore.consumer. to set the config:
Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");
The same applies to main.consumer. and global.consumer. if you only want to specify the config for one consumer type.
Additionally, to configure the internal repartition/changelog topics, you could use the topic. prefix, followed by any of the standard topic configs:
Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
For example, the following settings increase the resiliency of the internal topics and of produced records by raising the replication factor, the minimum number of in-sync replicas, and the producer acks:

Properties streamsSettings = new Properties();
streamsSettings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsSettings.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 2);
streamsSettings.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
SerDes specified in the Streams configuration are used as the default in your Kafka Streams application.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties settings = new Properties();
// Default serde for keys of data records (here: built-in serde for String type)
settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Default serde for values of data records (here: built-in serde for Long type)
settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// The stream userCountByRegion has type `String` for record keys (for region)
// and type `Long` for record values (for user counts).
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.with(stringSerde, longSerde));
If you want to override serdes selectively, i.e., keep the defaults for some fields, then don’t specify the serde whenever you want to leverage the default settings:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// Use the default serializer for record keys (here: region as String) by not specifying the key serde,
// but override the default serializer for record values (here: userCount as Long).
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> userCountByRegion = ...;
userCountByRegion.to("RegionCountsTopic", Produced.valueSerde(Serdes.Long()));
If some of your incoming records are corrupted or ill-formatted, they will cause the deserializer class to report an error. Since 1.0.x we have introduced a DeserializationExceptionHandler interface which allows you to customize how to handle such records. The customized implementation of the interface can be specified via the StreamsConfig.
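For example, the built-in LogAndContinueExceptionHandler (which logs the error and skips the corrupted record) can be set as the default handler. A minimal sketch:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties settings = new Properties();
// Log corrupted records and continue processing instead of failing the task
settings.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
             LogAndContinueExceptionHandler.class);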
Apache Kafka includes several built-in serde implementations for Java primitives and basic types such as byte[]
in
its kafka-clients
Maven artifact:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
This artifact provides the following serde implementations under the package org.apache.kafka.common.serialization, which you can leverage when e.g., defining default serializers in your Streams configuration.
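As a quick illustration (not an exhaustive list), a few of the serde factory methods available in that package; the variable names are arbitrary:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// A few of the built-in serdes provided by kafka-clients
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final Serde<Integer> integerSerde = Serdes.Integer();
final Serde<Double> doubleSerde = Serdes.Double();
final Serde<byte[]> byteArraySerde = Serdes.ByteArray();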