[SPARK-47307][SQL] Add a config to optionally chunk base64 strings
Follow-up to apache#45408.

### What changes were proposed in this pull request?
[[SPARK-47307](https://issues.apache.org/jira/browse/SPARK-47307)] Add a config to optionally chunk base64 strings

### Why are the changes needed?
In apache#35110, it was incorrectly asserted that:

> ApacheCommonBase64 obeys http://www.ietf.org/rfc/rfc2045.txt

This is not true, as the previous code called:

```java
public static byte[] encodeBase64(byte[] binaryData)
```

whose documentation states:

> Encodes binary data using the base64 algorithm but does not chunk the output.

However, the RFC 2045 (MIME) base64 encoder that replaced it does chunk by default, inserting a CRLF line break after every 76 output characters. This means that base64 strings encoded by Spark can no longer be decoded by decoders that do not implement RFC 2045, even though the function's documentation states RFC 4648.
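For a concrete picture of the discrepancy, here is a minimal sketch using plain `java.util.Base64` (no Spark involved; the object name `ChunkingDemo` and the 60-byte input are illustrative, not part of this change):

```scala
import java.util.Base64

object ChunkingDemo extends App {
  // 60 input bytes encode to 80 base64 characters, crossing the 76-char MIME line limit.
  val input = Array.fill[Byte](60)('a'.toByte)

  // RFC 2045 (MIME) encoder: chunks output into 76-character lines joined by "\r\n".
  val mime = new String(Base64.getMimeEncoder.encode(input))

  // A line length <= 0 disables chunking, matching commons-codec's encodeBase64(byte[]).
  val unchunked = new String(Base64.getMimeEncoder(-1, Array.emptyByteArray).encode(input))

  println(mime.contains("\r\n"))      // true
  println(unchunked.contains("\r\n")) // false
}
```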

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test suite.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47303 from wForget/SPARK-47307.

Lead-authored-by: Ted Jenks <tedcj@palantir.com>
Co-authored-by: wforget <643348094@qq.com>
Co-authored-by: Kent Yao <yao@apache.org>
Co-authored-by: Ted Chester Jenks <tedcj@palantir.com>
Signed-off-by: Kent Yao <yao@apache.org>
3 people committed Jul 12, 2024
1 parent 236d957 commit 8d3d4f9
Showing 4 changed files with 53 additions and 13 deletions.
```diff
@@ -1,2 +1,2 @@
-Project [base64(cast(g#0 as binary)) AS base64(CAST(g AS BINARY))#0]
+Project [static_invoke(Base64.encode(cast(g#0 as binary), false)) AS base64(CAST(g AS BINARY))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
```
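The golden-file change above comes from the switch to `RuntimeReplaceable`: the analyzer rewrites `base64` into a `static_invoke` of `Base64.encode`, with the chunking flag captured as a boolean literal (`false` by default). As a rough sketch of how one might observe this in a spark-shell session (the `spark` session and sample expression are assumptions, not part of this commit):

```scala
// The optimized plan should contain static_invoke(Base64.encode(..., false)),
// resembling the golden-file line above.
spark.range(1).selectExpr("base64(cast(id AS STRING))").explain(true)
```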
```diff
@@ -2682,24 +2682,40 @@ case class Chr(child: Expression)
   """,
   since = "1.5.0",
   group = "string_funcs")
-case class Base64(child: Expression)
-  extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
+case class Base64(child: Expression, chunkBase64: Boolean)
+  extends UnaryExpression with RuntimeReplaceable with ImplicitCastInputTypes {
+
+  def this(expr: Expression) = this(expr, SQLConf.get.chunkBase64StringEnabled)
 
   override def dataType: DataType = SQLConf.get.defaultStringType
   override def inputTypes: Seq[DataType] = Seq(BinaryType)
 
-  protected override def nullSafeEval(bytes: Any): Any = {
-    UTF8String.fromBytes(JBase64.getMimeEncoder.encode(bytes.asInstanceOf[Array[Byte]]))
-  }
+  override def replacement: Expression = StaticInvoke(
+    classOf[Base64],
+    dataType,
+    "encode",
+    Seq(child, Literal(chunkBase64, BooleanType)),
+    Seq(BinaryType, BooleanType))
 
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    nullSafeCodeGen(ctx, ev, (child) => {
-      s"""${ev.value} = UTF8String.fromBytes(
-        ${classOf[JBase64].getName}.getMimeEncoder().encode($child));
-      """})
-  }
+  override def toString: String = s"$prettyName($child)"
 
-  override protected def withNewChildInternal(newChild: Expression): Base64 = copy(child = newChild)
+  override protected def withNewChildInternal(newChild: Expression): Expression =
+    copy(child = newChild)
 }
+
+object Base64 {
+  def apply(expr: Expression): Base64 = new Base64(expr)
+
+  private lazy val nonChunkEncoder = JBase64.getMimeEncoder(-1, Array())
+
+  def encode(input: Array[Byte], chunkBase64: Boolean): UTF8String = {
+    val encoder = if (chunkBase64) {
+      JBase64.getMimeEncoder
+    } else {
+      nonChunkEncoder
+    }
+    UTF8String.fromBytes(encoder.encode(input))
+  }
+}
 
 /**
```
```diff
@@ -3525,6 +3525,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val CHUNK_BASE64_STRING_ENABLED = buildConf("spark.sql.legacy.chunkBase64String.enabled")
+    .internal()
+    .doc("Whether to truncate string generated by the `Base64` function. When true, base64" +
+      " strings generated by the base64 function are chunked into lines of at most 76" +
+      " characters. When false, the base64 strings are not chunked.")
+    .version("3.5.2")
+    .booleanConf
+    .createWithDefault(false)
+
   val ENABLE_DEFAULT_COLUMNS =
     buildConf("spark.sql.defaultColumn.enabled")
       .internal()
@@ -5856,6 +5865,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def ansiRelationPrecedence: Boolean = ansiEnabled && getConf(ANSI_RELATION_PRECEDENCE)
 
+  def chunkBase64StringEnabled: Boolean = getConf(CHUNK_BASE64_STRING_ENABLED)
+
   def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
     case "TIMESTAMP_LTZ" =>
       // For historical reason, the TimestampType maps to TIMESTAMP WITH LOCAL TIME ZONE
```
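For illustration, a hedged end-to-end sketch of the new flag (assumes a running `SparkSession` named `spark`; note the flag is read when the `Base64` expression is constructed during analysis):

```scala
// Default: unchunked output, decodable by strict RFC 4648 decoders.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "false")
spark.sql("SELECT base64(cast(repeat('a', 58) AS BINARY))").show(truncate = false)
// -> one 80-character line

// Legacy behavior: RFC 2045 chunking, "\r\n" inserted after 76 characters.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "true")
spark.sql("SELECT base64(cast(repeat('a', 58) AS BINARY))").show(truncate = false)
// -> the same string split across two lines
```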
```diff
@@ -509,6 +509,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     GenerateUnsafeProjection.generate(StringDecode(b, Literal("\"quote")).replacement :: Nil)
   }
 
+  test("SPARK-47307: base64 encoding without chunking") {
+    val longString = "a" * 58
+    val encoded = "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYQ=="
+    withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "false") {
+      checkEvaluation(Base64(Literal(longString.getBytes)), encoded)
+    }
+    val chunkEncoded =
+      s"YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFh\r\nYQ=="
+    withSQLConf(SQLConf.CHUNK_BASE64_STRING_ENABLED.key -> "true") {
+      checkEvaluation(Base64(Literal(longString.getBytes)), chunkEncoded)
+    }
+  }
+
   test("initcap unit test") {
     checkEvaluation(InitCap(Literal.create(null, StringType)), null)
     checkEvaluation(InitCap(Literal("a b")), "A B")
```
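Why the test uses 58 input bytes: base64 emits 4 output characters per 3-byte group (with padding), so 58 bytes yield an 80-character string, just past the 76-character MIME line limit, which makes chunking observable. A quick sanity check:

```scala
// base64 output length (with padding) for n input bytes: 4 * ceil(n / 3)
val n = 58
val encodedLen = 4 * ((n + 2) / 3) // 4 * 20 = 80
assert(encodedLen == 80)           // 80 > 76, so the MIME encoder inserts exactly one "\r\n"
```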
