
Commit

address comments
cloud-fan committed Nov 26, 2018
1 parent 9df6274 commit b2bfb33
Showing 7 changed files with 32 additions and 47 deletions.
6 changes: 3 additions & 3 deletions docs/sql-migration-guide-upgrade.md
@@ -17,15 +17,15 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially when processing malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.

- In Spark version 2.4 and earlier, when there is no valid root JSON token in the input (` ` for example), the `from_json` function produces `null`s and the JSON datasource skips such records, independently of the configured mode. Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT` (see the `from_json` sketch after this list).

- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

- In Spark version 2.4 and earlier, users can create map values with a map-type key via built-in functions like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, creating map values with a map-type key is no longer allowed with these built-in functions. Users can still read map values with a map-type key from data sources or Java/Scala collections, though they are not very useful.

- In Spark version 2.4 and earlier, `Dataset.groupByKey` results in a grouped dataset whose key attribute is wrongly named "value" if the key is of a non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, the grouping attribute is named "key" (see the `groupByKey` sketch after this list). The old behaviour is preserved under a newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a default value of `false`.

- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with a last-wins policy.
- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of a map with duplicated keys is undefined, e.g. map lookup respects the duplicated key that appears first, `Dataset.collect` only keeps the duplicated key that appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with a last-wins policy. Users may still read map values with duplicated keys from data sources that do not enforce this (e.g. Parquet); the behavior is then undefined (see the map-deduplication sketch after this list).
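
To make the new `from_json` semantics above concrete, here is a spark-shell style sketch. It assumes an already running `spark` session and an illustrative column name `json`; neither is part of this commit, and the expected output is simply the behavior described in the notes above.

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._

val schema = new StructType().add("key", StringType).add("value", IntegerType)

// An input with no valid root JSON token: per the note above, Spark 3.0 treats it
// as a bad record, so PERMISSIVE mode yields Row(null, null) for this schema.
Seq(" ").toDF("json")
  .select(from_json($"json", schema, Map("mode" -> "PERMISSIVE")).as("parsed"))
  .show(false)

// With Map("mode" -> "FAILFAST"), the same input is expected to raise an
// exception instead of producing a row of nulls.
```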
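
The `groupByKey` naming change can be observed with a similar sketch; the sample data is illustrative only.

```scala
import spark.implicits._

val ds = Seq("a", "bb", "bb", "ccc").toDS()

// Grouping by a non-struct key (an Int here). Per the migration note, Spark 3.0
// names the grouping column "key" in the result schema, where 2.4 used "value".
ds.groupByKey(_.length).count().printSchema()

// The old naming can be restored via the legacy flag mentioned above:
// spark.conf.set("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue", "true")
```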
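
Finally, a map-deduplication sketch of the last-wins policy described in the previous note. The queries are illustrative; the expected results are the documented Spark 3.0 behavior, not something this snippet verifies on older versions.

```scala
// Per the note above, the duplicated key 1 keeps only its last value ({1 -> "b"})
// in Spark 3.0, while the result was undefined in Spark 2.4 and earlier.
spark.sql("SELECT map(1, 'a', 1, 'b') AS m").show(false)

// The same policy applies to the other built-in map builders, e.g. map_concat.
spark.sql("SELECT map_concat(map(1, 'a'), map(1, 'b')) AS m").show(false)
```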

## Upgrading From Spark SQL 2.3 to 2.4

@@ -549,7 +549,7 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)

override def eval(input: InternalRow): Any = {
val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
val maps = children.map(_.eval(input).asInstanceOf[MapData])
if (maps.contains(null)) {
return null
}
@@ -561,12 +561,8 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
}

mapBuilder.reset()
var i = 0
while (i < maps.length) {
val map = maps(i)
for (map <- maps) {
mapBuilder.putAll(map.keyArray(), map.valueArray())
i += 1
}
mapBuilder.build()
}
@@ -648,7 +644,6 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
|}
|ArrayData $finKeysName = $keyConcat($keyArgsName, (int) $numElementsName);
|ArrayData $finValsName = $valueConcat($valArgsName, (int) $numElementsName);
|$builderTerm.reset();
|${ev.value} = $builderTerm.from($finKeysName, $finValsName);
""".stripMargin

@@ -752,7 +747,6 @@ case class MapFromEntries(child: Expression) extends UnaryExpression {
}
}

mapBuilder.reset()
i = 0
while (i < numEntries) {
mapBuilder.put(entries.getStruct(i, 2))
@@ -769,7 +763,6 @@ case class MapFromEntries(child: Expression) extends UnaryExpression {
ctx.nullArrayElementsSaveExec(nullEntries, ev.isNull, c) {
s"""
|final int $numEntries = $c.numElements();
|$builderTerm.reset();
|for (int $i = 0; $i < $numEntries; $i++) {
| $builderTerm.put($c.getStruct($i, 2));
|}
@@ -169,7 +169,6 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)

override def eval(input: InternalRow): Any = {
mapBuilder.reset()
var i = 0
while (i < keys.length) {
mapBuilder.put(keys(i).eval(input), values(i).eval(input))
@@ -191,7 +190,6 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
$assignKeys
$allocationValueData
$assignValues
$builderTerm.reset();
final MapData ${ev.value} = $builderTerm.from($keyArrayData, $valueArrayData);
"""
ev.copy(code = code, isNull = FalseLiteral)
@@ -239,17 +237,13 @@ case class MapFromArrays(left: Expression, right: Expression)
override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
val keyArrayData = keyArray.asInstanceOf[ArrayData]
val valueArrayData = valueArray.asInstanceOf[ArrayData]
mapBuilder.reset()
mapBuilder.from(keyArrayData.copy(), valueArrayData.copy())
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
val builderTerm = ctx.addReferenceObj("mapBuilder", mapBuilder)
s"""
|$builderTerm.reset();
|${ev.value} = $builderTerm.from($keyArrayData.copy(), $valueArrayData.copy());
""".stripMargin
s"${ev.value} = $builderTerm.from($keyArrayData.copy(), $valueArrayData.copy());"
})
}

@@ -467,7 +461,6 @@ case class StringToMap(text: Expression, pairDelim: Expression, keyValueDelim: E
inputString.asInstanceOf[UTF8String].split(stringDelimiter.asInstanceOf[UTF8String], -1)
val keyValueDelimiterUTF8String = keyValueDelimiter.asInstanceOf[UTF8String]

mapBuilder.reset()
var i = 0
while (i < keyValues.length) {
val keyValueArray = keyValues(i).split(keyValueDelimiterUTF8String, 2)
@@ -538,7 +538,6 @@ case class TransformKeys(
resultKeys.update(i, result)
i += 1
}
mapBuilder.reset()
mapBuilder.from(resultKeys, map.valueArray())
}

@@ -20,14 +20,15 @@ package org.apache.spark.sql.catalyst.util
import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, DataType, MapType}
import org.apache.spark.sql.types._

/**
* A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
* duplicated map keys with a last-wins policy.
*/
class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
assert(keyType != NullType, "map key cannot be null type.")

private lazy val keyToIndex = keyType match {
case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
@@ -44,12 +45,6 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
private lazy val keyGetter = InternalRow.getAccessor(keyType)
private lazy val valueGetter = InternalRow.getAccessor(valueType)

def reset(): Unit = {
keyToIndex.clear()
keys.clear()
values.clear()
}

def put(key: Any, value: Any): Unit = {
if (key == null) {
throw new RuntimeException("Cannot use null as map key.")
@@ -74,19 +69,6 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
put(keyGetter(entry, 0), valueGetter(entry, 1))
}

def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
if (keyArray.length != valueArray.length) {
throw new RuntimeException(
"The key array and value array of MapData must have the same length.")
}

var i = 0
while (i < keyArray.length) {
put(keyArray(i), valueArray(i))
i += 1
}
}

def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
if (keyArray.numElements() != valueArray.numElements()) {
throw new RuntimeException(
@@ -100,16 +82,34 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
}
}

private def reset(): Unit = {
keyToIndex.clear()
keys.clear()
values.clear()
}

/**
* Builds the result [[ArrayBasedMapData]] and resets this builder to free up resources. The
* builder becomes fresh afterward and is ready to take input and build another map.
*/
def build(): ArrayBasedMapData = {
new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
val map = new ArrayBasedMapData(
new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray))
reset()
map
}

/**
* Builds an [[ArrayBasedMapData]] from the given key and value array and resets this builder. The
* builder becomes fresh afterward and is ready to take input and build another map.
*/
def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = {
assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.")
assert(keyToIndex.isEmpty, "'from' can only be called with a fresh ArrayBasedMapBuilder.")
putAll(keyArray, valueArray)
if (keyToIndex.size == keyArray.numElements()) {
// If there are no duplicated map keys, create the MapData with the input key and value arrays,
// as they might already be in unsafe format and are more efficient.
reset()
new ArrayBasedMapData(keyArray, valueArray)
} else {
build()
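
The doc comments above describe the new contract of this builder: `build()` resets it for reuse, while `from()` must be called on a fresh instance. Below is a minimal REPL-style sketch of that contract, using only calls visible in this diff and in the suite that follows; it assumes the Catalyst classes are on the classpath.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapBuilder, ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.types.IntegerType

val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
builder.put(1, 1)
builder.put(InternalRow(2, 2))
builder.putAll(new GenericArrayData(Seq(3)), new GenericArrayData(Seq(3)))

// build() produces the map and resets the builder, so it can be reused afterwards.
val first = builder.build()
assert(ArrayBasedMapData.toScalaMap(first) == Map(1 -> 1, 2 -> 2, 3 -> 3))

// The builder is fresh again, so from() is allowed; with no duplicated keys it
// can reuse the input arrays directly instead of copying them.
val second = builder.from(new GenericArrayData(Seq(4, 5)), new GenericArrayData(Seq(4, 5)))
assert(ArrayBasedMapData.toScalaMap(second) == Map(4 -> 4, 5 -> 5))
```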
@@ -29,11 +29,10 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite {
val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
builder.put(1, 1)
builder.put(InternalRow(2, 2))
builder.putAll(Array(3: Any), Array(3: Any))
builder.putAll(new GenericArrayData(Seq(4)), new GenericArrayData(Seq(4)))
builder.putAll(new GenericArrayData(Seq(3)), new GenericArrayData(Seq(3)))
val map = builder.build()
assert(map.numElements() == 4)
assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 1, 2 -> 2, 3 -> 3, 4 -> 4))
assert(map.numElements() == 3)
assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 1, 2 -> 2, 3 -> 3))
}

test("fail with null key") {
@@ -561,6 +561,7 @@ private[parquet] class ParquetRowConverter(
override def end(): Unit = {
// The parquet map may contain null or duplicated map keys. When it happens, the behavior is
// undefined.
// TODO (SPARK-26174): disallow it with a config.
updater.set(ArrayBasedMapData(currentKeys.toArray, currentValues.toArray))
}

