
Merge branch 'spark-streaming'
WarFox committed Mar 4, 2024
2 parents f3101f1 + 28c25be commit 0ab7df5
Showing 22 changed files with 1,081 additions and 12 deletions.
59 changes: 59 additions & 0 deletions README.md
@@ -6,10 +6,69 @@ Sandbox project for playing around with Spark

PRs welcome!

TODO: Transactional topic with no compaction - consecutive offsets
TODO: Transactional topic with compaction - non-consecutive offsets

## Build
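
`sbt package` builds the plain jar used by the `spark-submit` example below; the helper scripts in `bin/` reference the assembly jar, which presumably comes from `sbt assembly` (assuming the sbt-assembly plugin is configured in this project):

```sh
sbt package    # target/scala-2.12/spark-sandbox_2.12-0.0.1.jar
sbt assembly   # target/scala-2.12/spark-sandbox-assembly-0.0.1.jar, used by the bin/*.sh scripts
```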

## Kafka Avro data

Structured Streaming on Databricks provides a `from_avro` function that accepts
the Confluent Schema Registry URL as a parameter and handles the Confluent Avro
wire format automatically:

https://docs.databricks.com/structured-streaming/avro-dataframe.html
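
A minimal sketch of that usage, adapted from the linked docs. Note that this three-argument `from_avro` overload (value column, subject name, registry address) is a Databricks Runtime feature rather than open-source Spark, and the topic, subject and registry address below are placeholders:

```scala
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col

// assumes a SparkSession named `spark` is already in scope (notebook / spark-shell)
val schemaRegistryAddr = "http://localhost:8081"

val pageviews = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "pageviews")
  .load()
  // subject name follows the TopicNameStrategy convention: <topic>-value
  .select(from_avro(col("value"), "pageviews-value", schemaRegistryAddr).as("value"))
```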

## Submit

```sh
spark-submit --master local[*] \
  --jars <comma-separated-jars> \
  --class <main-class> \
  <application-jar> \
  [application-arguments]
```

```sh
sbt package && \
spark-submit --packages \
org.apache.spark:spark-sql_2.12:3.3.1,\
org.apache.spark:spark-streaming_2.12:3.3.1,\
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1 \
--class com.github.warfox.sparksandbox.StructuredStreamingKafkaExample \
--name StructuredStreamingKafkaExample target/scala-2.12/spark-sandbox_2.12-0.0.1.jar
```


For jobs that read from or write to S3, the submit command also needs the Hadoop/AWS packages and an Ivy settings override; a sketch of those extra flags follows.
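
The sketch below appends the extra packages and the `spark.jars.ivySettings` conf to the generic `spark-submit` template above; the package versions are the ones noted originally, and `ivysettings.xml` is assumed to declare whatever extra resolvers the job needs:

```sh
spark-submit --master local[*] \
  --packages \
org.apache.hadoop:hadoop-common:3.3.4,\
org.apache.hadoop:hadoop-aws:3.3.4,\
com.fasterxml.jackson.core:jackson-databind:2.12.7 \
  --conf spark.jars.ivySettings=./ivysettings.xml \
  --class <main-class> \
  <application-jar> [application-arguments]
```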

## References ##

- https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-libraries.html
- https://spark.apache.org/docs/latest/quick-start.html
- https://bzhangusc.wordpress.com/2015/11/20/use-sbt-console-as-spark-shell/
- https://github.com/MrPowers/spark-examples/blob/master/src/main/scala/com/github/mrpowers/spark/examples/SparkSummit.scala
- https://sparkbyexamples.com/spark/different-ways-to-create-a-spark-dataframe/

Increase SBT memory if needed: `export SBT_OPTS="-XX:+CMSClassUnloadingEnabled -Xmx2G -Xms1G"`


* How to set the batch size? By time?

* Using Spark with Schema Registry (see the sketch below):
  - https://github.com/xebia-france/spark-structured-streaming-blog/blob/master/src/main/scala/AvroConsumer.scala
  - https://datachef.co/blog/deserialzing-confluent-avro-record-kafka-spark/

* Streaming into Apache Iceberg tables:
  - https://www.dremio.com/blog/streaming-data-into-apache-iceberg-tables-using-aws-glue/
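
On open-source Spark (outside Databricks), one option used by the submit scripts in this repo is the ABRiS library (`za.co.absa:abris`), which resolves the Confluent wire format against Schema Registry. A minimal sketch, assuming a local broker and registry and a `pageviews` topic; the class name and checkpoint path are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

object AbrisConsumerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("AbrisConsumerSketch").getOrCreate()

    // Look up the latest schema for subject "pageviews-value" in Schema Registry;
    // ABRiS also strips the 5-byte Confluent header (magic byte + schema id) from each record.
    val abrisConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy("pageviews")
      .usingSchemaRegistry("http://localhost:8081")

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "pageviews")
      .load()
      .select(from_avro(col("value"), abrisConfig).as("data"))
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/abris-consumer")
      .start()
      .awaitTermination()
  }
}
```
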
30 changes: 30 additions & 0 deletions bin/kcat.sh
@@ -0,0 +1,30 @@
#!/usr/bin/env bash

if ! command -v kcat &> /dev/null
then
  echo "kcat could not be found, installing with brew"
  brew install kcat
fi

set -ex

# example producer
echo "one:one" | kcat -P -b localhost -t test -K:
echo "two:two" | kcat -P -b localhost -t test -K:
echo "three:three" | kcat -P -b localhost -t test -K:

# tombstone (NULL value) needs -Z
echo "three:" | kcat -P -b localhost -t test -Z -K:
echo "four:four" | kcat -P -b localhost -t test -K:
echo "three:3" | kcat -P -b localhost -t test -K:

# example consumer

kcat -C -b localhost -t test -Z -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nTimestamp: %T\tPartition: %p\tOffset: %o\n--\n'

# show key, value (Avro-decoded via Schema Registry), timestamp, partition and offset
kcat -C -b localhost -t pageviews -Z -s value=avro -r http://localhost:8081 -f 'Key: %k,Value: %s,Timestamp: %T,Partition: %p,Offset: %o\n'


# show key, value, timestamp, partition and offset, with key/value byte sizes
kcat -C -b localhost -t pageviews -Z -s value=avro -r http://localhost:8081 -f 'Key (%K bytes): %k,Value (%S bytes): %s,Timestamp: %T,Partition: %p,Offset: %o\n'
59 changes: 59 additions & 0 deletions bin/submit-batch-kafka-to-hudi-job.sh
@@ -0,0 +1,59 @@
#!/usr/bin/env bash
mkdir -p /tmp/spark-events

output=console

usage() { # Function: Print a help message.
  echo "Usage: $0 [ -t topic ] [ -o console/hudi ]" 1>&2
}
exit_abnormal() { # Function: Exit with error.
  usage
  exit 1
}

while getopts ":t:o:" arg; do
  case $arg in
    o) output=$OPTARG ;;
    t) topic=$OPTARG ;;
    *) exit 1 ;;
  esac
done

echo "Topic is ${topic}, Output to ${output}"

if [ -z "$topic" ] || [ -z "$output" ]; then
  exit_abnormal
fi

base_path=/tmp/warehouse/spark-batch/

echo "Spark Batch Kafka To Hudi for $topic"

LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties"
DEBUG="-Dlog4j2.debug=true"

set -ex

PACKAGES=(
org.apache.spark:spark-avro_2.12:3.3.1 # org.apache.spark.sql.avro.SchemaConverter
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2
# org.apache.kafka:kafka-clients:0.11.0.3
)

IFS=","

spark-submit \
--master local[1] \
--num-executors=1 \
--executor-cores=1 \
--packages "${PACKAGES[*]}" \
--conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
--conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
--properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
--class com.github.warfox.sparksandbox.BatchKafkaToHudi \
--name BatchKafkaToHudi \
target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path $output
66 changes: 66 additions & 0 deletions bin/submit-hoodie-deltastreamer-job.sh
@@ -0,0 +1,66 @@
#!/usr/bin/env bash

# This script is used to submit a hoodie deltastreamer job to spark
# It will download the hudi-utils jar if it is not found in the current directory

hudi_utils_version=0.12.1

hudi_utils_jar=hudi-utilities-bundle_2.12-${hudi_utils_version}.jar

hudi_utils_jar_url=https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/${hudi_utils_version}/${hudi_utils_jar}

if [ -e ${hudi_utils_jar} ]
then
  echo "hudi-utilities found in current directory"
else
  echo "hudi-utilities not found in current directory, downloading from ${hudi_utils_jar_url}"
  wget ${hudi_utils_jar_url}
fi

set -ex

export topic=$1
table_name=$(echo $topic | sed "s/-/_/g")

echo "topic is $topic, table_name is $table_name"

target_base_path=/tmp/warehouse/hudi/$table_name
target_table=$table_name

mkdir -p /tmp/spark-events

envsubst '${topic}' < src/main/resources/hoodie-conf.properties > "/tmp/hoodie-conf-${topic}.properties"

PACKAGES=(
org.apache.kafka:kafka-clients:3.4.0
# io.confluent:kafka-schema-registry-client:7.3.1 # confluent schema registry client
)

# --repositories https://packages.confluent.io/maven \
# --packages "${PACKAGES[*]}" \


LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties"
DEBUG="-Dlog4j2.debug=true"

spark-submit \
--master local[*] \
--num-executors=1 \
--executor-cores=1 \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
--conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
--properties-file ~/Sandbox/spark-sandbox/src/main/resources/spark.properties \
~/Sandbox/spark-sandbox/${hudi_utils_jar} \
--op INSERT \
--props /tmp/hoodie-conf-${topic}.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field viewtime \
--table-type COPY_ON_WRITE \
--target-base-path file://${target_base_path} \
--target-table $target_table

# Other options
# --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
# --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
54 changes: 54 additions & 0 deletions bin/submit-structured-streaming-confluent-kafka-job.sh
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

mkdir -p /tmp/spark-events

output=console

while getopts "t:o:" arg; do
  case $arg in
    o) output=$OPTARG;;
    t) topic=$OPTARG;;
  esac
done

export output=$output
export topic=$topic
export base_path=/tmp/warehouse/spark/$output/

echo "Topic: ${topic}"
echo "Output: ${output}"
echo "Base Path: ${base_path}"

echo "Spark Structured Streaming from Kafka topic $topic to $output"

LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties"
DEBUG="-Dlog4j2.debug=true"

set -eux

/usr/local/bin/envsubst '${output},${base_path}' < src/main/resources/spark.properties > "/tmp/spark.properties"

PACKAGES=(
org.apache.spark:spark-avro_2.12:3.3.1 # org.apache.spark.sql.avro.SchemaConverter
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2

za.co.absa:abris_2.12:6.3.0

# org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0
# org.apache.kafka:kafka-clients:0.11.0.3

)

/usr/local/bin/spark-submit \
--master local[1] \
--num-executors=1 \
--executor-cores=1 \
--repositories https://packages.confluent.io/maven/ \
--packages $(IFS=,; echo "${PACKAGES[*]}") \
--conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
--conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
--properties-file /tmp/spark.properties \
--class "com.github.warfox.sparksandbox.StructuredStreamingKafkaTo${output}" \
--name "StructuredStreamingKafkaTo${output}" \
target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path
50 changes: 50 additions & 0 deletions bin/submit-structured-streaming-kafka-job.sh
@@ -0,0 +1,50 @@
#!/usr/bin/env bash

mkdir -p /tmp/spark-events

output=console

while getopts "t:o:" arg; do
  case $arg in
    o) output=$OPTARG;;
    t) topic=$OPTARG;;
  esac
done

export output=$output
export topic=$topic
export base_path=/tmp/warehouse/spark/$output/

echo "Topic: ${topic}"
echo "Output: ${output}"
echo "Base Path: ${base_path}"

echo "Spark Structured Streaming from Kafka topic $topic to $output"

LOG4J_SETTING="-Dlog4j2.configurationFile=file:src/main/resources/log4j2.properties"
DEBUG="-Dlog4j2.debug=true"

set -eux

envsubst '${output},${base_path}' < src/main/resources/spark.properties > "/tmp/spark.properties"

PACKAGES=(
org.apache.spark:spark-avro_2.12:3.3.1 # org.apache.spark.sql.avro.SchemaConverter
org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1
org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.2

# org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.2.0
# org.apache.kafka:kafka-clients:0.11.0.3
)

/usr/local/bin/spark-submit \
--master local[1] \
--num-executors=1 \
--executor-cores=1 \
--packages $(IFS=,; echo "${PACKAGES[*]}") \
--conf "spark.driver.extraJavaOptions=${LOG4J_SETTING}" \
--conf "spark.executor.extraJavaOptions=${LOG4J_SETTING}" \
--properties-file /tmp/spark.properties \
--class "com.github.warfox.sparksandbox.StructuredStreamingKafkaTo${output}" \
--name "StructuredStreamingKafkaTo${output}" \
target/scala-2.12/spark-sandbox-assembly-0.0.1.jar $topic $base_path
