From bf3ad7e94afb5416d995dc22344566899ee7c4b0 Mon Sep 17 00:00:00 2001
From: Richard Chen
Date: Thu, 1 Aug 2024 08:48:19 +0800
Subject: [PATCH] [SPARK-49074][SQL] Fix variant with `df.cache()`

### What changes were proposed in this pull request?

Currently, the `actualSize` method of the `VARIANT` `ColumnType` isn't overridden, so the default size of 2KB is used as the `actualSize`. We should define `actualSize` so the cached variant column can be written to the byte buffer correctly.

As a result, if the average per-variant size is greater than 2KB and the total column size is greater than 128KB (the default initial buffer size), an exception is (incorrectly) thrown.

### Why are the changes needed?

To fix caching of larger variants (via `df.cache()`), such as the ones included in the UTs.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #47559 from richardc-db/fix_variant_cache.

Authored-by: Richard Chen
Signed-off-by: Wenchen Fan
---
 .../sql/execution/columnar/ColumnType.scala |  6 +++
 .../org/apache/spark/sql/VariantSuite.scala | 53 +++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
index 5cc3a3d83d4c1..60695a6c5d49c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala
@@ -829,6 +829,12 @@ private[columnar] object VARIANT
   /** Chosen to match the default size set in `VariantType`. */
   override def defaultSize: Int = 2048
 
+  override def actualSize(row: InternalRow, ordinal: Int): Int = {
+    val v = getField(row, ordinal)
+    // 4 bytes each for the integers representing the 'value' and 'metadata' lengths.
+    8 + v.getValue().length + v.getMetadata().length
+  }
+
   override def getField(row: InternalRow, ordinal: Int): VariantVal = row.getVariant(ordinal)
 
   override def setField(row: InternalRow, ordinal: Int, value: VariantVal): Unit =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index ce2643f9e239c..0c8b0b501951f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -652,6 +652,21 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
     checkAnswer(df, expected.collect())
   }
 
+  test("variant with many keys in a cached row-based df") {
+    // The initial size of the buffer backing a cached dataframe column is 128KB.
+    // See `ColumnBuilder`.
+    val numKeys = 128 * 1024
+    var keyIterator = (0 until numKeys).iterator
+    val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""")
+    val jsonStr = s"{${entries.mkString(", ")}}"
+    val query = s"""select parse_json('${jsonStr}') v from range(0, 10)"""
+    val df = spark.sql(query)
+    df.cache()
+
+    val expected = spark.sql(query)
+    checkAnswer(df, expected.collect())
+  }
+
   test("struct of variant in a cached row-based df") {
     val query = """select named_struct(
       'v', parse_json(format_string('{\"a\": %s}', id)),
@@ -680,6 +695,21 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
     checkAnswer(df, expected.collect())
   }
 
+  test("array variant with many keys in a cached row-based df") {
+    // The initial size of the buffer backing a cached dataframe column is 128KB.
+    // See `ColumnBuilder`.
+    val numKeys = 128 * 1024
+    var keyIterator = (0 until numKeys).iterator
+    val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""")
+    val jsonStr = s"{${entries.mkString(", ")}}"
+    val query = s"""select array(parse_json('${jsonStr}')) v from range(0, 10)"""
+    val df = spark.sql(query)
+    df.cache()
+
+    val expected = spark.sql(query)
+    checkAnswer(df, expected.collect())
+  }
+
   test("map of variant in a cached row-based df") {
     val query = """select map(
       'v', parse_json(format_string('{\"a\": %s}', id)),
@@ -711,6 +741,29 @@ class VariantSuite extends QueryTest with SharedSparkSession with ExpressionEval
     }
   }
 
+  test("variant with many keys in a cached column-based df") {
+    withTable("t") {
+      // The initial size of the buffer backing a cached dataframe column is 128KB.
+      // See `ColumnBuilder`.
+      val numKeys = 128 * 1024
+      var keyIterator = (0 until numKeys).iterator
+      val entries = Array.fill(numKeys)(s"""\"${keyIterator.next()}\": \"test\"""")
+      val jsonStr = s"{${entries.mkString(", ")}}"
+      val query = s"""select named_struct(
+        'v', parse_json('$jsonStr'),
+        'null_v', cast(null as variant),
+        'some_null', case when id % 2 = 0 then parse_json(cast(id as string)) else null end
+      ) v
+      from range(0, 10)"""
+      spark.sql(query).write.format("parquet").mode("overwrite").saveAsTable("t")
+      val df = spark.sql("select * from t")
+      df.cache()
+
+      val expected = spark.sql(query)
+      checkAnswer(df, expected.collect())
+    }
+  }
+
   test("variant_get size") {
     val largeKey = "x" * 1000
     val df = Seq(s"""{ "$largeKey": {"a" : 1 },
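
As a usage-level illustration of the failure mode this patch addresses, here is a minimal sketch (not taken from the patch itself; it assumes an active `SparkSession` named `spark` on a Spark build with variant support, and the key and row counts are illustrative):

```scala
// Each parsed variant is roughly 15KB, well above the 2KB default
// `actualSize`, and 100 such rows push the cached column past the
// 128KB initial buffer allocated by `ColumnBuilder`.
val entries = (0 until 1000).map(i => "\"k" + i + "\": \"test\"")
val jsonStr = "{" + entries.mkString(", ") + "}"
val df = spark.sql(s"select parse_json('$jsonStr') v from range(0, 100)")
df.cache()
// Without the `actualSize` override, materializing the cache could throw
// while the variant column was written to the buffer; with it, this succeeds.
df.collect()
```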