Skip to content

Commit

Permalink
[SPARK-45488][SQL] XML: Add support for value in 'rowTag' element
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

The following XML, read with rowTag 'book', yields a schema that contains only the "_id" column and drops the element's text value:

```
 <p><book id="1">Great Book</book> </p>
```

Let's parse the value as well. The scope of this PR is to make the handling of `valueTag` on the rowTag element consistent with how it already behaves for inner objects.

### Why are the changes needed?

The semantics of attributes and `valueTag` should be consistent between the rowTag element and inner objects.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43319 from shujingyang-db/rootlevel-valuetag.

Lead-authored-by: Shujing Yang <shujing.yang@databricks.com>
Co-authored-by: Shujing Yang <135740748+shujingyang-db@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
2 people authored and HyukjinKwon committed Oct 12, 2023
1 parent 5a00631 commit e69752a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ class StaxXmlParser(
}
val parser = StaxXmlParserUtils.filteredReader(xml)
val rootAttributes = StaxXmlParserUtils.gatherRootAttributes(parser)
Some(convertObject(parser, schema, options, rootAttributes))
// A structure object is an attribute-only element
// if it only consists of attributes and valueTags.
val isRootAttributesOnly = schema.fields.forall { f =>
f.name == options.valueTag || f.name.startsWith(options.attributePrefix)
}
Some(convertObject(parser, schema, options, rootAttributes, isRootAttributesOnly))
} catch {
case e: SparkUpgradeException => throw e
case e@(_: RuntimeException | _: XMLStreamException | _: MalformedInputException
Expand Down Expand Up @@ -305,7 +310,8 @@ class StaxXmlParser(
parser: XMLEventReader,
schema: StructType,
options: XmlOptions,
rootAttributes: Array[Attribute] = Array.empty): InternalRow = {
rootAttributes: Array[Attribute] = Array.empty,
isRootAttributesOnly: Boolean = false): InternalRow = {
val row = new Array[Any](schema.length)
val nameToIndex = schema.map(_.name).zipWithIndex.toMap
// If there are attributes, then we process them first.
Expand Down Expand Up @@ -371,6 +377,13 @@ class StaxXmlParser(
badRecordException = badRecordException.orElse(Some(e))
}

case c: Characters if !c.isWhiteSpace && isRootAttributesOnly =>
nameToIndex.get(options.valueTag) match {
case Some(index) =>
row(index) = convertTo(c.getData, schema(index).dataType, options)
case None => // do nothing
}

case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,28 @@ private[sql] object XmlInferSchema {
dataTypes += inferredType
nameToDataType += (field -> dataTypes)

case c: Characters if !c.isWhiteSpace =>
// This can be an attribute-only object
val valueTagType = inferFrom(c.getData, options)
nameToDataType += options.valueTag -> ArrayBuffer(valueTagType)

case _: EndElement =>
shouldStop = StaxXmlParserUtils.checkEndElement(parser)

case _ => // do nothing
}
}
// A structure object is an attribute-only element
// if it only consists of attributes and valueTags.
// If not, we will remove the valueTag field from the schema
val attributesOnly = nameToDataType.forall {
case (fieldName, dataTypes) =>
dataTypes.length == 1 &&
(fieldName == options.valueTag || fieldName.startsWith(options.attributePrefix))
}
if (!attributesOnly) {
nameToDataType -= options.valueTag
}
// We need to manually merge the fields having the same names so that
// they can be inferred as ArrayType.
nameToDataType.foreach {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0"?>
<ROWSET>
<ROW>value1</ROW>
<ROW attr="attr1">value2</ROW>
<ROW>4<tag>5</tag></ROW>
<ROW><tag>6</tag>7</ROW>
<ROW attr="8"></ROW>
</ROWSET>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?xml version="1.0"?>
<ROWSET>
<ROW>value1</ROW>
<ROW attr="attr1">value2</ROW>
<ROW>
value3
<!-- this is a comment-->
</ROW>
</ROWSET>
Original file line number Diff line number Diff line change
Expand Up @@ -1645,4 +1645,79 @@ class XmlSuite extends QueryTest with SharedSparkSession {
.xml(getTestResourcePath(resDir + "fias_house.xml"))
assert(df.collect().length === 37)
}

test("SPARK-45488: root-level value tag for attributes-only object") {
  // The fixture holds three ROW elements that carry only attributes and/or text,
  // so every row is expected to expose its text through the "_VALUE" column.
  val fixturePath = getTestResourcePath(resDir + "root-level-value.xml")
  val explicitSchema = buildSchema(field("_attr"), field("_VALUE"))

  // Same file, read once with a user-specified schema and once with schema inference.
  val rowsWithUserSchema = spark.read.schema(explicitSchema).xml(fixturePath).collect()
  val rowsWithInferredSchema = spark.read.xml(fixturePath).collect()

  for (rows <- Seq(rowsWithUserSchema, rowsWithInferredSchema)) {
    assert(rows.length === 3)

    // Text-only element.
    assert(rows(0).getAs[String]("_VALUE") == "value1")

    // Element with both an attribute and text.
    val attributeAndText = rows(1)
    assert(attributeAndText.getAs[String]("_attr") == "attr1")
    assert(attributeAndText.getAs[String]("_VALUE") == "value2")

    // comments aren't included in valueTag
    assert(rows(2).getAs[String]("_VALUE") == "\n value3\n ")
  }
}

test("SPARK-45488: root-level value tag for not attributes-only object") {
  // Column names shared by the explicit schema and the assertions below.
  val attributeColumn = "_attr"
  val tagColumn = "tag"
  val valueTagColumn = "_VALUE"

  val fixturePath = getTestResourcePath(resDir + "root-level-value-none.xml")
  val explicitSchema = buildSchema(
    field(attributeColumn),
    field(tagColumn, LongType),
    field(valueTagColumn))

  // Same file, read once with a user-specified schema and once with schema inference.
  val withUserSchema = spark.read.schema(explicitSchema).xml(fixturePath)
  val withInferredSchema = spark.read.xml(fixturePath)

  for (frame <- Seq(withUserSchema, withInferredSchema)) {
    val rows = frame.collect()
    assert(rows.length === 5)

    // <ROW>value1</ROW>: the first two columns are expected to be null.
    val textOnly = rows(0)
    assert(textOnly.get(0) == null)
    assert(textOnly.get(1) == null)

    // <ROW attr="attr1">value2</ROW>: attribute present, no child tag.
    val attributeAndText = rows(1)
    assert(attributeAndText.getAs[String](attributeColumn) == "attr1")
    assert(attributeAndText.getAs[Any](tagColumn) == null)

    // <ROW>4<tag>5</tag></ROW>: child tag present, no attribute.
    val textThenTag = rows(2)
    assert(textThenTag.getAs[Long](tagColumn) == 5L)
    assert(textThenTag.getAs[Any](attributeColumn) == null)

    // <ROW><tag>6</tag>7</ROW>: child tag present, no attribute.
    val tagThenText = rows(3)
    assert(tagThenText.getAs[Long](tagColumn) == 6L)
    assert(tagThenText.getAs[Any](attributeColumn) == null)

    // <ROW attr="8"></ROW>: attribute present, no child tag.
    val attributeOnly = rows(4)
    assert(attributeOnly.getAs[String](attributeColumn) == "8")
    assert(attributeOnly.getAs[Any](tagColumn) == null)
  }
}

test("SPARK-45488: root-level value tag for attributes-only object - from xml") {
  // A single root element carrying one attribute plus a numeric text value.
  val payload = """<ROW attr="attr1">123456</ROW>"""
  val input = Seq((1, payload)).toDF("number", "payload")

  // Schema produced by schema_of_xml for the sample payload; it is what from_xml decodes with.
  val inferredSchema = schema_of_xml(payload)

  // Struct type the decoded column is expected to have.
  val decodedType = buildSchema(
    field("_VALUE", LongType),
    field("_attr"))
  val expectedSchema = input.schema.add("decoded", decodedType)

  val noOptions = Map.empty[String, String].asJava
  val decoded = input.withColumn(
    "decoded",
    from_xml(input.col("payload"), inferredSchema, noOptions))

  assert(expectedSchema == decoded.schema)

  val decodedValue = decoded.select("decoded._VALUE").head().getLong(0)
  assert(decodedValue === 123456L)

  val decodedAttribute = decoded.select("decoded._attr").head().getString(0)
  assert(decodedAttribute === "attr1")
}
}

0 comments on commit e69752a

Please sign in to comment.