Skip to content

Commit

Permalink
[SPARK-49954][SQL] Codegen Support for SchemaOfJson (by Invoke & Runt…
Browse files Browse the repository at this point in the history
…imeReplaceable)
  • Loading branch information
panbingkun committed Oct 15, 2024
1 parent c3176a7 commit 320fb0a
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import org.apache.spark.sql.internal.types.{AbstractMapType, StringTypeWithCaseA
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType, VariantType}
import org.apache.spark.unsafe.types.UTF8String

object ExprUtils extends QueryErrorsBase {
object ExprUtils extends EvalHelper with QueryErrorsBase {

def evalTypeExpr(exp: Expression): DataType = {
if (exp.foldable) {
exp.eval() match {
prepareForEval(exp).eval() match {
case s: UTF8String if s != null =>
val dataType = DataType.parseTypeWithFallback(
s.toString,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.expressions.json

import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JsonInferSchema, JSONOptions}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

object JsonExpressionEvalUtils {

def schemaOfJson(
jsonFactory: JsonFactory,
jsonOptions: JSONOptions,
jsonInferSchema: JsonInferSchema,
json: UTF8String): UTF8String = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
parser.nextToken()
// To match with schema inference from JSON datasource.
jsonInferSchema.inferField(parser) match {
case st: StructType =>
jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil))
case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
jsonInferSchema
.canonicalizeType(at.elementType, jsonOptions)
.map(ArrayType(_, containsNull = at.containsNull))
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
case other: DataType =>
jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(
SQLConf.get.defaultStringType)
}
}

UTF8String.fromString(dt.sql)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, CodegenFallback, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
import org.apache.spark.sql.catalyst.expressions.json.JsonExpressionUtils
import org.apache.spark.sql.catalyst.expressions.json.{JsonExpressionEvalUtils, JsonExpressionUtils}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtils
import org.apache.spark.sql.catalyst.json._
Expand Down Expand Up @@ -878,7 +878,9 @@ case class StructsToJson(
case class SchemaOfJson(
child: Expression,
options: Map[String, String])
extends UnaryExpression with CodegenFallback with QueryErrorsBase {
extends UnaryExpression
with RuntimeReplaceable
with QueryErrorsBase {

def this(child: Expression) = this(child, Map.empty[String, String])

Expand Down Expand Up @@ -919,26 +921,20 @@ case class SchemaOfJson(
}
}

override def eval(v: InternalRow): Any = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
parser.nextToken()
// To match with schema inference from JSON datasource.
jsonInferSchema.inferField(parser) match {
case st: StructType =>
jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil))
case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
jsonInferSchema
.canonicalizeType(at.elementType, jsonOptions)
.map(ArrayType(_, containsNull = at.containsNull))
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
case other: DataType =>
jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(
SQLConf.get.defaultStringType)
}
}
@transient private lazy val jsonFactoryObjectType = ObjectType(classOf[JsonFactory])
@transient private lazy val jsonOptionsObjectType = ObjectType(classOf[JSONOptions])
@transient private lazy val jsonInferSchemaObjectType = ObjectType(classOf[JsonInferSchema])

UTF8String.fromString(dt.sql)
}
override def replacement: Expression = StaticInvoke(
JsonExpressionEvalUtils.getClass,
dataType,
"schemaOfJson",
Seq(Literal(jsonFactory, jsonFactoryObjectType),
Literal(jsonOptions, jsonOptionsObjectType),
Literal(jsonInferSchema, jsonInferSchemaObjectType),
child),
Seq(jsonFactoryObjectType, jsonOptionsObjectType, jsonInferSchemaObjectType, child.dataType)
)

override def prettyName: String = "schema_of_json"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [schema_of_json([{"col":01}]) AS schema_of_json([{"col":01}])#0]
Project [static_invoke(JsonExpressionEvalUtils.schemaOfJson(com.fasterxml.jackson.core.JsonFactory, org.apache.spark.sql.catalyst.json.JSONOptions, org.apache.spark.sql.catalyst.json.JsonInferSchema, [{"col":01}])) AS schema_of_json([{"col":01}])#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [schema_of_json([{"col":01}], (allowNumericLeadingZeros,true)) AS schema_of_json([{"col":01}])#0]
Project [static_invoke(JsonExpressionEvalUtils.schemaOfJson(com.fasterxml.jackson.core.JsonFactory, org.apache.spark.sql.catalyst.json.JSONOptions, org.apache.spark.sql.catalyst.json.JsonInferSchema, [{"col":01}])) AS schema_of_json([{"col":01}])#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]

0 comments on commit 320fb0a

Please sign in to comment.