From 8d3d4f9b900dadede3b8e33af830e5ef66682923 Mon Sep 17 00:00:00 2001 From: Ted Jenks Date: Fri, 12 Jul 2024 18:33:20 +0800 Subject: [PATCH] [SPARK-47307][SQL] Add a config to optionally chunk base64 strings Follow up #45408 ### What changes were proposed in this pull request? [[SPARK-47307](https://issues.apache.org/jira/browse/SPARK-47307)] Add a config to optionally chunk base64 strings ### Why are the changes needed? In #35110, it was incorrectly asserted that: > ApacheCommonBase64 obeys http://www.ietf.org/rfc/rfc2045.txt This is not true as the previous code called: ```java public static byte[] encodeBase64(byte[] binaryData) ``` Which states: > Encodes binary data using the base64 algorithm but does not chunk the output. However, the RFC 2045 (MIME) base64 encoder does chunk by default. This now means that any Spark encoded base64 strings cannot be decoded by encoders that do not implement RFC 2045. The docs state RFC 4648. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test suite. ### Was this patch authored or co-authored using generative AI tooling? No Closes #47303 from wForget/SPARK-47307. Lead-authored-by: Ted Jenks Co-authored-by: wforget <643348094@qq.com> Co-authored-by: Kent Yao Co-authored-by: Ted Chester Jenks Signed-off-by: Kent Yao --- .../explain-results/function_base64.explain | 2 +- .../expressions/stringExpressions.scala | 40 +++++++++++++------ .../apache/spark/sql/internal/SQLConf.scala | 11 +++++ .../expressions/StringExpressionsSuite.scala | 13 ++++++ 4 files changed, 53 insertions(+), 13 deletions(-) diff --git a/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain b/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain index f80f3522190d8..d3a250919ea5d 100644 --- a/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain +++ b/connect/common/src/test/resources/query-tests/explain-results/function_base64.explain @@ -1,2 +1,2 @@ -Project [base64(cast(g#0 as binary)) AS base64(CAST(g AS BINARY))#0] +Project [static_invoke(Base64.encode(cast(g#0 as binary), false)) AS base64(CAST(g AS BINARY))#0] +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index f25f58731c8cd..b188b9c2630fa 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -2682,24 +2682,40 @@ case class Chr(child: Expression) """, since = "1.5.0", group = "string_funcs") -case class Base64(child: Expression) - extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant { +case class Base64(child: Expression, chunkBase64: Boolean) + extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes { + + def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled) override def dataType: DataType = SQLConf.get.defaultStringType override def inputTypes: Seq[DataType] = Seq(BinaryType) - protected override def nullSafeEval(bytes: Any): Any = { - UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]])) - } + override def replacement: Expression = StaticInvoke( + classOf[Base64], + dataType, + "encode", + Seq(child, Literal(chunkBase64, BooleanType)), + Seq(BinaryType, BooleanType)) - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, (child) => { - s"""${ev.value} = UTF8String.fromBytes( - ${classOf[JBase64].getName}.getMimeEncoder().encode($child)); - """}) - } + override def toString: String = s"$prettyName($child)" - override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild) + override protected def withNewChildInternal(newChild: Expression): Expression = + copy(child = newChild) +} + +object Base64 { + def apply(expr: Expression): Base64 = new Base64(expr) + + private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array()) + + def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = { + val encoder = if (chunkBase64) { + JBase64.getMimeEncoder + } else { + nonChunkEncoder + } + UTF8String.fromBytes(encoder.encode(input)) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6ca831f99304b..65beb21d59d92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3525,6 +3525,15 @@ object SQLConf { .booleanConf .createWithDefault(false) + val CHUNK_BASE64_STRING_ENABLED = buildConf("spark.sql.legacy.chunkBase64String.enabled") + .internal() + .doc("Whether to truncate string generated by the `Base64` function. When true, base64" + + " strings generated by the base64 function are chunked into lines of at most 76" + + " characters. When false, the base64 strings are not chunked.") + .version("3.5.2") + .booleanConf + .createWithDefault(false) + val ENABLE_DEFAULT_COLUMNS = buildConf("spark.sql.defaultColumn.enabled") .internal() @@ -5856,6 +5865,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE) + def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE64_STRING_ENABLED) + def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match { case "TIMESTAMP_LTZ" => // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index ebd7245434819..2ad8652f2b314 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -509,6 +509,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")).replacement :: Nil) } + test("SPARK-47307: base64 encoding without chunking") { + val longString = "a" * 58 + val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ==" + withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "false") { + checkEvaluation(Base64(Literal(longString.getBytes)), encoded) + } + val chunkEncoded = + s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ==" + withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "true") { + checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded) + } + } + test("initcap unit test") { checkEvaluation(InitCap(Literal.create(null, StringType)), null) checkEvaluation(InitCap(Literal("a b")), "A B")