Skip to content

Commit

Permalink
[SPARK-45503][SS] Add Conf to Set RocksDB Compression
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Add a conf to set RocksDB State Store Instance's compression type, with default to be LZ4

### Why are the changes needed?
LZ4 is generally faster than Snappy. That's probably why we use LZ4 in changelogs and other places by default. However, we don't change RocksDB's default of Snappy compression style. The RocksDB Team recommend LZ4 and/or ZSTD (https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#compression) and the default is kept to Snappy only for backward compatibility reason. We should make it tunable with default LZ4

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests should fix it. Some benchmarks are run and we see positive improvements.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43338 from siying/rocksdb_lz4.

Authored-by: Siying Dong <siying.dong@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
siying authored and HeartSaVioR committed Oct 24, 2023
1 parent 68c0f64 commit 4eac024
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/structured-streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2390,6 +2390,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
<td>Allow the rocksdb runtime to use fallocate to pre-allocate disk space for logs, etc... Disable for apps that have many smaller state stores to trade off disk space for write performance.</td>
<td>true</td>
</tr>
<tr>
<td>spark.sql.streaming.stateStore.rocksdb.compression</td>
<td>Compression type used in RocksDB. The string is converted RocksDB compression type through RocksDB Java API getCompressionType(). </td>
<td>lz4</td>
</tr>
</table>

##### RocksDB State Store Memory Management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.rocksdb.{RocksDB => NativeRocksDB, _}
import org.rocksdb.CompressionType._
import org.rocksdb.TickerType._

import org.apache.spark.TaskContext
Expand Down Expand Up @@ -91,7 +92,7 @@ class RocksDB(
tableFormatConfig.setPinL0FilterAndIndexBlocksInCache(true)
}

private val columnFamilyOptions = new ColumnFamilyOptions()
private[state] val columnFamilyOptions = new ColumnFamilyOptions()

// Set RocksDB options around MemTable memory usage. By default, we let RocksDB
// use its internal default values for these settings.
Expand All @@ -103,6 +104,8 @@ class RocksDB(
columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber)
}

columnFamilyOptions.setCompressionType(getCompressionType(conf.compression))

private val dbOptions =
new Options(new DBOptions(), columnFamilyOptions) // options to open the RocksDB

Expand Down Expand Up @@ -676,7 +679,8 @@ case class RocksDBConf(
writeBufferCacheRatio: Double,
highPriorityPoolRatio: Double,
compressionCodec: String,
allowFAllocate: Boolean)
allowFAllocate: Boolean,
compression: String)

object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
Expand Down Expand Up @@ -767,6 +771,10 @@ object RocksDBConf {
val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate"
private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, "true")

// Pass as compression type to RocksDB.
val COMPRESSION_KEY = "compression"
private val COMPRESSION_CONF = SQLConfEntry(COMPRESSION_KEY, "lz4")

def apply(storeConf: StateStoreConf): RocksDBConf = {
val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
Expand Down Expand Up @@ -826,6 +834,14 @@ object RocksDBConf {
}
}

def getStringConf(conf: ConfEntry): String = {
Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
throw new IllegalArgumentException(
s"Invalid value for '${conf.fullName}', must be a string"
)
}
}

RocksDBConf(
storeConf.minVersionsToRetain,
storeConf.minDeltasForSnapshot,
Expand All @@ -845,7 +861,8 @@ object RocksDBConf {
getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
storeConf.compressionCodec,
getBooleanConf(ALLOW_FALLOCATE_CONF))
getBooleanConf(ALLOW_FALLOCATE_CONF),
getStringConf(COMPRESSION_CONF))
}

def apply(): RocksDBConf = apply(new StateStoreConf())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", "false"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compression", "zstd"),
(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
)
testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
Expand Down Expand Up @@ -117,6 +118,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
assert(rocksDBConfInTask.allowFAllocate == false)
assert(rocksDBConfInTask.compression == "zstd")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.language.implicitConversions

import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.rocksdb.CompressionType
import org.scalactic.source.Position
import org.scalatest.Tag

Expand Down Expand Up @@ -373,6 +374,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}

test("RocksDB: compression conf") {
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created

val conf = RocksDBConf().copy(compression = "zstd")
withDB(remoteDir, conf = conf) { db =>
assert(db.columnFamilyOptions.compressionType() == CompressionType.ZSTD_COMPRESSION)
}

// Test the default is LZ4
withDB(remoteDir, conf = RocksDBConf().copy()) { db =>
assert(db.columnFamilyOptions.compressionType() == CompressionType.LZ4_COMPRESSION)
}
}

test("RocksDB: get, put, iterator, commit, load") {
def testOps(compactOnCommit: Boolean): Unit = {
val remoteDir = Utils.createTempDir().toString
Expand Down

0 comments on commit 4eac024

Please sign in to comment.