Skip to content

Commit

Permalink
Adding easy access to commitable offsets (apache#628)
Browse files Browse the repository at this point in the history
* Adding SQL API to write to kafka from Spark (apache#567)

* Branch 2.4.3 extended kafka and examples (apache#569)

* The v2 API is in its own package

- the v2 api is in a different package
- the old functionality is available in a separated package

* v2 API examples

- All the examples are using the newest API.
- I have removed the old examples since they are not relevant any more and the same functionality is shown in the new examples usin the new API.

* Adding easy access to commitable offsets

* Adding easy access to commitable offsets

Co-authored-by: Nicolas A Perez <anicolaspp@gmail.com>
  • Loading branch information
2 people authored and ekrivokonmapr committed Nov 6, 2023
1 parent 54c0dba commit e35e9f8
Showing 1 changed file with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package org.apache.spark.streaming

import org.apache.spark.internal.config.ConfigBuilder

import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, DirectKafkaInputDStream, HasOffsetRanges}

/**
* Spark Integration for Kafka 0.9
*/
Expand Down Expand Up @@ -72,5 +75,23 @@ package object kafka010 { //scalastyle:ignore
.booleanConf
.createWithDefault(false)

/**
* This extension provides easy access to commit offsets back to MapR-ES or Kafka
*
* @param directKafkaInputDStream We can only call this function on the original stream and not the transformations
* @tparam K
* @tparam V
*/
implicit class CanCommitStreamOffsets[K, V](directKafkaInputDStream: DirectKafkaInputDStream[K, V]) {
def commitOffsetsAsync(): Unit = commitOffsetsAsync(null)

def commitOffsetsAsync(callback: OffsetCommitCallback): Unit = {
directKafkaInputDStream.foreachRDD { rdd =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

directKafkaInputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
}
}
}

0 comments on commit e35e9f8

Please sign in to comment.