-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-25829][SQL] remove duplicated map keys with last wins policy #23124
Conversation
case _: MapType => (input, ordinal) => input.getMap(ordinal) | ||
case u: UserDefinedType[_] => getAccessor(u.sqlType) | ||
case _ => (input, ordinal) => input.get(ordinal, dataType) | ||
def getAccessor(dt: DataType, nullable: Boolean = true): (SpecializedGetters, Int) => Any = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can move it to a new PR if others think it's necessary. It's a little dangerous to ask the caller side to take care of null values.
val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType) | ||
val isValuePrimitive = CodeGenerator.isPrimitiveType(dataType.valueType) | ||
val code = if (isKeyPrimitive && isValuePrimitive) { | ||
genCodeForPrimitiveElements(ctx, c, ev.value, numEntries) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's unclear how we can keep this optimization if we need to remove duplicated keys. Personally I don't think it's worth the effort to keep such a complex optimization for non-critial code path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change allows us to focus on optimizing ArrayBasedMapBuilder
in another PR.
* duplicated map keys w.r.t. the last wins policy. | ||
*/ | ||
class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable { | ||
assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we add assert to prevent NullType
here, too?
docs/sql-migration-guide-upgrade.md
Outdated
@@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide | |||
|
|||
- In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful. | |||
|
|||
- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we merge this with the above sentence at line 20? Both are different, but are related very strongly. In fact, it's a change of Map
semantics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are related, but they are not the same. For example, we don't support map type as key, because we can't check equality of map type correctly. This is just a current implementation limitation, and we may relax it in the future.
Duplicated map keys is a real problem and we will never allow it.
override def end(): Unit = | ||
override def end(): Unit = { | ||
// The parquet map may contains null or duplicated map keys. When it happens, the behavior is | ||
// undefined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about creating a Spark JIRA issue for this and embedded that ID here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
case _ => | ||
// for complex types, use interpreted ordering to be able to compare unsafe data with safe | ||
// data, e.g. UnsafeRow vs GenericInternalRow. | ||
mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala> sql("select map(null,2)")
res1: org.apache.spark.sql.DataFrame = [map(NULL, 2): map<null,int>]
scala> sql("select map(null,2)").collect
scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:67)
at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.keyToIndex$lzycompute(ArrayBasedMapBuilder.scala:37)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should fail it at analyzer phase, and other map-producing functions should do it as well. Can you create a JIRA for it? thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After merging this PR, I'll check again and file a JIRA for that.
Test build #99209 has finished for PR 23124 at commit
|
override def eval(input: InternalRow): Any = { | ||
val maps = children.map(_.eval(input)) | ||
val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need toArray
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to access it by index below, turn it to array so that the access is guaranteed to be O(1).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, my understanding is that we could do a maps.foreach
instead of accessing them by index. I don't see the index to be significant at all, but maybe I am missing something...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in scala, while loop is faster than foreach
. If you look at Expression.eval
implementations, we use while loop a lot even foreach
can produce simpler code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, if it's not true anymore with scala 2.12, we should update them together with a bechmark, instead of only updating this single one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but converting toArray
may require an extra O(N) operation for the copy, so I am not sure the difference between while
and foreach
is significant enough to cover the overhead of the copy...
|} | ||
|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this check is not really correct, as we are not considering duplicates IIUC. I think we can change this behavior using putAll
and checking the size in the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is done before the putAll
, so that it can fail fast. I think it's fine to ignore duplicated keys here, to make it a more conservative.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but we could do the putAll before and eventually fail when we reach the limit. We can maybe do that in a followup, though, as it is not introducing any regression..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup. I actually did what you proposed at first, and then realized it's different from before and may introduce perf regression. We can investigate it in a followup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I agree doing it in a followup, thanks.
|
||
// write a 2-field row, the first field is key and the second field is value. | ||
def put(entry: InternalRow): Unit = { | ||
if (entry.isNullAt(0)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is checked only here and not in all the other put...I think we should be consistent and either check it always or never do it..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 put methods have this null check and other put methods all go through them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see now, I missed it, thanks.
retest this please. |
} | ||
|
||
def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = { | ||
assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArrayBasedMapBuilder
instead of GenericMapBuilder
?
docs/sql-migration-guide-upgrade.md
Outdated
@@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide | |||
|
|||
- In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful. | |||
|
|||
- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar as above, shall we also mention data source can have be read with duplicated map keys?
put(keyGetter(entry, 0), valueGetter(entry, 1)) | ||
} | ||
|
||
def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has this method been used? Looks like only another putAll
below is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah good catch!
If not too verbose, we can update the |
Test build #99213 has finished for PR 23124 at commit
|
|
||
def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData = { | ||
assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.") | ||
putAll(keyArray, valueArray) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we call new ArrayBasedMapData(keyArray, valueArray)
without calling putAll(keyArray, valueArray)
if keyArray.asInstanceOf[ArrayData].containsNull
is false? This is a kind of optimizations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no we can't, as we still need to detect duplicated keys.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you are right.
if (keyToIndex.size == keyArray.numElements()) { | ||
// If there is no duplicated map keys, creates the MapData with the input key and value array, | ||
// as they might already in unsafe format and are more efficient. | ||
new ArrayBasedMapData(keyArray, valueArray) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto in build
} | ||
|
||
def build(): ArrayBasedMapData = { | ||
new ArrayBasedMapData(new GenericArrayData(keys.toArray), new GenericArrayData(values.toArray)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better to call reset() after calling new ArrayBasedMapData to reduce memory consumption in Java heap?
At caller side, ArrayBasedMapBuilder is not released. Therefore, until reset() will be called next time, each ArrayBasedMapBuilder keeps unused data in keys, values, and keyToIndex. They consumes Java heap unexpectedly.
Test build #99276 has finished for PR 23124 at commit
|
@@ -89,7 +89,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { | |||
val msg1 = intercept[Exception] { | |||
df5.select(map_from_arrays($"k", $"v")).collect | |||
}.getMessage | |||
assert(msg1.contains("Cannot use null as map key!")) | |||
assert(msg1.contains("Cannot use null as map key")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Message at Line 98 is also changed now.
Test build #99304 has finished for PR 23124 at commit
|
Test build #99310 has finished for PR 23124 at commit
|
retest this please |
Test build #99312 has finished for PR 23124 at commit
|
Test build #99325 has finished for PR 23124 at commit
|
assert(keyType != NullType, "map key cannot be null type.") | ||
|
||
private lazy val keyToIndex = keyType match { | ||
case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: I had a test lying around from when I worked on map_concat. With this PR:
- map_concat of two small maps (20 string keys per map, no dups) for 2M rows is about 17% slower.
- map_concat of two big maps (500 string keys per map, no dups) for 1M rows is about 25% slower.
The baseline code is the same branch as the PR, but without the 4 commits.
Some cost makes sense, as we're checking for dups, but it's odd that the overhead grows disproportionately as the size of the maps grows.
I remember that at one time, mutable.HashMap had some performance issues (rumor has it, anyway). So as a test, I modified ArrayBasedMapBuilder.scala to use java.util.Hashmap instead. After that:
- map_concat of two small maps (20 string keys per map, no dups) for 2M rows is about 12% slower.
- map_concat of two big maps (500 string keys per map, no dups) for 1M rows is about 15% slower.
It's a little more proportionate. I don't know if switching HashMap implementations would have some negative consequences.
Also, my test is a dumb benchmark that uses System.currentTimeMillis concatenating simple [String,Integer] maps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think for performance critical code path we should prefer java collection. thanks for pointing it out!
assert(keyType != NullType, "map key cannot be null type.") | ||
|
||
private lazy val keyToIndex = keyType match { | ||
case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to exclude BinaryType
from AtomicType
here.
assert(map.numElements() == 2) | ||
assert(ArrayBasedMapData.toScalaMap(map) == | ||
Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have a binary type key test as well?
Test build #99357 has finished for PR 23124 at commit
|
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
* A simple `MapData` implementation which is backed by 2 arrays. | ||
* | ||
* Note that, user is responsible to guarantee that the key array does not have duplicated | ||
* elements, otherwise the behavior is undefined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we might need to add the same note to the 3rd and 4th ArrayBasedMapData.apply()
method.
LGTM too |
Test build #99362 has finished for PR 23124 at commit
|
Test build #99368 has finished for PR 23124 at commit
|
thanks, merging to master! |
Thank you so much, @cloud-fan ! |
@@ -27,6 +27,8 @@ displayTitle: Spark SQL Upgrading Guide | |||
|
|||
- In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but users can still distinguish them via `Dataset.show`, `Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 internally, and users can't distinguish them any more. | |||
|
|||
- In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, these built-in functions will remove duplicated map keys with last wins policy. Users may still read map values with duplicated keys from data sources which do not enforce it (e.g. Parquet), the behavior will be udefined. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few typos.
In Spark version 2.4 and earlier, users can create a map with duplicate keys via built-in functions like
CreateMap
andStringToMap
. The behavior of map with duplicate keys is undefined. For example, the map lookup respects the duplicate key that appears first,Dataset.collect
only keeps the duplicate key that appears last, andMapKeys
returns duplicate keys. Since Spark 3.0, these built-in functions will remove duplicate map keys using the last-one-wins policy. Users may still read map values with duplicate keys from the data sources that do not enforce it (e.g. Parquet), but the behavior will be undefined.
## What changes were proposed in this pull request? Currently duplicated map keys are not handled consistently. For example, map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. This PR proposes to remove duplicated map keys with last wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create map with duplicated map keys via private APIs anyway. updated functions: `CreateMap`, `MapFromArrays`, `MapFromEntries`, `StringToMap`, `MapConcat`, `TransformKeys`. For other places: 1. data source v1 doesn't have this problem, as users need to provide a java/scala map, which can't have duplicated keys. 2. data source v2 may have this problem. I've added a note to `ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2. 3. UDF doesn't have this problem, as users need to provide a java/scala map. Same as data source v1. 4. file format. I checked all of them and only parquet does not enforce it. For backward compatibility reasons I change nothing but leave a note saying that the behavior will be undefined if users write map with duplicated keys to parquet files. Maybe we can add a config and fail by default if parquet files have map with duplicated keys. This can be done in followup. ## How was this patch tested? updated tests and new tests Closes apache#23124 from cloud-fan/map. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…perly the limit size ## What changes were proposed in this pull request? The PR starts from the [comment](apache#23124 (comment)) in the main one and it aims at: - simplifying the code for `MapConcat`; - be more precise in checking the limit size. ## How was this patch tested? existing tests Closes apache#23217 from mgaido91/SPARK-25829_followup. Authored-by: Marco Gaido <marcogaido91@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…s` and change the default behavior ### What changes were proposed in this pull request? This is a follow-up for #23124, add a new config `spark.sql.legacy.allowDuplicatedMapKeys` to control the behavior of removing duplicated map keys in build-in functions. With the default value `false`, Spark will throw a RuntimeException while duplicated keys are found. ### Why are the changes needed? Prevent silent behavior changes. ### Does this PR introduce any user-facing change? Yes, new config added and the default behavior for duplicated map keys changed to RuntimeException thrown. ### How was this patch tested? Modify existing UT. Closes #27478 from xuanyuanking/SPARK-25892-follow. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…s` and change the default behavior ### What changes were proposed in this pull request? This is a follow-up for #23124, add a new config `spark.sql.legacy.allowDuplicatedMapKeys` to control the behavior of removing duplicated map keys in build-in functions. With the default value `false`, Spark will throw a RuntimeException while duplicated keys are found. ### Why are the changes needed? Prevent silent behavior changes. ### Does this PR introduce any user-facing change? Yes, new config added and the default behavior for duplicated map keys changed to RuntimeException thrown. ### How was this patch tested? Modify existing UT. Closes #27478 from xuanyuanking/SPARK-25892-follow. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit ab186e3) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…s` and change the default behavior ### What changes were proposed in this pull request? This is a follow-up for apache#23124, add a new config `spark.sql.legacy.allowDuplicatedMapKeys` to control the behavior of removing duplicated map keys in build-in functions. With the default value `false`, Spark will throw a RuntimeException while duplicated keys are found. ### Why are the changes needed? Prevent silent behavior changes. ### Does this PR introduce any user-facing change? Yes, new config added and the default behavior for duplicated map keys changed to RuntimeException thrown. ### How was this patch tested? Modify existing UT. Closes apache#27478 from xuanyuanking/SPARK-25892-follow. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Currently duplicated map keys are not handled consistently. For example, map look up respects the duplicated key appears first,
Dataset.collect
only keeps the duplicated key appears last,MapKeys
returns duplicated keys, etc.This PR proposes to remove duplicated map keys with last wins policy, to follow Java/Scala and Presto. It only applies to built-in functions, as users can create map with duplicated map keys via private APIs anyway.
updated functions:
CreateMap
,MapFromArrays
,MapFromEntries
,StringToMap
,MapConcat
,TransformKeys
.For other places:
ArrayBasedMapData
to ask the caller to take care of duplicated keys. In the future we should enforce it in the stable data APIs for data source v2.How was this patch tested?
updated tests and new tests