From 4eac0244c7651cb23ed4ae7dec3b36661b9f6408 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Wed, 25 Oct 2023 05:25:43 +0900 Subject: [PATCH] [SPARK-45503][SS] Add Conf to Set RocksDB Compression ### What changes were proposed in this pull request? Add a conf to set RocksDB State Store Instance's compression type, with default to be LZ4 ### Why are the changes needed? LZ4 is generally faster than Snappy. That's probably why we use LZ4 in changelogs and other places by default. However, we don't change RocksDB's default of Snappy compression style. The RocksDB Team recommend LZ4 and/or ZSTD (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#compression) and the default is kept to Snappy only for backward compatibility reason. We should make it tunable with default LZ4 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests should fix it. Some benchmarks are run and we see positive improvements. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43338 from siying/rocksdb_lz4. Authored-by: Siying Dong Signed-off-by: Jungtaek Lim --- .../structured-streaming-programming-guide.md | 5 ++++ .../execution/streaming/state/RocksDB.scala | 23 ++++++++++++++++--- .../state/RocksDBStateStoreSuite.scala | 2 ++ .../streaming/state/RocksDBSuite.scala | 16 +++++++++++++ 4 files changed, 43 insertions(+), 3 deletions(-) diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 9fb823abaa3ab..547834c7f9e3a 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2390,6 +2390,11 @@ Here are the configs regarding to RocksDB instance of the state store provider: Allow the rocksdb runtime to use fallocate to pre-allocate disk space for logs, etc... Disable for apps that have many smaller state stores to trade off disk space for write performance. true + + spark.sql.streaming.stateStore.rocksdb.compression + Compression type used in RocksDB. The string is converted RocksDB compression type through RocksDB Java API getCompressionType(). + lz4 + ##### RocksDB State Store Memory Management diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 1e3f3a67f16f6..40f63c86a6a4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.rocksdb.{RocksDB => NativeRocksDB, _} +import org.rocksdb.CompressionType._ import org.rocksdb.TickerType._ import org.apache.spark.TaskContext @@ -91,7 +92,7 @@ class RocksDB( tableFormatConfig.setPinL0FilterAndIndexBlocksInCache(true) } - private val columnFamilyOptions = new ColumnFamilyOptions() + private[state] val columnFamilyOptions = new ColumnFamilyOptions() // Set RocksDB options around MemTable memory usage. By default, we let RocksDB // use its internal default values for these settings. @@ -103,6 +104,8 @@ class RocksDB( columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber) } + columnFamilyOptions.setCompressionType(getCompressionType(conf.compression)) + private val dbOptions = new Options(new DBOptions(), columnFamilyOptions) // options to open the RocksDB @@ -676,7 +679,8 @@ case class RocksDBConf( writeBufferCacheRatio: Double, highPriorityPoolRatio: Double, compressionCodec: String, - allowFAllocate: Boolean) + allowFAllocate: Boolean, + compression: String) object RocksDBConf { /** Common prefix of all confs in SQLConf that affects RocksDB */ @@ -767,6 +771,10 @@ object RocksDBConf { val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate" private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, "true") + // Pass as compression type to RocksDB. + val COMPRESSION_KEY = "compression" + private val COMPRESSION_CONF = SQLConfEntry(COMPRESSION_KEY, "lz4") + def apply(storeConf: StateStoreConf): RocksDBConf = { val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs) val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions) @@ -826,6 +834,14 @@ object RocksDBConf { } } + def getStringConf(conf: ConfEntry): String = { + Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse { + throw new IllegalArgumentException( + s"Invalid value for '${conf.fullName}', must be a string" + ) + } + } + RocksDBConf( storeConf.minVersionsToRetain, storeConf.minDeltasForSnapshot, @@ -845,7 +861,8 @@ object RocksDBConf { getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF), getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF), storeConf.compressionCodec, - getBooleanConf(ALLOW_FALLOCATE_CONF)) + getBooleanConf(ALLOW_FALLOCATE_CONF), + getStringConf(COMPRESSION_CONF)) } def apply(): RocksDBConf = apply(new StateStoreConf()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala index 4ce344d4e73a3..a6e65825a5bca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala @@ -87,6 +87,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"), (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"), (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", "false"), + (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compression", "zstd"), (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4") ) testConfs.foreach { case (k, v) => spark.conf.set(k, v) } @@ -117,6 +118,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid assert(rocksDBConfInTask.maxWriteBufferNumber == 3) assert(rocksDBConfInTask.writeBufferSizeMB == 16L) assert(rocksDBConfInTask.allowFAllocate == false) + assert(rocksDBConfInTask.compression == "zstd") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 15d35bae700fe..ac50e55202919 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -24,6 +24,7 @@ import scala.language.implicitConversions import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration +import org.rocksdb.CompressionType import org.scalactic.source.Position import org.scalatest.Tag @@ -373,6 +374,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + test("RocksDB: compression conf") { + val remoteDir = Utils.createTempDir().toString + new File(remoteDir).delete() // to make sure that the directory gets created + + val conf = RocksDBConf().copy(compression = "zstd") + withDB(remoteDir, conf = conf) { db => + assert(db.columnFamilyOptions.compressionType() == CompressionType.ZSTD_COMPRESSION) + } + + // Test the default is LZ4 + withDB(remoteDir, conf = RocksDBConf().copy()) { db => + assert(db.columnFamilyOptions.compressionType() == CompressionType.LZ4_COMPRESSION) + } + } + test("RocksDB: get, put, iterator, commit, load") { def testOps(compactOnCommit: Boolean): Unit = { val remoteDir = Utils.createTempDir().toString