diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 4b89b98f9164c..8d7a5ba59f96a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -169,7 +169,7 @@ case class LeftSemiJoinHash( def execute() = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => - val hashTable = new java.util.HashSet[Row]() + val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null // Create a Hash set of buildKeys @@ -177,16 +177,16 @@ case class LeftSemiJoinHash( currentRow = buildIter.next() val rowKey = buildSideKeyGenerator(currentRow) if(!rowKey.anyNull) { - val keyExists = hashTable.contains(rowKey) + val keyExists = hashSet.contains(rowKey) if (!keyExists) { - hashTable.add(rowKey) + hashSet.add(rowKey) } } } + val joinKeys = streamSideKeyGenerator() streamIter.filter(current => { - val joinKeys = streamSideKeyGenerator() - !joinKeys(current).anyNull && hashTable.contains(joinKeys.currentValue) + !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue) }) } }