From b317d74723ebca5a0987d02847a285d782da1fc4 Mon Sep 17 00:00:00 2001 From: Egor Krivokon <> Date: Wed, 26 Feb 2020 20:33:10 +0200 Subject: [PATCH] Adding easy access to commitable offsets (#628) * Adding SQL API to write to kafka from Spark (#567) * Branch 2.4.3 extended kafka and examples (#569) * The v2 API is in its own package - the v2 api is in a different package - the old functionality is available in a separated package * v2 API examples - All the examples are using the newest API. - I have removed the old examples since they are not relevant any more and the same functionality is shown in the new examples usin the new API. * Adding easy access to commitable offsets * Adding easy access to commitable offsets Co-authored-by: Nicolas A Perez --- .../spark/streaming/kafka010/package.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala index 6deb0f47bb7d2..60f0987431dc8 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/package.scala @@ -19,6 +19,9 @@ package org.apache.spark.streaming import org.apache.spark.internal.config.ConfigBuilder +import org.apache.kafka.clients.consumer.OffsetCommitCallback +import org.apache.spark.streaming.kafka010.{CanCommitOffsets, DirectKafkaInputDStream, HasOffsetRanges} + /** * Spark Integration for Kafka 0.9 */ @@ -72,5 +75,23 @@ package object kafka010 { //scalastyle:ignore .booleanConf .createWithDefault(false) + /** + * This extension provides easy access to commit offsets back to MapR-ES or Kafka + * + * @param directKafkaInputDStream We can only call this function on the original stream and not the transformations + * @tparam K + * @tparam V + */ + implicit class CanCommitStreamOffsets[K, V](directKafkaInputDStream: DirectKafkaInputDStream[K, V]) { + def commitOffsetsAsync(): Unit = commitOffsetsAsync(null) + + def commitOffsetsAsync(callback: OffsetCommitCallback): Unit = { + directKafkaInputDStream.foreachRDD { rdd => + val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + + directKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) + } + } + } }