Structured streaming job to Hudi and Iceberg
WarFox committed Mar 2, 2024
1 parent a0aec8f commit 28c25be
Showing 5 changed files with 352 additions and 0 deletions.
54 changes: 54 additions & 0 deletions bin/submit-structured-streaming-confluent-kafka-job.sh
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

mkdir -p /tmp/spark-events

output=console

while getopts "t:o:" arg; do
case $arg in
o) output=$OPTARG;;
t) topic=$OPTARG;;
esac
done

export output=$output
export topic=$topic
export base_path=/tmp/warehouse/spark/$output/

echo "Topic: ${topic}"
echo "Output: ${output}"
echo "Base Path: ${base_path}"

echo "Spark Structured Streaming from Kafka topic $topic to $output"

LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties"
DEBUG="-Dlog4j2.debug=true"

set -eux

/usr/local/bin/envsubst '${output},${base_path}' < src/main/resources/spark.properties > "/tmp/spark.properties"
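# The SHELL-FORMAT argument above limits envsubst to ${output} and ${base_path};
# any other ${...} placeholders in the template are passed through untouched.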

PACKAGES=(
org.apache.spark:spark-avro_2.12:3.3.1 # provides org.apache.spark.sql.avro.SchemaConverters and the from_avro function
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2

za.co.absa:abris_2.12:6.3.0 # ABRiS: Avro SerDes with Confluent Schema Registry integration

# org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0
# org.apache.kafka:kafka-clients:0.11.0.3

)

/usr/local/bin/spark-submit \
--master local[1] \
--num-executors=1 \
--executor-cores=1 \
--repositories https://packages.confluent.io/maven/ \
--packages $(IFS=,; echo "${PACKAGES[*]}") \
--conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
--conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
--properties-file /tmp/spark.properties \
--class "com.github.warfox.sparksandbox.StructuredStreamingKafkaTo${output}" \
--name "StructuredStreamingKafkaTo${output}" \
target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path
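
A typical invocation (not part of the commit; the topic name below is only a placeholder) picks the Kafka topic with -t and the sink with -o, where -o has to match one of the StructuredStreamingKafkaTo<output> classes added in this commit:

# hypothetical example; "pageviews" is a placeholder topic name
./bin/submit-structured-streaming-confluent-kafka-job.sh -t pageviews -o Hudi
./bin/submit-structured-streaming-confluent-kafka-job.sh -t pageviews -o Iceberg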
50 changes: 50 additions & 0 deletions bin/submit-structured-streaming-kafka-job.sh
@@ -0,0 +1,50 @@
#!/usr/bin/env bash

mkdir -p /tmp/spark-events

output=console

while getopts "t:o:" arg; do
case $arg in
o) output=$OPTARG;;
t) topic=$OPTARG;;
esac
done

export output=$output
export topic=$topic
export base_path=/tmp/warehouse/spark/$output/

echo "Topic: ${topic}"
echo "Output: ${output}"
echo "Base Path: ${base_path}"

echo "Spark Structured Streaming from Kafka topic $topic to $output"

LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties"
DEBUG="-Dlog4j2.debug=true"

set -eux

envsubst '${output},${base_path}' < src/main/resources/spark.properties > "/tmp/spark.properties"

PACKAGES=(
org.apache.spark:spark-avro_2.12:3.3.1 # provides org.apache.spark.sql.avro.SchemaConverters and the from_avro function
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2

# org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0
# org.apache.kafka:kafka-clients:0.11.0.3
)

/usr/local/bin/spark-submit \
--master local[1] \
--num-executors=1 \
--executor-cores=1 \
--packages $(IFS=,; echo "${PACKAGES[*]}") \
--conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
--conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
--properties-file /tmp/spark.properties \
--class "com.github.warfox.sparksandbox.StructuredStreamingKafkaTo${output}" \
--name "StructuredStreamingKafkaTo${output}" \
target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path
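
The spark.properties template that both scripts feed through envsubst is not part of this diff. A plausible sketch is shown below, assuming event logging into the /tmp/spark-events directory the scripts create, Hudi's Kryo serializer requirement, and a Hadoop-type Iceberg catalog named local for the local.db.kafka table used further down; which properties the real template sets is an assumption, and only ${output} and ${base_path} get substituted:

# hypothetical src/main/resources/spark.properties template (not in this commit)
spark.eventLog.enabled              true
spark.eventLog.dir                  /tmp/spark-events
spark.serializer                    org.apache.spark.serializer.KryoSerializer
spark.sql.extensions                org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.local             org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type        hadoop
spark.sql.catalog.local.warehouse   ${base_path}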
125 changes: 125 additions & 0 deletions src/main/scala/com/github/warfox/sparksandbox/StructuredStreamingKafkaToIceberg.scala
@@ -0,0 +1,125 @@
package com.github.warfox.sparksandbox

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecord

import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming._

import sandbox.avro.Pageview

import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.io.Source


// https://iceberg.apache.org/docs/latest/spark-structured-streaming/

object StructuredStreamingKafkaToIceberg extends SparkSessionWrapper {

def readFromKafka(spark: SparkSession, inputTopic: String) = {
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Config.bootstrapServers)
.option("groupIdPrefix", s"spark-streaming-${inputTopic}")
.option("subscribe", inputTopic)
.option("startingOffsets", "earliest") // read the topic from the beginning
.load()
}

def writeToIceberg[T](ds: Dataset[T], basePath:String, catalogTable: String) = {
println("******** Writing to Iceberg ******")
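// Trigger.ProcessingTime(1, TimeUnit.MINUTES) commits a micro-batch to the Iceberg table
// at most once per minute; checkpointLocation is where the query persists its Kafka offsets
// so it can resume from where it left off after a restart.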
ds.writeStream
.format("iceberg")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
// .option("path", s"${basePath}/${catalogTable}")
.option("path", s"${catalogTable}")
.option("checkpointLocation", Config.checkpointLocation)
.outputMode("append")
.start()
}

def main(args: Array[String]): Unit = {

import spark.implicits._

val inputTopic::basePath::rest = args.toList

println("----------------------------------------------------------------------------")
println(s"inputTopic=${inputTopic} basePath=${basePath} rest=${rest}")
println("-----------------------------------------------------------------------------")

val ssc = new StreamingContext(sc, Seconds(1)) // unused: Structured Streaming does not need a StreamingContext

val df = readFromKafka(spark, inputTopic)

println("df schema")
df.printSchema()

println(s"df isStreaming=${df.isStreaming}.") // true for DataFrames that have a streaming source

// val value = getValue(df)
// .select($"partition", $"offset", deserialize_avro($"value").as("data"))

val records: Dataset[KafkaRecord[Array[Byte], Array[Byte]]] = df.as[KafkaRecord[Array[Byte], Array[Byte]]]
// The conversion above did not work with Avro data for Iceberg.
// Problems:
// * key: binary cannot be promoted to string
// * value: binary cannot be promoted to string

val castToString = records
.withColumn("key", col("key").cast(StringType))
.withColumn("value", col("value").cast(StringType))
// The cast above works, but the raw Avro bytes in value are not human readable.

// schema from class generated using "sbt avroGenerate"
val jsonFormatSchema = new Pageview().getSchema().toString
println(s"Schema is ${jsonFormatSchema}")

// cast the key to a string and decode the Avro value, then re-encode it as JSON
val data = df
.withColumn("key", col("key").cast(StringType))
.withColumn("value", from_avro(col("value"), jsonFormatSchema))
.withColumn("value", to_json(col("value")))

val tableName = inputTopic.replaceAll("-", "_") // currently unused: the Iceberg table name below is hard-coded
createIcebergTable(spark, "kafka")
val query = writeToIceberg(data, basePath, "local.db.kafka")

query.awaitTermination()
}

def createIcebergTable(spark: SparkSession, tableName: String) = {
val ddl = Source.fromResource(s"${tableName}-iceberg.ddl").mkString
spark.sql(ddl)
}

def dropIcebergTable(spark: SparkSession, tableName: String) = {
val sql = Source.fromResource(s"${tableName}-iceberg-drop.sql").mkString
spark.sql(sql)
}

}
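
The kafka-iceberg.ddl resource read by createIcebergTable is not included in this commit. A minimal sketch of the statement it would need to contain, assuming the table keeps the Kafka metadata columns alongside the stringified key and JSON value (the column list is an assumption), expressed here as the equivalent spark.sql call:

// hypothetical equivalent of src/main/resources/kafka-iceberg.ddl (not in this diff)
spark.sql("""
  CREATE TABLE IF NOT EXISTS local.db.kafka (
    key string,
    value string,
    topic string,
    `partition` int,
    offset bigint,
    `timestamp` timestamp,
    timestampType int)
  USING iceberg
""")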
103 changes: 103 additions & 0 deletions src/main/scala/com/github/warfox/sparksandbox/StructuredStreamingKafkaToHudi.scala
@@ -0,0 +1,103 @@
package com.github.warfox.sparksandbox

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer

import org.apache.avro.Schema
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecord

import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StringType
import org.apache.spark.streaming._

import sandbox.avro.Pageview

import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.io.Source

// https://hudi.apache.org/docs/latest/quick-start-guide

object StructuredStreamingKafkaToHudi extends StreamingSessionWrapper {

private val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

def readFromKafka(spark: SparkSession, inputTopic: String) = {
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Config.bootstrapServers)
.option("groupIdPrefix", s"spark-streaming-${inputTopic}")
.option("subscribe", inputTopic)
.option("startingOffsets", "earliest") // read the topic from the beginning
.load()
}


def writeToHudi[T](ds: Dataset[T], basePath: String, tableName: String) = {
println("******** Writing to Hudi ********")
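// PRECOMBINE_FIELD_OPT_KEY: when two incoming records share the same record key, Hudi keeps
// the one with the larger value in this field; RECORDKEY_FIELD_OPT_KEY: the field used as the
// Hudi record key for upserts. This job uses the Kafka message timestamp for both.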
ds.writeStream
.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(RECORDKEY_FIELD_OPT_KEY, "timestamp")
// .option(PARTITIONPATH_FIELD_OPT_KEY, "")
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.option("hoodie.merge.allow.duplicate.on.inserts", true)
.option("path", s"${basePath}/${tableName}")
.option("checkpointLocation", checkpointLocation)
.outputMode("append")
.start()
}


def main(args: Array[String]): Unit = {
import spark.implicits._

val inputTopic::basePath::rest = args.toList

println("----------------------------------------------------------------------------")
println(s"inputTopic=${inputTopic} basePath=${basePath} rest=${rest}")
println("-----------------------------------------------------------------------------")

val df = readFromKafka(spark, inputTopic)

println("df schema")
df.printSchema()

println(s"df isStreaming=${df.isStreaming}.") // true for DataFrames that have a streaming source

// schema from class generated using "sbt avroGenerate"
val jsonFormatSchema = new Pageview().getSchema().toString
println(s"Schema is ${jsonFormatSchema}")

val tableName = inputTopic.replaceAll("-", "_")
// cast the key to a string and decode the Avro value, then re-encode it as JSON
val data = df
.withColumn("key", col("key").cast(StringType))
.withColumn("value", from_avro(col("value"), jsonFormatSchema))
.withColumn("value", to_json(col("value")))

val query = writeToHudi(data, basePath, tableName)
query.awaitTermination()
}

}
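
To inspect what the job wrote, the standard Hudi batch read from the quick-start guide can be pointed at the same path layout writeToHudi uses. This snippet is illustrative and not part of the commit; basePath and tableName stand for the same values passed to writeToHudi:

// illustrative read-back of the Hudi table written above
val hudiDf = spark.read
  .format("hudi")
  .load(s"${basePath}/${tableName}")
hudiDf.select("_hoodie_commit_time", "key", "value").show(false)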
20 changes: 20 additions & 0 deletions src/main/scala/com/github/warfox/sparksandbox/package.scala
@@ -0,0 +1,20 @@
package com.github.warfox

// This class is generated from Pageview avro schema, using sbt-avro
import sandbox.avro.Pageview

package object sparksandbox {

// val recordRDD: Dataset[KafkaRecord] = df.as[KafkaRecord]
// the above conversion did not work with avro data for iceberg
// Problems:
// * key: binary cannot be promoted to string
// * value: binary cannot be promoted to string

case class Pageviews(pageview: Pageview)
case class Pagecount(pageid: String, count: Long)
case class Output(
key: String,
value: String
) // value needs to be binary or string
}
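
The KafkaRecord case class referenced by the Iceberg job is defined elsewhere in the repository; for the df.as[KafkaRecord[Array[Byte], Array[Byte]]] conversion to line up with Spark's Kafka source schema it would need roughly this shape (field names and types are an assumption):

// hypothetical shape of KafkaRecord (the real definition lives elsewhere in the repo);
// fields mirror the columns produced by Spark's Kafka source
case class KafkaRecord[K, V](
  key: K,
  value: V,
  topic: String,
  partition: Int,
  offset: Long,
  timestamp: java.sql.Timestamp,
  timestampType: Int
)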
