Skip to content

Commit

Permalink
cool
Browse files Browse the repository at this point in the history
nit
  • Loading branch information
zhengruifeng committed Jul 15, 2024
1 parent b6c0525 commit 9ecd461
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_
import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
import org.apache.spark.sql.connect.service.{ExecuteHolder, SessionHolder, SparkConnectService}
import org.apache.spark.sql.connect.utils.MetricGenerator
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.errors.{DataTypeErrors, QueryCompilationErrors}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.arrow.ArrowConverters
Expand Down Expand Up @@ -1973,6 +1973,40 @@ class SparkConnectPlanner(
None
}

case "from_xml" if Seq(2, 3).contains(fun.getArgumentsCount) =>
// XmlToStructs constructor doesn't accept JSON-formatted schema.
val children = fun.getArgumentsList.asScala.map(transformExpression)

var schema: DataType = null
children(1) match {
case Literal(s, StringType) if s != null =>
try {
schema = DataType.fromJson(s.toString)
} catch {
case _: Exception =>
}
case _ =>
}

if (schema != null) {
schema match {
case t: StructType => t
case _ => throw DataTypeErrors.failedParsingStructTypeError(schema.sql)
}

var options = Map.empty[String, String]
if (children.length == 3) {
options = extractMapData(children(2), "Options")
}
Some(
XmlToStructs(
schema = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType],
options = options,
child = children.head))
} else {
None
}

// Avro-specific functions
case "from_avro" if Seq(2, 3).contains(fun.getArgumentsCount) =>
val children = fun.getArgumentsList.asScala.map(transformExpression)
Expand Down
18 changes: 16 additions & 2 deletions python/pyspark/sql/functions/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16303,7 +16303,21 @@ def from_xml(
>>> df.select(sf.from_xml(df.value, schema).alias("xml")).collect()
[Row(xml=Row(a=1))]

Example 2: Parsing XML with :class:`ArrayType` in schema
Example 2: Parsing XML with a :class:`StructType` schema

>>> import pyspark.sql.functions as sf
>>> from pyspark.sql.types import StructType, LongType
>>> data = [(1, '''<p><a>1</a></p>''')]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> schema = StructType().add("a", LongType())
>>> df.select(sf.from_xml(df.value, schema)).show()
+---------------+
|from_xml(value)|
+---------------+
| {1}|
+---------------+

Example 3: Parsing XML with :class:`ArrayType` in schema

>>> import pyspark.sql.functions as sf
>>> data = [(1, '<p><a>1</a><a>2</a></p>')]
Expand All @@ -16314,7 +16328,7 @@ def from_xml(
>>> df.select(sf.from_xml(df.value, schema).alias("xml")).collect()
[Row(xml=Row(a=[1, 2]))]

Example 3: Parsing XML using :meth:`pyspark.sql.functions.schema_of_xml`
Example 4: Parsing XML using :meth:`pyspark.sql.functions.schema_of_xml`

>>> import pyspark.sql.functions as sf
>>> # Sample data with an XML column
Expand Down

0 comments on commit 9ecd461

Please sign in to comment.