Setup spark streaming #1

Merged: 5 commits, Jun 12, 2023
8 changes: 8 additions & 0 deletions bin/delete-topics.sh
@@ -0,0 +1,8 @@
#!/usr/bin/env bash

set -ex

kafka-topics --bootstrap-server localhost:9092 --delete --topic pageviews
kafka-topics --bootstrap-server localhost:9092 --delete --topic pageviews-count
kafka-topics --bootstrap-server localhost:9092 --delete --topic pageviews-processed
kafka-topics --bootstrap-server localhost:9092 --delete --topic pageviews-compacted
5 changes: 5 additions & 0 deletions bin/install-connectors.sh
@@ -0,0 +1,5 @@
#!/usr/bin/env bash

confluent-hub install confluentinc/kafka-connect-datagen:0.6.0

confluent-hub install confluentinc/kafka-connect-avro-converter:7.3.1
5 changes: 5 additions & 0 deletions bin/setup-connectors.sh
@@ -0,0 +1,5 @@
#!/usr/bin/env bash

set -eux

/usr/local/bin/confluent local services connect connector load datagen-pageviews --config config/connector_datagen-pageviews.config
33 changes: 33 additions & 0 deletions bin/setup-topics.sh
@@ -0,0 +1,33 @@
#!/usr/bin/env bash

set -ex

# https://docs.confluent.io/kafka/operations-tools/partition-determination.html
# A topic partition is the unit of parallelism in Apache Kafka®.

# For both producers and brokers, writes to different partitions can be done in
# parallel. This frees up hardware resources for expensive operations such as
# compression.

# If you want to change the number of partitions or replicas of your Kafka topic,
# you can use a streaming transformation to automatically stream all of the
# messages from the original topic into a new Kafka topic that has the desired
# number of partitions or replicas.

# https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262

kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews \
--partitions 5

kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews-count \
--partitions 3

kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews-processed \
--partitions 5

kafka-topics --bootstrap-server localhost:9092 --create --topic pageviews-compacted \
--partitions 5 \
--config cleanup.policy=compact \
--config delete.retention.ms=100 \
--config segment.ms=100 \
--config min.cleanable.dirty.ratio=0.01
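
The comments above suggest using a streaming transformation to move data into a topic created with a different partition or replica count. That job is not part of this PR; the following is a minimal Structured Streaming sketch of the idea, assuming the spark-sql-kafka-0-10 connector is on the classpath. The object name, target topic and checkpoint path are illustrative.

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: re-stream an existing topic into one created with the desired partition count
object RepartitionByRestreaming {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("repartition-by-restreaming").getOrCreate()

    // Read the source topic; key and value arrive as binary columns
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "pageviews")
      .option("startingOffsets", "earliest")
      .load()

    // Write the same records into the new topic; Kafka assigns partitions by key as usual
    val query = source
      .selectExpr("CAST(key AS STRING) AS key", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "pageviews-processed")
      .option("checkpointLocation", "/tmp/checkpoints/pageviews-restream") // illustrative path
      .start()

    query.awaitTermination()
  }
}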
48 changes: 31 additions & 17 deletions build.sbt
@@ -4,34 +4,48 @@ version := "0.0.1"

scalaVersion := "2.13.6"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "3.3.3"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.3"

libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.2.3"

libraryDependencies += "com.github.mrpowers" %% "spark-fast-tests" % "1.1.0" % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.12" % "test"

// jackson-module-scala is required for jackson-databind
// Fixes error: Scala module 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13
libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.3"
val sparkVersion = "3.3.1"
val hadoopVersion = "3.3.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "com.github.mrpowers" %% "spark-daria" % "1.2.3",

  // jackson-module-scala is required for jackson-databind
  // Fixes error: Scala module 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.3"
)

// Test Dependencies
libraryDependencies ++= Seq(
  "com.github.mrpowers" %% "spark-fast-tests" % "1.1.0" % "test",
  "org.scalatest" %% "scalatest" % "3.2.12" % "test"
)

// test suite settings
fork in Test := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")
Test / fork := true

javaOptions ++= Seq(
  "-Xms512M",
  "-Xmx2048M",
  "-XX:MaxPermSize=2048M",
  "-XX:+CMSClassUnloadingEnabled"
)
// Show runtime of tests
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-oD")
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oD")

// JAR file settings

// don't include Scala in the JAR file
//assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// Add the JAR file naming conventions described here: https://github.com/MrPowers/spark-style-guide#jar-files
// You can add the JAR file naming conventions by running the shell script

initialCommands in console := s"""
console / initialCommands := s"""
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
(remainder of the build.sbt diff is collapsed)
15 changes: 15 additions & 0 deletions config/connector_datagen-pageviews.config
@@ -0,0 +1,15 @@
{
  "name": "datagen-pageviews",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "kafka.topic": "pageviews",
    "quickstart": "pageviews",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": "true",
    "max.interval": 100,
    "iterations": 10000000,
    "tasks.max": "1"
  }
}
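
This connector configuration produces Avro-encoded pageview events with plain string keys onto the pageviews topic. Not part of the PR, but a quick way to confirm records are flowing from Spark might look like the sketch below; it assumes the spark-sql-kafka-0-10 connector is available and only prints record metadata and keys, since decoding the Avro values would need a schema-registry-aware deserializer that this PR does not include.

import org.apache.spark.sql.SparkSession

// Hypothetical smoke test: tail the pageviews topic and print keys and offsets to the console
object PageviewsSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("pageviews-smoke-test").getOrCreate()

    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "pageviews")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "partition", "offset", "timestamp")
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}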
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.6.2
sbt.version=1.8.2
35 changes: 35 additions & 0 deletions (new file, presumably StreamingNetworkWordCount.scala)
@@ -0,0 +1,35 @@
package com.github.warfox.sparksandbox

import org.apache.spark.streaming._

object StreamingNetworkWordCount extends SparkSessionWrapper {

  // https://spark.apache.org/docs/latest/streaming-programming-guide.html

  // Spark Streaming is a legacy project; we should be using Structured Streaming instead

  def main(args: Array[String]): Unit = {

    // Run netcat as a small data server
    // nc -lk 9999

    // Initiate StreamingContext from SparkContext
    val ssc = new StreamingContext(sc, Seconds(1))

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words
    val words = lines.flatMap(_.split(" "))

    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()            // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate

  }
}
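
Both streaming examples extend SparkSessionWrapper, which is not included in this diff and presumably lives elsewhere in the repository. A minimal sketch of what it likely provides (the spark session and the sc SparkContext used above) follows; the master and application name are assumptions.

package com.github.warfox.sparksandbox

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: the real trait may differ
trait SparkSessionWrapper extends Serializable {

  lazy val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")        // assumption: local development
    .appName("spark-sandbox")  // assumption: application name
    .getOrCreate()

  lazy val sc: SparkContext = spark.sparkContext
}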
41 changes: 41 additions & 0 deletions (new file, presumably StructuredStreamingNetworkCount.scala)
@@ -0,0 +1,41 @@
package com.github.warfox.sparksandbox

import org.apache.spark.streaming._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame

object StructuredStreamingNetworkCount extends SparkSessionWrapper {

  // https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example

  def main(args: Array[String]): Unit = {

    // Run netcat as a small data server
    // nc -lk 9999

    import spark.implicits._

    // Create a DataFrame representing the stream of input lines from the connection to localhost:9999
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split the lines into words
    val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))

    // Generate a running word count
    val wordCounts: DataFrame = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()

  }

}