[SPARK-45190][SPARK-48897][PYTHON][CONNECT] Make from_xml support StructType schema #47355

Closed · wants to merge 2 commits
SparkConnectPlanner.scala

@@ -65,7 +65,7 @@ import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_
import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
import org.apache.spark.sql.connect.service.{ExecuteHolder, SessionHolder, SparkConnectService}
import org.apache.spark.sql.connect.utils.MetricGenerator
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{DataTypeErrors, QueryCompilationErrors}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.arrow.ArrowConverters
@@ -1973,6 +1973,40 @@ class SparkConnectPlanner(
None
}

case "from_xml" if Seq(2, 3).contains(fun.getArgumentsCount) =>
// XmlToStructs constructor doesn't accept JSON-formatted schema.
val children = fun.getArgumentsList.asScala.map(transformExpression)

var schema: DataType = null
children(1) match {
case Literal(s, StringType) if s != null =>
try {
schema = DataType.fromJson(s.toString)
} catch {
case _: Exception =>
}
case _ =>
}

if (schema != null) {
schema match {
case t: StructType => t
case _ => throw DataTypeErrors.failedParsingStructTypeError(schema.sql)
}

var options = Map.empty[String, String]
if (children.length == 3) {
options = extractMapData(children(2), "Options")
}
Some(
XmlToStructs(
schema = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType],
options = options,
child = children.head))
} else {
None
}

// Avro-specific functions
case "from_avro" if Seq(2, 3).contains(fun.getArgumentsCount) =>
val children = fun.getArgumentsList.asScala.map(transformExpression)
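For context on the planner comment above: under Spark Connect, the schema argument reaches the planner as a string literal, and when that string is the JSON form of a data type, the new case parses it back with DataType.fromJson before constructing XmlToStructs. A minimal Python sketch of that JSON round-trip (independent of this PR, using only public PySpark type APIs):

    import json
    from pyspark.sql.types import LongType, StructType

    schema = StructType().add("a", LongType())

    # JSON form of the schema; this is the kind of string the planner receives.
    json_schema = schema.json()

    # Parsing it back yields an equal StructType, mirroring what
    # DataType.fromJson does on the server side.
    roundtrip = StructType.fromJson(json.loads(json_schema))
    assert roundtrip == schema

If the JSON string describes a non-struct type, the validation match in the diff raises failedParsingStructTypeError instead of constructing XmlToStructs with a non-struct schema.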
18 changes: 16 additions & 2 deletions python/pyspark/sql/functions/builtin.py
@@ -16303,7 +16303,21 @@ def from_xml(
>>> df.select(sf.from_xml(df.value, schema).alias("xml")).collect()
[Row(xml=Row(a=1))]

-Example 2: Parsing XML with :class:`ArrayType` in schema
+Example 2: Parsing XML with a :class:`StructType` schema
Review thread on this line:

Contributor: Thanks for fixing this. Can you please reuse the existing jira #SPARK-45190?

Contributor Author: Sure, I was not aware of that ticket; will also add it to the title.

Contributor: Also, can you please enable tests here?

Contributor Author: Sure.

(A sketch of what enabling such a parity test typically looks like appears at the end of this section.)

+>>> import pyspark.sql.functions as sf
+>>> from pyspark.sql.types import StructType, LongType
+>>> data = [(1, '''<p><a>1</a></p>''')]
+>>> df = spark.createDataFrame(data, ("key", "value"))
+>>> schema = StructType().add("a", LongType())
+>>> df.select(sf.from_xml(df.value, schema)).show()
++---------------+
+|from_xml(value)|
++---------------+
+|            {1}|
++---------------+
+
+Example 3: Parsing XML with :class:`ArrayType` in schema

>>> import pyspark.sql.functions as sf
>>> data = [(1, '<p><a>1</a><a>2</a></p>')]
@@ -16314,7 +16328,7 @@
>>> df.select(sf.from_xml(df.value, schema).alias("xml")).collect()
[Row(xml=Row(a=[1, 2]))]

-Example 3: Parsing XML using :meth:`pyspark.sql.functions.schema_of_xml`
+Example 4: Parsing XML using :meth:`pyspark.sql.functions.schema_of_xml`

>>> import pyspark.sql.functions as sf
>>> # Sample data with an XML column
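On the reviewer's "enable tests" request: a hypothetical sketch of the usual pattern in PySpark's Spark Connect parity suites (class and method names are assumed for illustration, not taken from this PR). The parity suite inherits a shared test mixin, and unsupported cases are masked with a skip override, so "enabling" a test amounts to deleting that override.

    import unittest

    class FunctionsTestsMixin:
        """Stand-in for the shared mixin that both the classic and the
        Connect parity suites inherit (name assumed for illustration)."""

        def test_from_xml(self):
            ...  # would exercise sf.from_xml with a StructType schema

    class FunctionsParityTests(FunctionsTestsMixin, unittest.TestCase):
        # Before a fix like this one, the inherited test would be masked:
        #
        #     @unittest.skip("from_xml does not accept StructType under Connect.")
        #     def test_from_xml(self):
        #         super().test_from_xml()
        #
        # Removing the override lets the inherited test_from_xml run
        # against Spark Connect as well.
        pass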