[SPARK-25931][SQL] Benchmarking creation of Jackson parser
## What changes were proposed in this pull request?

Added a new benchmark which forcibly invokes the Jackson parser, to check the overhead of its creation for short and wide JSON strings. The existing benchmarks cannot measure this because of an optimization introduced by apache#21909: when an empty schema is pushed down to the JSON datasource, as happens when the `count()` action passes an empty required schema, the Jackson parser is not created at all.
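For illustration, here is a minimal sketch of the trick the new benchmark relies on (the object name, session settings, and input path are hypothetical, not part of this patch): filtering on full `Row`s keeps the required schema non-empty, so the datasource cannot skip parsing the way a bare `count()` can.

```scala
import org.apache.spark.sql.{Row, SparkSession}

object ForceParserCreation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jackson-parser-overhead")
      .getOrCreate()

    val ds = spark.read.json("/tmp/example.json") // hypothetical input path

    // count() alone pushes an empty required schema down to the JSON
    // datasource (apache#21909), so no Jackson parser is created per line.
    ds.count()

    // A filter over full rows defeats that optimization: every line must be
    // parsed, which exposes the parser creation overhead being benchmarked.
    ds.filter((_: Row) => true).count()

    spark.stop()
  }
}
```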

Besides the new benchmark, I also refactored the existing benchmarks:
- Added `numIters` to control the number of iterations in each benchmark (see the sketch after this list).
- Renamed `JSON per-line parsing` -> `count a short column`, `JSON parsing of wide lines` -> `count a wide column`, and `Count a dataset with 10 columns` -> `Select a subset of 10 columns`.
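A minimal sketch of the `numIters` pattern, assuming Spark's internal `org.apache.spark.benchmark.Benchmark` helper and the `addCase(name, numIters)` signature used in the diff (`exampleBenchmark` and its arguments are illustrative only). Threading the parameter through lets the whole suite raise or lower the iteration count in one place instead of editing every hard-coded `3`:

```scala
import org.apache.spark.benchmark.Benchmark

// Illustrative pattern only, not code from this patch: each benchmark method
// receives numIters and forwards it to addCase.
def exampleBenchmark(rowsNum: Int, numIters: Int): Unit = {
  val benchmark = new Benchmark("example", rowsNum)
  benchmark.addCase("a case", numIters) { _ =>
    // the measured body runs numIters times
  }
  benchmark.run()
}
```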

Closes apache#22920 from MaxGekk/json-benchmark-follow-up.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
MaxGekk and dongjoon-hyun committed Nov 3, 2018
1 parent 0e318ac commit 42b6c1f
Showing 2 changed files with 131 additions and 60 deletions.
35 changes: 23 additions & 12 deletions sql/core/benchmarks/JSONBenchmark-results.txt
@@ -7,31 +7,42 @@ OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
 JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 62946 / 63310          1.6         629.5       1.0X
-UTF-8 is set                              112814 / 112866          0.9        1128.1       0.6X
+No encoding                                 71832 / 72149          1.4         718.3       1.0X
+UTF-8 is set                              101700 / 101819          1.0        1017.0       0.7X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-JSON per-line parsing:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 16468 / 16553          6.1         164.7       1.0X
-UTF-8 is set                                16420 / 16441          6.1         164.2       1.0X
+No encoding                                 16501 / 16519          6.1         165.0       1.0X
+UTF-8 is set                                16477 / 16516          6.1         164.8       1.0X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-JSON parsing of wide lines:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-No encoding                                 39789 / 40053          0.3        3978.9       1.0X
-UTF-8 is set                                39505 / 39584          0.3        3950.5       1.0X
+No encoding                                 39871 / 40242          0.3        3987.1       1.0X
+UTF-8 is set                                39581 / 39721          0.3        3958.1       1.0X
 
 Preparing data for benchmarking ...
 OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
 Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
-Count a dataset with 10 columns:         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------
-Select 10 columns + count()                 15997 / 16015          0.6        1599.7       1.0X
-Select 1 column + count()                   13280 / 13326          0.8        1328.0       1.2X
-count()                                      3006 / 3021           3.3         300.6       5.3X
+Select 10 columns + count()                 16011 / 16033          0.6        1601.1       1.0X
+Select 1 column + count()                   14350 / 14392          0.7        1435.0       1.1X
+count()                                      3007 / 3034           3.3         300.7       5.3X
+
+Preparing data for benchmarking ...
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+Short column without encoding                 8334 / 8453           1.2         833.4       1.0X
+Short column with UTF-8                      13627 / 13784          0.7        1362.7       0.6X
+Wide column without encoding               155073 / 155351          0.1       15507.3       0.1X
+Wide column with UTF-8                     212114 / 212263          0.0       21211.4       0.0X


@@ -39,13 +39,17 @@ import org.apache.spark.sql.types._
 object JSONBenchmark extends SqlBasedBenchmark {
   import spark.implicits._
 
-  def schemaInferring(rowsNum: Int): Unit = {
+  def prepareDataInfo(benchmark: Benchmark): Unit = {
+    // scalastyle:off println
+    benchmark.out.println("Preparing data for benchmarking ...")
+    // scalastyle:on println
+  }
+
+  def schemaInferring(rowsNum: Int, numIters: Int): Unit = {
     val benchmark = new Benchmark("JSON schema inferring", rowsNum, output = output)
 
     withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+      prepareDataInfo(benchmark)
 
       spark.sparkContext.range(0, rowsNum, 1)
         .map(_ => "a")
@@ -54,11 +58,11 @@ object JSONBenchmark extends SqlBasedBenchmark {
         .option("encoding", "UTF-8")
         .json(path.getAbsolutePath)
 
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read.json(path.getAbsolutePath)
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .json(path.getAbsolutePath)
@@ -68,28 +72,29 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def perlineParsing(rowsNum: Int): Unit = {
-    val benchmark = new Benchmark("JSON per-line parsing", rowsNum, output = output)
+  def writeShortColumn(path: String, rowsNum: Int): StructType = {
+    spark.sparkContext.range(0, rowsNum, 1)
+      .map(_ => "a")
+      .toDF("fieldA")
+      .write.json(path)
+    new StructType().add("fieldA", StringType)
+  }
 
-    withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+  def countShortColumn(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("count a short column", rowsNum, output = output)
 
-      spark.sparkContext.range(0, rowsNum, 1)
-        .map(_ => "a")
-        .toDF("fieldA")
-        .write.json(path.getAbsolutePath)
-      val schema = new StructType().add("fieldA", StringType)
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+      val schema = writeShortColumn(path.getAbsolutePath, rowsNum)
 
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read
           .schema(schema)
           .json(path.getAbsolutePath)
           .count()
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .schema(schema)
@@ -101,35 +106,36 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def perlineParsingOfWideColumn(rowsNum: Int): Unit = {
-    val benchmark = new Benchmark("JSON parsing of wide lines", rowsNum, output = output)
+  def writeWideColumn(path: String, rowsNum: Int): StructType = {
+    spark.sparkContext.range(0, rowsNum, 1)
+      .map { i =>
+        val s = "abcdef0123456789ABCDEF" * 20
+        s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
+      }
+      .toDF().write.text(path)
+    new StructType()
+      .add("a", StringType).add("b", LongType)
+      .add("c", StringType).add("d", LongType)
+      .add("e", StringType).add("f", LongType)
+      .add("x", StringType).add("y", LongType)
+      .add("z", StringType)
+  }
+
+  def countWideColumn(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("count a wide column", rowsNum, output = output)
 
     withTempPath { path =>
-      // scalastyle:off println
-      benchmark.out.println("Preparing data for benchmarking ...")
-      // scalastyle:on println
+      prepareDataInfo(benchmark)
+      val schema = writeWideColumn(path.getAbsolutePath, rowsNum)
 
-      spark.sparkContext.range(0, rowsNum, 1)
-        .map { i =>
-          val s = "abcdef0123456789ABCDEF" * 20
-          s"""{"a":"$s","b": $i,"c":"$s","d":$i,"e":"$s","f":$i,"x":"$s","y":$i,"z":"$s"}"""
-        }
-        .toDF().write.text(path.getAbsolutePath)
-      val schema = new StructType()
-        .add("a", StringType).add("b", LongType)
-        .add("c", StringType).add("d", LongType)
-        .add("e", StringType).add("f", LongType)
-        .add("x", StringType).add("y", LongType)
-        .add("z", StringType)
-
-      benchmark.addCase("No encoding", 3) { _ =>
+      benchmark.addCase("No encoding", numIters) { _ =>
         spark.read
           .schema(schema)
           .json(path.getAbsolutePath)
           .count()
       }
 
-      benchmark.addCase("UTF-8 is set", 3) { _ =>
+      benchmark.addCase("UTF-8 is set", numIters) { _ =>
         spark.read
           .option("encoding", "UTF-8")
           .schema(schema)
@@ -141,12 +147,14 @@ object JSONBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  def countBenchmark(rowsNum: Int): Unit = {
+  def selectSubsetOfColumns(rowsNum: Int, numIters: Int): Unit = {
     val colsNum = 10
     val benchmark =
-      new Benchmark(s"Count a dataset with $colsNum columns", rowsNum, output = output)
+      new Benchmark(s"Select a subset of $colsNum columns", rowsNum, output = output)
 
     withTempPath { path =>
+      prepareDataInfo(benchmark)
+
       val fields = Seq.tabulate(colsNum)(i => StructField(s"col$i", IntegerType))
       val schema = StructType(fields)
       val columnNames = schema.fieldNames
@@ -158,26 +166,78 @@ object JSONBenchmark extends SqlBasedBenchmark {
 
       val ds = spark.read.schema(schema).json(path.getAbsolutePath)
 
-      benchmark.addCase(s"Select $colsNum columns + count()", 3) { _ =>
+      benchmark.addCase(s"Select $colsNum columns + count()", numIters) { _ =>
         ds.select("*").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
+      benchmark.addCase(s"Select 1 column + count()", numIters) { _ =>
         ds.select($"col1").filter((_: Row) => true).count()
       }
-      benchmark.addCase(s"count()", 3) { _ =>
+      benchmark.addCase(s"count()", numIters) { _ =>
         ds.count()
       }
 
       benchmark.run()
     }
   }
 
+  def jsonParserCreation(rowsNum: Int, numIters: Int): Unit = {
+    val benchmark = new Benchmark("creation of JSON parser per line", rowsNum, output = output)
+
+    withTempPath { path =>
+      prepareDataInfo(benchmark)
+
+      val shortColumnPath = path.getAbsolutePath + "/short"
+      val shortSchema = writeShortColumn(shortColumnPath, rowsNum)
+
+      val wideColumnPath = path.getAbsolutePath + "/wide"
+      val wideSchema = writeWideColumn(wideColumnPath, rowsNum)
+
+      benchmark.addCase("Short column without encoding", numIters) { _ =>
+        spark.read
+          .schema(shortSchema)
+          .json(shortColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Short column with UTF-8", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(shortSchema)
+          .json(shortColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Wide column without encoding", numIters) { _ =>
+        spark.read
+          .schema(wideSchema)
+          .json(wideColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.addCase("Wide column with UTF-8", numIters) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .schema(wideSchema)
+          .json(wideColumnPath)
+          .filter((_: Row) => true)
+          .count()
+      }
+
+      benchmark.run()
+    }
+  }
+
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val numIters = 3
     runBenchmark("Benchmark for performance of JSON parsing") {
-      schemaInferring(100 * 1000 * 1000)
-      perlineParsing(100 * 1000 * 1000)
-      perlineParsingOfWideColumn(10 * 1000 * 1000)
-      countBenchmark(10 * 1000 * 1000)
+      schemaInferring(100 * 1000 * 1000, numIters)
+      countShortColumn(100 * 1000 * 1000, numIters)
+      countWideColumn(10 * 1000 * 1000, numIters)
+      selectSubsetOfColumns(10 * 1000 * 1000, numIters)
+      jsonParserCreation(10 * 1000 * 1000, numIters)
     }
   }
 }
