diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 53fd06fd24c27..37a8785f04d6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -126,10 +126,8 @@ private[sql] class RocksDBStateStoreProvider CUSTOM_METRIC_PUT_TIME -> sumNativeOpsLatencyMillis("put"), CUSTOM_METRIC_GET_COUNT -> nativeOpsCount("get"), CUSTOM_METRIC_PUT_COUNT -> nativeOpsCount("put"), - CUSTOM_METRIC_WRITEBATCH_TIME -> commitLatencyMs("writeBatch"), CUSTOM_METRIC_FLUSH_TIME -> commitLatencyMs("flush"), CUSTOM_METRIC_COMMIT_COMPACT_TIME -> commitLatencyMs("compact"), - CUSTOM_METRIC_PAUSE_TIME -> commitLatencyMs("pauseBg"), CUSTOM_METRIC_CHECKPOINT_TIME -> commitLatencyMs("checkpoint"), CUSTOM_METRIC_FILESYNC_TIME -> commitLatencyMs("fileSync"), CUSTOM_METRIC_BYTES_COPIED -> rocksDBMetrics.bytesCopied, @@ -270,14 +268,10 @@ object RocksDBStateStoreProvider { "rocksdbPutCount", "RocksDB: number of put calls") // Commit latency detailed breakdown - val CUSTOM_METRIC_WRITEBATCH_TIME = StateStoreCustomTimingMetric( - "rocksdbCommitWriteBatchLatency", "RocksDB: commit - write batch time") val CUSTOM_METRIC_FLUSH_TIME = StateStoreCustomTimingMetric( "rocksdbCommitFlushLatency", "RocksDB: commit - flush time") val CUSTOM_METRIC_COMMIT_COMPACT_TIME = StateStoreCustomTimingMetric( "rocksdbCommitCompactLatency", "RocksDB: commit - compact time") - val CUSTOM_METRIC_PAUSE_TIME = StateStoreCustomTimingMetric( - "rocksdbCommitPauseLatency", "RocksDB: commit - pause bg time") val CUSTOM_METRIC_CHECKPOINT_TIME = StateStoreCustomTimingMetric( "rocksdbCommitCheckpointLatency", "RocksDB: commit - checkpoint time") val CUSTOM_METRIC_FILESYNC_TIME = StateStoreCustomTimingMetric( @@ -332,8 +326,8 @@ object RocksDBStateStoreProvider { val ALL_CUSTOM_METRICS = Seq( CUSTOM_METRIC_SST_FILE_SIZE, CUSTOM_METRIC_GET_TIME, CUSTOM_METRIC_PUT_TIME, - CUSTOM_METRIC_WRITEBATCH_TIME, CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_COMMIT_COMPACT_TIME, - CUSTOM_METRIC_PAUSE_TIME, CUSTOM_METRIC_CHECKPOINT_TIME, CUSTOM_METRIC_FILESYNC_TIME, + CUSTOM_METRIC_FLUSH_TIME, CUSTOM_METRIC_COMMIT_COMPACT_TIME, + CUSTOM_METRIC_CHECKPOINT_TIME, CUSTOM_METRIC_FILESYNC_TIME, CUSTOM_METRIC_BYTES_COPIED, CUSTOM_METRIC_FILES_COPIED, CUSTOM_METRIC_FILES_REUSED, CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED, CUSTOM_METRIC_GET_COUNT, CUSTOM_METRIC_PUT_COUNT, CUSTOM_METRIC_BLOCK_CACHE_MISS, CUSTOM_METRIC_BLOCK_CACHE_HITS, CUSTOM_METRIC_BYTES_READ, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala index 339d00058fcf6..2eb7d98bea828 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala @@ -94,8 +94,8 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest val stateOperatorMetrics = nextProgress.stateOperators(0) assert(JavaConverters.asScalaSet(stateOperatorMetrics.customMetrics.keySet) === Set( "rocksdbGetLatency", "rocksdbCommitCompactLatency", "rocksdbBytesCopied", - "rocksdbPutLatency", "rocksdbCommitPauseLatency", "rocksdbFilesReused", - "rocksdbCommitWriteBatchLatency", "rocksdbFilesCopied", "rocksdbSstFileSize", + "rocksdbPutLatency", "rocksdbFilesReused", + "rocksdbFilesCopied", "rocksdbSstFileSize", "rocksdbCommitCheckpointLatency", "rocksdbZipFileBytesUncompressed", "rocksdbCommitFlushLatency", "rocksdbCommitFileSyncLatencyMs", "rocksdbGetCount", "rocksdbPutCount", "rocksdbTotalBytesRead", "rocksdbTotalBytesWritten",