diff --git a/README.md b/README.md
index 7569c4b..f476700 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,53 @@ Sandbox project for playing around with Spark
 
 PRs welcome!
 
+TODO: Transactional topic with no compaction - consecutive offsets
+TODO: Transactional topic with compaction - non-consecutive offsets
+
+## Build
+
+Build the assembly jar with `sbt assembly`; the submit scripts in `bin/` expect it at
+`target/scala-2.12/spark-sandbox-assembly-0.0.1.jar`.
+
+## Kafka Avro data
+
+Structured Streaming on Databricks provides a `from_avro` function that accepts a
+Confluent Schema Registry URL as a parameter and handles the Confluent Avro wire
+format automatically (open-source Spark's `from_avro` takes only a plain Avro schema):
+
+https://docs.databricks.com/structured-streaming/avro-dataframe.html
+
+## Submit
+
+```sh
+spark-submit --master local[*] \
+  --jars <comma-separated-jars> \
+  --class <main-class> \
+  <application-jar> \
+  [application-arguments]
+```
+
+```sh
+sbt package && \
+spark-submit --packages \
+org.apache.spark:spark-sql_2.12:3.3.1,\
+org.apache.spark:spark-streaming_2.12:3.3.1,\
+org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 \
+--class com.github.warfox.sparksandbox.StructuredStreamingKafkaExample \
+--name StructuredStreamingKafkaExample target/scala-2.12/spark-sandbox_2.12-0.0.1.jar
+```
+
+For S3/Hadoop access, add the Hadoop packages (and optionally a custom Ivy settings file) to spark-submit:
+
+```
+--packages org.apache.hadoop:hadoop-common:3.3.4,\
+org.apache.hadoop:hadoop-aws:3.3.4,\
+com.fasterxml.jackson.core:jackson-databind:2.12.7 \
+--conf spark.jars.ivySettings=./ivysettings.xml \
+```
+
 ## References ##
 
 - https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-libraries.html
@@ -13,3 +60,15 @@ PRs welcome!
 - https://bzhangusc.wordpress.com/2015/11/20/use-sbt-console-as-spark-shell/
 - https://github.com/MrPowers/spark-examples/blob/master/src/main/scala/com/github/mrpowers/spark/examples/SparkSummit.scala
 - https://sparkbyexamples.com/spark/different-ways-to-create-a-spark-dataframe/
+
+The sbt memory settings in `build.sbt` can also be set on the command line:
+
+export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -Xmx2G -Xms1G"
+
+- How do we set the batch size? By time?
+
+- Using Spark with Schema Registry:
+  - https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala
+  - https://datachef.co/blog/deserialzing-confluent-avro-record-kafka-spark/
+
+- https://www.dremio.com/blog/streaming-data-into-apache-iceberg-tables-using-aws-kinesis-and-aws-glue/
diff --git a/bin/kcat.sh b/bin/kcat.sh
new file mode 100755
index 0000000..6debd5b
--- /dev/null
+++ b/bin/kcat.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+
+if ! kcat -v &> /dev/null
+then
+    echo "kcat could not be found. 
" + brew install kcat +fi + +set -ex + +# example producer and consumer +echo "one:one" | kcat -P -b localhost -t test -K: +echo "two:two" | kcat -P -b localhost -t test -K: +echo "three:three" | kcat -P -b localhost -t test -K: + +# # tombstone needs -Z +echo "three:" | kcat -P -b localhost -t test -Z -K: +echo "four:four" | kcat -P -b localhost -t test -K: +echo "three:3" | kcat -P -b localhost -t test -K: + +## example producer and consumer + +kcat -C -b localhost -t test -Z -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n' + +# show key, value, timestamp and partition +kcat -C -b localhost -t pageviews -Z -s value=avro -r http://localhost:8081 -f 'Key: %k,Value: %s,Timestamp: %T,Partition: %p,Offset: %o\n' + + +# show key, value, timestamp, partition and offset with bytes +kcat -C -b localhost -t pageviews -Z -s value=avro -r http://localhost:8081 -f 'Key (%K bytes): %k,Value (%S bytes): %s,Timestamp: %T,Partition: %p,Offset: %o\n' diff --git a/bin/submit-batch-kafka-to-hudi-job.sh b/bin/submit-batch-kafka-to-hudi-job.sh new file mode 100755 index 0000000..5b9055a --- /dev/null +++ b/bin/submit-batch-kafka-to-hudi-job.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +mkdir -p /tmp/spark-events + +output=console + +usage() { # Function: Print a help message. + echo "Usage: $0 [ -t topic ] [ -o console/hudi ]" 1>&2 +} +exit_abnormal() { # Function: Exit with error. + usage + exit 1 +} + +while getopts ":t:o:" arg; do + case $arg in + o) output=$OPTARG + ;; + t) topic=$OPTARG + ;; + *) exit 1 + ;; + esac +done + +echo "Topic is ${topic}, Output to ${output}" + +if [ -z "$topic" ] || [ -z "$output" ]; then + exit_abnormal +fi + +base_path=/tmp/warehouse/spark-batch/ + +echo "Spark Batch Kafka To Hudi for $topic" + +LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties" +DEBUG="-Dlog4j2.debug=true" + +set -ex + +PACKAGES=( + org.apache.spark:spark-avro_2.12:3.3.1 # org.apache.spark.sql.avro.SchemaConverter + org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 + org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2 + # org.apache.kafka:kafka-clients:0.11.0.3 +) + +IFS="," + +spark-submit \ + --master local[1] \ + --num-executors=1 \ + --executor-cores=1 \ + --packages "${PACKAGES[*]}" \ + --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \ + --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \ + --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \ + --class com.github.warfox.sparksandbox.BatchKafkaToHudi \ + --name BatchKafkaToHudi \ + target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path $output diff --git a/bin/submit-hoodie-deltastreamer-job.sh b/bin/submit-hoodie-deltastreamer-job.sh new file mode 100755 index 0000000..52ebdb5 --- /dev/null +++ b/bin/submit-hoodie-deltastreamer-job.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# This script is used to submit a hoodie deltastreamer job to spark +# It will download the hudi-utils jar if it is not found in the current directory + +hudi_utils_version=0.12.1 + +hudi_utils_jar=hudi-utilities-bundle_2.12-${hudi_utils_version}.jar + +hudi_utils_jar_url=https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/${hudi_utils_version}/${hudi_utils_jar} + +if [ -e ${hudi_utils_jar} ] +then + echo "hudi-utilities found in current directory" +else + echo "hudi-utilities not found in current directory, downloading from ${hudi_utils_jar_url}" + wget ${hudi_utils_jar_url} +fi + +set -ex + +export topic=$1 +table_name=$(echo 
$topic | sed "s/-/_/g") + +echo "topic is $topic, table_name is $table_name" + +target_base_path=/tmp/warehouse/hudi/$table_name +target_table=$table_name + +mkdir -p /tmp/spark-events + +envsubst '${topic}' < src/main/resources/hoodie-conf.properties > "/tmp/hoodie-conf-${topic}.properties" + +PACKAGES=( + org.apache.kafka:kafka-clients:3.4.0 + # io.confluent:kafka-schema-registry-client:7.3.1 # confluent schema registry client +) + +# --repositories https://packages.confluent.io/maven \ +# --packages "${PACKAGES[*]}" \ + + +LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties" +DEBUG="-Dlog4j2.debug=true" + +spark-submit \ + --master local[*] \ + --num-executors=1 \ + --executor-cores=1 \ + --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \ + --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \ + --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \ + --properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \ + ~/Sandbox/spark-sandbox/${hudi_utils_jar} \ + --op INSERT \ + --props /tmp/hoodie-conf-${topic}.properties \ + --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \ + --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \ + --source-ordering-field viewtime \ + --table-type COPY_ON_WRITE \ + --target-base-path file://${target_base_path} \ + --target-table $target_table + +# Other options +# --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider +# --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \ diff --git a/bin/submit-structured-streaming-confluent-kafka-job.sh b/bin/submit-structured-streaming-confluent-kafka-job.sh new file mode 100755 index 0000000..b0e0895 --- /dev/null +++ b/bin/submit-structured-streaming-confluent-kafka-job.sh @@ -0,0 +1,54 @@ +#!/usr/bin/env bash + +mkdir -p /tmp/spark-events + +output=console + +while getopts "t:o:" arg; do + case $arg in + o) output=$OPTARG;; + t) topic=$OPTARG;; + esac +done + +export output=$output +export topic=$topic +export base_path=/tmp/warehouse/spark/$output/ + +echo "Topic: ${topic}" +echo "Output: ${output}" +echo "Base Path: ${base_path}" + +echo "Spark Structured Streaming from Kafka topic $topic to $output" + +LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties" +DEBUG="-Dlog4j2.debug=true" + +set -eux + +/usr/local/bin/envsubst '${output},${base_path}' < src/main/resources/spark.properties > "/tmp/spark.properties" + +PACKAGES=( + org.apache.spark:spark-avro_2.12:3.3.1 # org.apache.spark.sql.avro.SchemaConverter + org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 + org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2 + + za.co.absa:abris_2.12:6.3.0 + + # org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0 + # org.apache.kafka:kafka-clients:0.11.0.3 + +) + +/usr/local/bin/spark-submit \ + --master local[1] \ + --num-executors=1 \ + --executor-cores=1 \ + --repositories https://packages.confluent.io/maven/ \ + --packages $(IFS=,; echo "${PACKAGES[*]}") \ + --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \ + --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \ + --properties-file /tmp/spark.properties \ + --class "com.github.warfox.sparksandbox.StructuredStreamingKafkaTo${output}" \ + --name "StructuredStreamingKafkaTo${output}" \ + target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path diff --git a/bin/submit-structured-streaming-kafka-job.sh b/bin/submit-structured-streaming-kafka-job.sh new file mode 
100755 index 0000000..67b0981 --- /dev/null +++ b/bin/submit-structured-streaming-kafka-job.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +mkdir -p /tmp/spark-events + +output=console + +while getopts "t:o:" arg; do + case $arg in + o) output=$OPTARG;; + t) topic=$OPTARG;; + esac +done + +export output=$output +export topic=$topic +export base_path=/tmp/warehouse/spark/$output/ + +echo "Topic: ${topic}" +echo "Output: ${output}" +echo "Base Path: ${base_path}" + +echo "Spark Structured Streaming from Kafka topic $topic to $output" + +LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties" +DEBUG="-Dlog4j2.debug=true" + +set -eux + +envsubst '${output},${base_path}' < src/main/resources/spark.properties > "/tmp/spark.properties" + +PACKAGES=( + org.apache.spark:spark-avro_2.12:3.3.1 # org.apache.spark.sql.avro.SchemaConverter + org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 + org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2 + + # org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0 + # org.apache.kafka:kafka-clients:0.11.0.3 +) + +/usr/local/bin/spark-submit \ + --master local[1] \ + --num-executors=1 \ + --executor-cores=1 \ + --packages $(IFS=,; echo "${PACKAGES[*]}") \ + --conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \ + --conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \ + --properties-file /tmp/spark.properties \ + --class "com.github.warfox.sparksandbox.StructuredStreamingKafkaTo${output}" \ + --name "StructuredStreamingKafkaTo${output}" \ + target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path diff --git a/build.sbt b/build.sbt index bc9f2ce..7d17172 100644 --- a/build.sbt +++ b/build.sbt @@ -2,16 +2,40 @@ name := "spark-sandbox" version := "0.0.1" -scalaVersion := "2.13.6" +scalaVersion := "2.12.15" val sparkVersion = "3.3.1" val hadoopVersion = "3.3.4" +val confluentVersion = "7.3.1" + +resolvers += "Confluent" at "https://packages.confluent.io/maven/" libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % sparkVersion, "provided", - "org.apache.spark" %% "spark-streaming" % sparkVersion, "provided", - "org.apache.hadoop" % "hadoop-aws" % hadoopVersion, - "org.apache.hadoop" % "hadoop-common" % hadoopVersion, + /** Provided Section **/ + + // Spark library with same version MUST be available in the cluster that the jobs run + "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", + "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided", + "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion % "provided", + "org.apache.spark" %% "spark-avro" % sparkVersion % "provided" , + + "org.apache.hudi" %% "hudi-spark3.3-bundle" % "0.12.1" % "provided", + + // Hadoop libraries with same version MUST be available in the cluster that the jobs run + "org.apache.hadoop" % "hadoop-aws" % hadoopVersion % "provided", + "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided", + + /** End of Provided Section - libraries in provided section is not included in assembly jar **/ + + // thrird party library for using Confluent Schema Registry with Spark + "za.co.absa" % "abris_2.12" % "6.3.0", + "io.confluent" % "kafka-schema-registry-client" % confluentVersion excludeAll( + ExclusionRule(organization = "com.fasterxml.jackson.module", name = "jackson-module-scala") + ), + "io.confluent" % "kafka-avro-serializer" % confluentVersion excludeAll( + ExclusionRule(organization = "com.fasterxml.jackson.module", name = "jackson-module-scala") + ), + "com.github.mrpowers" %% "spark-daria" % 
"1.2.3", // jackson-module-scala is required for jackson-databind @@ -29,29 +53,37 @@ libraryDependencies ++= Seq( Test / fork := true javaOptions ++= Seq( - "-Xms512M", - "-Xmx2048M", - "-XX:MaxPermSize=2048M", + "-Xms1G", + "-Xmx2G", "-XX:+CMSClassUnloadingEnabled" ) +// The above is equilavent as export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -Xmx2G -Xms1GG + // Show runtime of tests Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oD") // JAR file settings +ThisBuild / assemblyShadeRules := Seq( + ShadeRule.rename(("com.fasterxml.jackson.**") -> "shadejackson.@1").inAll +) + // don't include Scala in the JAR file -// assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false) +// assembly / assemblyOption in := (assembly / assemblyOption).value.copy(includeScala = false) +// ThisBuild / assemblyOption := assembly / assemblyOption ~= { _.withIncludeScala(false) } +// ThisBuild / assemblyPackageScala := false // Add the JAR file naming conventions described here: https://github.com/MrPowers/spark-style-guide#jar-files // You can add the JAR file naming conventions by running the shell script +// When you run sbt console - spark, sc and sqlContext will be ready for you! console / initialCommands := s""" import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .master("local[*]") .appName("shell") - .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") + .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider") // only needed for dealing with public S3 buckets .getOrCreate() // use default provider credential chain "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") diff --git a/project/plugins.sbt b/project/plugins.sbt index 7faa6f3..5a08462 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,9 @@ logLevel := Level.Warn addDependencyTreePlugin + +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.0") + +addSbtPlugin("com.github.sbt" % "sbt-avro" % "3.4.2") + +libraryDependencies += "org.apache.avro" % "avro-compiler" % "1.11.1" diff --git a/src/main/avro/pageview.avsc b/src/main/avro/pageview.avsc new file mode 100644 index 0000000..15a3903 --- /dev/null +++ b/src/main/avro/pageview.avsc @@ -0,0 +1,38 @@ +{ + "namespace": "sandbox.avro", + "name": "Pageview", + "type": "record", + "fields": [ + { + "name": "viewtime", + "type": { + "type": "long", + "format_as_time": "unix_long", + "arg.properties": { + "iteration": { + "start": 1, + "step": 10 + } + } + } + }, + { + "name": "userid", + "type": { + "type": "string", + "arg.properties": { + "regex": "User_[1-9]" + } + } + }, + { + "name": "pageid", + "type": { + "type": "string", + "arg.properties": { + "regex": "Page_[1-9][0-9]" + } + } + } + ] +} diff --git a/src/main/resources/hoodie-conf.properties b/src/main/resources/hoodie-conf.properties new file mode 100644 index 0000000..dc140f4 --- /dev/null +++ b/src/main/resources/hoodie-conf.properties @@ -0,0 +1,57 @@ +# All --hoodie-conf properties can go here + +# We allow duplicates on inserts! 
+hoodie.merge.allow.duplicate.on.inserts=true
+
+# Key fields, for the kafka example
+hoodie.datasource.write.recordkey.field=viewtime
+hoodie.datasource.write.partitionpath.field=pageid
+
+# Schema provider props (change to an absolute path based on your installation)
+# hoodie.deltastreamer.schemaprovider.source.schema.file=/~/Sandbox/spark-sandbox/src/main/resources/pageviews.avsc
+# hoodie.deltastreamer.schemaprovider.target.schema.file=/~/Sandbox/spark-sandbox/src/main/resources/pageviews.avsc
+# Kafka Source
+hoodie.deltastreamer.source.kafka.topic=${topic}
+# hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
+hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/${topic}-value/versions/latest
+
+hoodie.deltastreamer.source.kafka.append.offsets=true
+
+# Kafka Consumer props
+bootstrap.servers=localhost:9092
+auto.offset.reset=earliest
+schema.registry.url=http://localhost:8081
+# Consumer Group
+group.id=hudi-deltastreamer-${topic}
+
+isolation.level=read_committed
+
+# Don't let the consumer auto-commit offsets: Spark Streaming/Hudi may still fail to
+# write records that were successfully polled from Kafka
+enable.auto.commit=false
+# max.poll.records=8
+
+# hoodie.merge.allow.duplicate.on.inserts=true
+# hoodie.datasource.write.recordkey.field=command_id
+# hoodie.datasource.write.precombine.field=requested_at
+# hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
+# hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
+# hoodie.datasource.write.partitionpath.field=''
+# hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
+# hoodie.datasource.hive_sync.enable=true
+# hoodie.datasource.hive_sync.database=kirby_example_raw
+# hoodie.datasource.hive_sync.table=make_credit_decision_1
+# hoodie.datasource.hive_sync.partition_fields=''
+# hoodie.deltastreamer.source.kafka.topic=make-credit-decision-1
+
+# hoodie.database.name=kirby_example_raw
+# hoodie.table.name=make_credit_decision_1
+
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
+# hoodie.datasource.write.partitionpath.field='',
+# hoodie.datasource.hive_sync.partition_fields='',
+
+# Spark configuration does not work in this file -
+# spark.streaming.kafka.allowNonConsecutiveOffsets=true
+# set these via --properties-file for Spark, not via --props for Hudi
diff --git a/src/main/resources/kafka-iceberg-drop.sql b/src/main/resources/kafka-iceberg-drop.sql
new file mode 100644
index 0000000..cc4a5f4
--- /dev/null
+++ b/src/main/resources/kafka-iceberg-drop.sql
@@ -0,0 +1 @@
+DROP TABLE local.db.kafka;
diff --git a/src/main/resources/kafka-iceberg.ddl b/src/main/resources/kafka-iceberg.ddl
new file mode 100644
index 0000000..ad6c0de
--- /dev/null
+++ b/src/main/resources/kafka-iceberg.ddl
@@ -0,0 +1,10 @@
+CREATE TABLE IF NOT EXISTS local.db.kafka (
+  key string,
+  value string,
+  topic string,
+  partition int,
+  offset bigint,
+  timestamp timestamp,
+  timestampType int)
+USING iceberg
+PARTITIONED BY (partition);
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..9df73d4
--- /dev/null
+++ b/src/main/resources/log4j2.properties
@@ -0,0 +1,91 @@
+#
+# Licensed to the 
Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the console +rootLogger.level = INFO +rootLogger.appenderRef.stdout.ref = console + +# In the pattern layout configuration below, we specify an explicit `%ex` conversion +# pattern for logging Throwables. If this was omitted, then (by default) Log4J would +# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional +# class packaging information. That extra information can sometimes add a substantial +# performance overhead, so we disable it in our default logging config. +# For more information, see SPARK-39361. +appender.console.type = Console +appender.console.name = console +appender.console.target = SYSTEM_ERR +appender.console.layout.type = PatternLayout +# appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex +appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c: %m%n%ex + + +# Set the default spark-shell/spark-sql log level to WARN. When running the +# spark-shell/spark-sql, the log level for these classes is used to overwrite +# the root logger's log level, so that the user can have different defaults +# for the shell and regular Spark apps. 
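+# (The Kafka-related loggers near the bottom of this file - KafkaDataConsumer,
+# InternalKafkaConsumer and Hudi's KafkaOffsetGen - are set to DEBUG, presumably to make
+# offset ranges and consumer polling visible while chasing the non-consecutive offset
+# behaviour noted in the README.)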
+logger.repl.name = org.apache.spark.repl.Main
+logger.repl.level = warn
+
+logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
+logger.thriftserver.level = warn
+
+# Settings to quiet third party logs that are too verbose
+logger.jetty1.name = org.sparkproject.jetty
+logger.jetty1.level = warn
+logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
+logger.jetty2.level = error
+logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
+logger.replexprTyper.level = info
+logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
+logger.replSparkILoopInterpreter.level = info
+logger.parquet1.name = org.apache.parquet
+logger.parquet1.level = error
+logger.parquet2.name = parquet
+logger.parquet2.level = error
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
+logger.RetryingHMSHandler.level = fatal
+logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
+logger.FunctionRegistry.level = error
+
+# For deploying Spark ThriftServer
+# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
+appender.console.filter.1.type = RegexFilter
+appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
+appender.console.filter.1.onMatch = deny
+appender.console.filter.1.onMismatch = neutral
+
+# Settings to quiet Kafka consumer logs that are too verbose
+logger.kafkaConsumer.name = org.apache.kafka.clients.consumer
+logger.kafkaConsumer.level = warn
+
+# org.apache.spark.scheduler
+logger.SparkScheduler.name = org.apache.spark.scheduler
+logger.SparkScheduler.level = warn
+
+logger.SparkExecutor.name = org.apache.spark.executor
+logger.SparkExecutor.level = warn
+
+logger.SparkKafkaDataConsumer.name = org.apache.spark.streaming.kafka010.KafkaDataConsumer
+logger.SparkKafkaDataConsumer.level = DEBUG
+
+logger.SparkInternalConsumer.name = org.apache.spark.streaming.kafka010.InternalKafkaConsumer
+logger.SparkInternalConsumer.level = DEBUG
+
+logger.KafkaOffsetGen.name = org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen
+logger.KafkaOffsetGen.level = DEBUG
diff --git a/src/main/resources/pageviews-iceberg.ddl b/src/main/resources/pageviews-iceberg.ddl
new file mode 100644
index 0000000..b1afad7
--- /dev/null
+++ b/src/main/resources/pageviews-iceberg.ddl
@@ -0,0 +1,7 @@
+CREATE TABLE IF NOT EXISTS local.db.pageviews (
+  viewtime bigint,
+  userid string,
+  pageid string,
+  ts timestamp)
+USING iceberg
+PARTITIONED BY (pageid);
diff --git a/src/main/resources/spark-iceberg.properties b/src/main/resources/spark-iceberg.properties
new file mode 100644
index 0000000..a1d4206
--- /dev/null
+++ b/src/main/resources/spark-iceberg.properties
@@ -0,0 +1,18 @@
+# spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0\
+spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog
+spark.sql.catalog.spark_catalog.type=hive
+spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.local.type=hadoop
+spark.sql.catalog.local.warehouse=/tmp/warehouse/spark/iceberg/
+
+# iceberg configs
+# spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+# 
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog +# spark.sql.catalog.spark_catalog.type=hive +# spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog +# spark.sql.catalog.local.type=hadoop +# spark.sql.catalog.local.warehouse=$base_path +# spark.sql.defaultCatalog=local diff --git a/src/main/resources/spark.properties b/src/main/resources/spark.properties new file mode 100644 index 0000000..099d036 --- /dev/null +++ b/src/main/resources/spark.properties @@ -0,0 +1,47 @@ +spark.streaming.kafka.allowNonConsecutiveOffsets=true +spark.streaming.kafka.consumer.poll.ms=1000 +spark.executor.cores=1 + +# hoodie only support org.apache.spark.serializer.KryoSerializer as spark.serializer +spark.serializer=org.apache.spark.serializer.KryoSerializer + +# In Spark 3.0 and before Spark uses KafkaConsumer for offset fetching which +# could cause infinite wait in the driver. In Spark 3.1 a new configuration option +# added spark.sql.streaming.kafka.useDeprecatedOffsetFetching (default: true) +# which could be set to false allowing Spark to use new offset fetching mechanism +# using AdminClient. + +# First of all the new approach supports Kafka brokers 0.11.0.0+. + +spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false + +spark.parquet.avro.write-old-list-structure=false + + +# # Spark Properties +# spark.driver.extraJavaOptions="-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_driver.hprof" \ +# spark.executor.extraJavaOptions="-XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/varadarb_ds_executor.hprof" \ +# # --queue hadoop-platform-queue \ +# spark.scheduler.mode=FAIR \ +# spark.yarn.executor.memoryOverhead=1072 \ +# spark.yarn.driver.memoryOverhead=2048 \ +# spark.task.cpus=1 \ +# spark.executor.cores=1 \ +# spark.task.maxFailures=10 \ +# spark.memory.fraction=0.4 \ +# spark.rdd.compress=true \ +# spark.kryoserializer.buffer.max=200m \ +# spark.serializer=org.apache.spark.serializer.KryoSerializer \ +# spark.memory.storageFraction=0.1 \ +# spark.shuffle.service.enabled=true \ +# spark.sql.hive.convertMetastoreParquet=false \ +# spark.ui.port=5555 \ +# spark.driver.maxResultSize=3g \ +# spark.executor.heartbeatInterval=120s \ +# spark.network.timeout=600s \ +# spark.eventLog.overwrite=true \ +# spark.eventLog.enabled=true \ +# spark.eventLog.dir=/tmp/spark-events \ +# spark.yarn.max.executor.failures=10 \ +# spark.sql.catalogImplementation=hive \ +# spark.sql.shuffle.partitions=100 diff --git a/src/main/scala/com/github/warfox/sparksandbox/BatchKafkaToHudi.scala b/src/main/scala/com/github/warfox/sparksandbox/BatchKafkaToHudi.scala new file mode 100644 index 0000000..d5acf58 --- /dev/null +++ b/src/main/scala/com/github/warfox/sparksandbox/BatchKafkaToHudi.scala @@ -0,0 +1,119 @@ +package com.github.warfox.sparksandbox + +import scala.io.Source + + +import org.apache.hudi.QuickstartUtils._ +import scala.collection.JavaConversions._ +import org.apache.spark.sql.SaveMode.Append +import org.apache.hudi.DataSourceReadOptions._ +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.config.HoodieWriteConfig._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableConfig + +import org.apache.spark.streaming._ +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.DataFrame +import 
org.apache.avro.SchemaBuilder + +import io.confluent.kafka.schemaregistry.client.{ + CachedSchemaRegistryClient, + SchemaRegistryClient +} +import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord +import org.apache.spark.sql.avro.SchemaConverters + +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.avro.functions._ +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.hudi.config.HoodieWriteConfig + + +// This class is generated from Pageview avro schema, using sbt-avro +import sandbox.avro.Pageview + +// https://hudi.apache.org/docs/0.12.2/quick-start-guide + +object BatchKafkaToHudi extends SparkSessionWrapper { + + case class Pageviews(pageview: Pageview) + case class Pagecount(pageid: String, count: Long) + case class Output(key: String, value: String) // value needs to binary or string + + def readFromKafka(inputTopic: String) = { + spark.read + .format("kafka") + .option("kafka.bootstrap.servers", Config.bootstrapServers) + .option("groupIdPrefix", s"spark-streaming-${inputTopic}") + .option("subscribe", inputTopic) + .option("startingOffsets", "earliest") // From starting + // .option("hoodie.datasource.write.recordkey.field", "viewtime") + // .option("hoodie.datasource.write.partitionpath.field", "pageid") + .load() + } + + def writeToHudi[T](ds: Dataset[T], basePath: String, tableName: String) = { + ds.write + .format("hudi") + .options(getQuickstartWriteConfigs) + .option(PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(RECORDKEY_FIELD_OPT_KEY, "timestamp") + .option(OPERATION.key(), "bulk_insert") // upsert, insert or bulk_insert + // .option(PARTITIONPATH_FIELD_OPT_KEY, "") + .option(HoodieWriteConfig.TABLE_NAME, tableName) + .option("hoodie.merge.allow.duplicate.on.inserts",true) + .option("checkpointLocation", Config.checkpointLocation) + .mode(Append) + .save(s"${basePath}/${tableName}") + } + + def writeToConsole[T](ds: Dataset[T]) = { + ds.writeStream + .format("console") + .outputMode("append") // apppend doesn't work for aggregations + // .outputMode("complete") + .start() + } + + def main(args: Array[String]): Unit = { + + import spark.implicits._ + + val inputTopic::basePath::output::rest = args.toList + + println("--------") + println(s"inputTopic=${inputTopic} basePath=${basePath} rest=${rest} output=${output}") + println("--------") + + println(getQuickstartWriteConfigs()) + + val ssc = new StreamingContext(sc, Seconds(1)) + + val df = readFromKafka(inputTopic) + + println("df schema") + df.printSchema() + + println(s"df isStreaming=${df.isStreaming}.") // Returns True for DataFrames that have streaming sources + + // val value = getValue(df) + // .select($"partition", $"offset", deserialize_avro($"value").as("data")) + + val recordRDD: Dataset[KafkaRecord[String, String]] = df.as[KafkaRecord[String, String]] + + val data = recordRDD + + + val tableName = inputTopic.replaceAll("-", "_") + val query = output match { + case "console" => writeToConsole(data) + case _ => writeToHudi(data, basePath, tableName) + } + } + +} + +// checkpointLocation must be specified either through option("checkpointLocation", ...) 
or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)) diff --git a/src/main/scala/com/github/warfox/sparksandbox/SparkSessionWrapper.scala b/src/main/scala/com/github/warfox/sparksandbox/SparkSessionWrapper.scala index 20d16e1..097b536 100644 --- a/src/main/scala/com/github/warfox/sparksandbox/SparkSessionWrapper.scala +++ b/src/main/scala/com/github/warfox/sparksandbox/SparkSessionWrapper.scala @@ -1,7 +1,8 @@ package com.github.warfox.sparksandbox -import org.apache.spark.sql.SparkSession import org.apache.spark.SparkContext +import org.apache.spark.sql.SparkSession +import org.apache.spark.streaming.{Seconds, StreamingContext} trait SparkSessionWrapper extends Serializable { @@ -11,4 +12,9 @@ trait SparkSessionWrapper extends Serializable { .getOrCreate() lazy val sc: SparkContext = spark.sparkContext + +} + +trait StreamingSessionWrapper extends SparkSessionWrapper { + lazy val ssc = new StreamingContext(sc, Seconds(1)) } diff --git a/src/main/scala/com/github/warfox/sparksandbox/StructureStreamingKafkaToIceberg.scala b/src/main/scala/com/github/warfox/sparksandbox/StructureStreamingKafkaToIceberg.scala new file mode 100644 index 0000000..e18d36d --- /dev/null +++ b/src/main/scala/com/github/warfox/sparksandbox/StructureStreamingKafkaToIceberg.scala @@ -0,0 +1,125 @@ +package com.github.warfox.sparksandbox + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord + +import org.apache.hudi.DataSourceReadOptions._ +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.QuickstartUtils._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.HoodieWriteConfig._ + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.SaveMode._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.avro.functions._ +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.types.StringType +import org.apache.spark.streaming._ + +import sandbox.avro.Pageview + +import java.util.UUID +import java.util.concurrent.TimeUnit +import scala.collection.JavaConversions._ +import scala.io.Source + + +// https://iceberg.apache.org/docs/latest/spark-structured-streaming/ + +object StructuredStreamingKafkaToIceberg extends SparkSessionWrapper { + + def readFromKafka(spark: SparkSession, inputTopic: String) = { + spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", Config.bootstrapServers) + .option("groupIdPrefix", s"spark-streaming-${inputTopic}") + .option("subscribe", inputTopic) + .option("startingOffsets", "earliest") // From starting + .load() + } + + def writeToIceberg[T](ds: Dataset[T], basePath:String, catalogTable: String) = { + println("******** Writing to Iceberg ******") + ds.writeStream + .format("iceberg") + .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) + // .option("path", s"${basePath}/${catalogTable}") + .option("path", s"${catalogTable}") + .option("checkpointLocation", Config.checkpointLocation) + 
.outputMode("append") + .start() + } + + def main(args: Array[String]): Unit = { + + import spark.implicits._ + + val inputTopic::basePath::rest = args.toList + + println("----------------------------------------------------------------------------") + println(s"inputTopic=${inputTopic} basePath=${basePath} rest=${rest}") + println("-----------------------------------------------------------------------------") + + val ssc = new StreamingContext(sc, Seconds(1)) + + val df = readFromKafka(spark, inputTopic) + + println("df schema") + df.printSchema() + + println(s"df isStreaming=${df.isStreaming}.") // Returns True for DataFrames that have streaming sources + + // val value = getValue(df) + // .select($"partition", $"offset", deserialize_avro($"value").as("data")) + + val recordRDD: Dataset[KafkaRecord[Array[Byte], Array[Byte]]] = df.as[KafkaRecord[Array[Byte], Array[Byte]]] + // the above conversion did not work with avro data for iceberg + // Problems: + // * key: binary cannot be promoted to string + // * value: binary cannot be promoted to string + + val castToString = recordRDD + .withColumn("key", col("key").cast(StringType)) + .withColumn("value", col("value").cast(StringType)) + // The above conversion worked, but the avro data in value is not looking good + + // schema from class generated using "sbt avroGenerate" + val jsonFormatSchema = new Pageview().getSchema().toString + println(s"Schema is ${jsonFormatSchema}") + + // we read the default dynamic frame + val data = df + .withColumn("key", col("key").cast(StringType)) + .withColumn("value", from_avro(col("value"), jsonFormatSchema)) + .withColumn("value", to_json(col("value"))) + + val tableName = inputTopic.replaceAll("-", "_") + createIcebergTable(spark, "kafka") + val query = writeToIceberg(data, basePath, s"local.db.kafka") + + query.awaitTermination() + } + + def createIcebergTable(spark: SparkSession, tableName: String) = { + val ddl = Source.fromResource(s"${tableName}-iceberg.ddl").mkString + spark.sql(ddl) + } + + def dropIcebergTable(spark: SparkSession, tableName: String) = { + val sql = Source.fromResource(s"${tableName}-iceberg-drop.sql").mkString + spark.sql(sql) + } + +} diff --git a/src/main/scala/com/github/warfox/sparksandbox/StructuredStreamingKafkaToHudi.scala b/src/main/scala/com/github/warfox/sparksandbox/StructuredStreamingKafkaToHudi.scala new file mode 100644 index 0000000..7fdcfe4 --- /dev/null +++ b/src/main/scala/com/github/warfox/sparksandbox/StructuredStreamingKafkaToHudi.scala @@ -0,0 +1,103 @@ +package com.github.warfox.sparksandbox + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer + +import org.apache.avro.Schema +import org.apache.avro.SchemaBuilder +import org.apache.avro.generic.GenericRecord + +import org.apache.hudi.DataSourceReadOptions._ +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.QuickstartUtils._ +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.HoodieWriteConfig._ + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.SaveMode._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.avro.functions._ +import 
org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.types.StringType +import org.apache.spark.streaming._ + +import sandbox.avro.Pageview + +import java.util.UUID +import java.util.concurrent.TimeUnit +import scala.collection.JavaConversions._ +import scala.io.Source + +// https://hudi.apache.org/docs/latest/quick-start-guide + +object StructuredStreamingKafkaToHudi extends StreamingSessionWrapper { + + private val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString + + def readFromKafka(spark: SparkSession, inputTopic: String) = { + spark.readStream + .format("kafka") + .option("kafka.bootstrap.servers", Config.bootstrapServers) + .option("groupIdPrefix", s"spark-streaming-${inputTopic}") + .option("subscribe", inputTopic) + .option("startingOffsets", "earliest") // From starting + .load() + } + + + def writeToHudi[T](ds: Dataset[T], basePath: String, tableName: String) = { + println("******** Writing to Hudi ********") + ds.writeStream + .format("hudi") + .options(getQuickstartWriteConfigs) + .option(PRECOMBINE_FIELD_OPT_KEY, "timestamp") + .option(RECORDKEY_FIELD_OPT_KEY, "timestamp") + // .option(PARTITIONPATH_FIELD_OPT_KEY, "") + .option(HoodieWriteConfig.TABLE_NAME, tableName) + .option("hoodie.merge.allow.duplicate.on.inserts",true) + .option("path", s"${basePath}/${tableName}") + .option("checkpointLocation", checkpointLocation) + .outputMode("append") + .start() + } + + + def main(args: Array[String]): Unit = { + import spark.implicits._ + + val inputTopic::basePath::rest = args.toList + + println("----------------------------------------------------------------------------") + println(s"inputTopic=${inputTopic} basePath=${basePath} rest=${rest}") + println("-----------------------------------------------------------------------------") + + val df = readFromKafka(spark, inputTopic) + + println("df schema") + df.printSchema() + + println(s"df isStreaming=${df.isStreaming}.") // Returns True for DataFrames that have streaming sources + + // schema from class generated using "sbt avroGenerate" + val jsonFormatSchema = new Pageview().getSchema().toString + println(s"Schema is ${jsonFormatSchema}") + + val tableName = inputTopic.replaceAll("-", "_") + // we read the default dynamic frame + val data = df + .withColumn("key", col("key").cast(StringType)) + .withColumn("value", from_avro(col("value"), jsonFormatSchema)) + .withColumn("value", to_json(col("value"))) + + val query = writeToHudi(data, basePath, tableName) + query.awaitTermination() + } + +} diff --git a/src/main/scala/com/github/warfox/sparksandbox/functions.scala b/src/main/scala/com/github/warfox/sparksandbox/functions.scala index 07e448b..763aca2 100644 --- a/src/main/scala/com/github/warfox/sparksandbox/functions.scala +++ b/src/main/scala/com/github/warfox/sparksandbox/functions.scala @@ -3,11 +3,82 @@ package com.github.warfox.sparksandbox import org.apache.spark.sql.Column import org.apache.spark.sql.functions._ +import org.apache.spark.sql.expressions.UserDefinedFunction + +import org.apache.avro.specific.SpecificDatumReader +import java.nio.ByteBuffer + +import io.confluent.kafka.schemaregistry.client.{ + CachedSchemaRegistryClient, + SchemaRegistryClient +} +import org.apache.kafka.common.errors.SerializationException +import org.apache.avro.io.DecoderFactory + +import sandbox.avro.Pageview +import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer +import 
org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.GenericDatumReader
+
+class AvroDeserializer extends AbstractKafkaAvroDeserializer {
+  def this(client: SchemaRegistryClient) {
+    this()
+    this.schemaRegistry = client
+  }
+  override def deserialize(bytes: Array[Byte]): String = {
+    val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
+    if (genericRecord == null) {
+      ""
+    } else {
+      genericRecord.toString
+    }
+  }
+}
+
 object functions {
 
   def isEven(col: Column): Column = {
     col % 2 === lit(0)
   }
 
-}
+  // https://datachef.co/blog/deserialzing-confluent-avro-record-kafka-spark//
+  def deserializeFromConfluentAvro(bytes: Array[Byte]): Pageview = {
+    val schemaRegistryUrl = Config.schemaRegistryUrl
+    val schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
+    val buffer: ByteBuffer = ByteBuffer.wrap(bytes)
+
+    // The first byte is the magic byte
+    if (buffer.get != 0)
+      throw new SerializationException(
+        "Unknown magic byte! Expected 0 for Confluent-encoded bytes"
+      )
+
+    // The next 4 bytes are the schema id (a big-endian int)
+    val writeSchemaId = buffer.getInt()
+    val writerSchema = schemaRegistry.getByID(writeSchemaId)
+
+    // we want to deserialize with the latest schema
+    val subject = "pageview" + "-value"
+    val readerSchemaId = schemaRegistry.getLatestSchemaMetadata(subject).getId
+    val readerSchema = schemaRegistry.getByID(readerSchemaId)
+
+    // this logic is specific to the Confluent Avro wire format, which references the
+    // schema registry instead of embedding the writer schema in every record
+    val length = buffer.limit() - 1 - 4
+    val start = buffer.position() + buffer.arrayOffset()
+    val decoder = DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null)
+
+    // work with GenericRecord
+    // val pageViewSchema = Pageview.getClassSchema().toString()
+    // val datumReader = new GenericDatumReader[GenericRecord](pageViewSchema)
+
+    // SpecificDatumReader needs a type
+    val datumReader = new SpecificDatumReader[Pageview](writerSchema, readerSchema)
+    datumReader.read(null, decoder)
+  }
+
+  val deserializeAvro: UserDefinedFunction = udf((bytes: Array[Byte]) => {
+    deserializeFromConfluentAvro(bytes)
+  })
+
+}
diff --git a/src/main/scala/com/github/warfox/sparksandbox/package.scala b/src/main/scala/com/github/warfox/sparksandbox/package.scala
new file mode 100644
index 0000000..2b078f6
--- /dev/null
+++ b/src/main/scala/com/github/warfox/sparksandbox/package.scala
@@ -0,0 +1,20 @@
+package com.github.warfox
+
+// This class is generated from the Pageview avro schema, using sbt-avro
+import sandbox.avro.Pageview
+
+package object sparksandbox {
+
+  // val recordRDD: Dataset[KafkaRecord] = df.as[KafkaRecord]
+  // the above conversion did not work with avro data for iceberg
+  // Problems:
+  // * key: binary cannot be promoted to string
+  // * value: binary cannot be promoted to string
+
+  case class Pageviews(pageview: Pageview)
+  case class Pagecount(pageid: String, count: Long)
+  case class Output(
+      key: String,
+      value: String
+  ) // value needs to be binary or string
+}
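For comparison with the hand-rolled `deserializeFromConfluentAvro` above, here is a minimal sketch of the same idea built on Spark's own `from_avro`: drop the 5-byte Confluent header (1 magic byte + 4-byte schema id) and decode the remaining payload with the generated `Pageview` schema. It assumes the writer schema matches the generated class (no schema evolution), so the registry lookup is skipped; the `ConfluentAvroSketch` object and the hard-coded bootstrap server are illustrative only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.avro.functions.from_avro

import sandbox.avro.Pageview // generated by sbt-avro

object ConfluentAvroSketch {

  // Confluent wire format: 1 magic byte + 4-byte schema id, then the Avro payload
  private val stripConfluentHeader = udf { bytes: Array[Byte] =>
    if (bytes == null) null else bytes.drop(5)
  }

  def pageviews(spark: SparkSession, topic: String): DataFrame = {
    // reader schema taken from the generated class, as elsewhere in this repo
    val jsonFormatSchema = new Pageview().getSchema().toString
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumption: local broker
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
      .select(
        col("key").cast("string"),
        from_avro(stripConfluentHeader(col("value")), jsonFormatSchema).as("pageview")
      )
  }
}
```

ABRiS (already a dependency in `build.sbt`) provides a registry-aware `from_avro` that resolves the writer schema per record, which is the safer choice once schemas start evolving.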