Skip to content

Commit

Permalink
[SPARK-49954][SQL] Codegen Support for SchemaOfJson (by Invoke & Ru…
Browse files Browse the repository at this point in the history
…ntimeReplaceable)
  • Loading branch information
panbingkun committed Oct 14, 2024
1 parent 560748c commit c0eba3b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,22 @@
import java.util.ArrayList;
import java.util.List;

import scala.Option;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

import org.apache.spark.sql.catalyst.expressions.SharedFactory;
import org.apache.spark.sql.catalyst.json.CreateJacksonParser;
import org.apache.spark.sql.catalyst.json.JSONOptions;
import org.apache.spark.sql.catalyst.json.JsonInferSchema;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class JsonExpressionUtils {
Expand Down Expand Up @@ -86,4 +96,33 @@ public static GenericArrayData jsonObjectKeys(UTF8String json) {
return null;
}
}

public static UTF8String schemaOfJson(
JsonFactory jsonFactory,
JSONOptions jsonOptions,
JsonInferSchema jsonInferSchema,
UTF8String json) throws IOException {
DataType schema;
try (JsonParser jsonParser = CreateJacksonParser.utf8String(jsonFactory, json)) {
jsonParser.nextToken();
// To match with schema inference from JSON datasource.
DataType inferSchema = jsonInferSchema.inferField(jsonParser);
if (inferSchema instanceof StructType) {
Option<DataType> canonicalType = jsonInferSchema.canonicalizeType(inferSchema, jsonOptions);
schema = canonicalType.isDefined() ?
canonicalType.get() : new StructType(new StructField[0]);
} else if (inferSchema instanceof ArrayType at && at.elementType() instanceof StructType et) {
Option<DataType> canonicalType = jsonInferSchema.canonicalizeType(et, jsonOptions).
map(dt -> ArrayType.apply(dt, at.containsNull()));
schema = canonicalType.isDefined() ? canonicalType.get() :
ArrayType.apply(new StructType(new StructField[0]), at.containsNull());
} else {
Option<DataType> canonicalType = jsonInferSchema.canonicalizeType(inferSchema, jsonOptions);
schema = canonicalType.isDefined() ?
canonicalType.get() : SQLConf.get().defaultStringType();
}
}

return UTF8String.fromString(schema.sql());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import org.apache.spark.sql.internal.types.{AbstractMapType, StringTypeWithCaseA
import org.apache.spark.sql.types.{DataType, MapType, StringType, StructType, VariantType}
import org.apache.spark.unsafe.types.UTF8String

object ExprUtils extends QueryErrorsBase {
object ExprUtils extends EvalHelper with QueryErrorsBase {

def evalTypeExpr(exp: Expression): DataType = {
if (exp.foldable) {
exp.eval() match {
prepareForEval(exp).eval() match {
case s: UTF8String if s != null =>
val dataType = DataType.parseTypeWithFallback(
s.toString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,9 @@ case class StructsToJson(
case class SchemaOfJson(
child: Expression,
options: Map[String, String])
extends UnaryExpression with CodegenFallback with QueryErrorsBase {
extends UnaryExpression
with RuntimeReplaceable
with QueryErrorsBase {

def this(child: Expression) = this(child, Map.empty[String, String])

Expand Down Expand Up @@ -919,26 +921,20 @@ case class SchemaOfJson(
}
}

override def eval(v: InternalRow): Any = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
parser.nextToken()
// To match with schema inference from JSON datasource.
jsonInferSchema.inferField(parser) match {
case st: StructType =>
jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil))
case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
jsonInferSchema
.canonicalizeType(at.elementType, jsonOptions)
.map(ArrayType(_, containsNull = at.containsNull))
.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
case other: DataType =>
jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(
SQLConf.get.defaultStringType)
}
}
@transient private lazy val jsonFactoryObjectType = ObjectType(classOf[JsonFactory])
@transient private lazy val jsonOptionsObjectType = ObjectType(classOf[JSONOptions])
@transient private lazy val jsonInferSchemaObjectType = ObjectType(classOf[JsonInferSchema])

UTF8String.fromString(dt.sql)
}
override def replacement: Expression = StaticInvoke(
classOf[JsonExpressionUtils],
dataType,
"schemaOfJson",
Seq(Literal(jsonFactory, jsonFactoryObjectType),
Literal(jsonOptions, jsonOptionsObjectType),
Literal(jsonInferSchema, jsonInferSchemaObjectType),
child),
Seq(jsonFactoryObjectType, jsonOptionsObjectType, jsonInferSchemaObjectType, child.dataType)
)

override def prettyName: String = "schema_of_json"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [schema_of_json([{"col":01}]) AS schema_of_json([{"col":01}])#0]
Project [static_invoke(JsonExpressionUtils.schemaOfJson(com.fasterxml.jackson.core.JsonFactory, org.apache.spark.sql.catalyst.json.JSONOptions, org.apache.spark.sql.catalyst.json.JsonInferSchema, [{"col":01}])) AS schema_of_json([{"col":01}])#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [schema_of_json([{"col":01}], (allowNumericLeadingZeros,true)) AS schema_of_json([{"col":01}])#0]
Project [static_invoke(JsonExpressionUtils.schemaOfJson(com.fasterxml.jackson.core.JsonFactory, org.apache.spark.sql.catalyst.json.JSONOptions, org.apache.spark.sql.catalyst.json.JsonInferSchema, [{"col":01}])) AS schema_of_json([{"col":01}])#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]

0 comments on commit c0eba3b

Please sign in to comment.