Skip to content

Commit

Permalink
use buffer for only one side
Browse files Browse the repository at this point in the history
  • Loading branch information
adrian-wang committed Apr 11, 2015
1 parent 171001f commit 3af6ba5
Showing 1 changed file with 21 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,23 @@ case class SortMergeJoin(
private[this] var rightElement: Row = _
private[this] var leftKey: Row = _
private[this] var rightKey: Row = _
private[this] var leftMatches: CompactBuffer[Row] = _
private[this] var rightMatches: CompactBuffer[Row] = _
private[this] var leftPosition: Int = -1
private[this] var rightPosition: Int = -1
private[this] var stop: Boolean = false
private[this] var matchKey: Row = _

override final def hasNext: Boolean = leftPosition != -1 || nextMatchingPair
override final def hasNext: Boolean = nextMatchingPair()

override final def next(): Row = {
if (hasNext) {
val joinedRow = joinRow(leftMatches(leftPosition), rightMatches(rightPosition))
val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
rightPosition += 1
if (rightPosition >= rightMatches.size) {
leftPosition += 1
rightPosition = 0
if (leftPosition >= leftMatches.size) {
leftPosition = -1
fetchLeft()
if (leftElement == null || ordering.compare(leftKey, matchKey) != 0) {
stop = false
rightMatches = null
}
}
joinedRow
Expand Down Expand Up @@ -130,9 +131,7 @@ case class SortMergeJoin(
* of tuples.
*/
private def nextMatchingPair(): Boolean = {
if (leftPosition == -1) {
leftMatches = null
var stop: Boolean = false
if (!stop && rightElement != null) {
while (!stop && leftElement != null && rightElement != null) {
stop = ordering.compare(leftKey, rightKey) == 0 && !leftKey.anyNull
if (ordering.compare(leftKey, rightKey) > 0 || rightKey.anyNull) {
Expand All @@ -142,27 +141,21 @@ case class SortMergeJoin(
}
}
rightMatches = new CompactBuffer[Row]()
while (stop && rightElement != null) {
rightMatches += rightElement
fetchRight()
// exit loop when run out of right matches
stop = ordering.compare(leftKey, rightKey) == 0
}
if (rightMatches.size > 0) {
leftMatches = new CompactBuffer[Row]()
val leftMatch = leftKey.copy()
while (ordering.compare(leftKey, leftMatch) == 0 && leftElement != null) {
leftMatches += leftElement
fetchLeft()
if (stop) {
stop = false
while (!stop && rightElement != null) {
rightMatches += rightElement
fetchRight()
// exit loop when run out of right matches
stop = ordering.compare(leftKey, rightKey) != 0
}
if (rightMatches.size > 0) {
rightPosition = 0
matchKey = leftKey
}
}

if (leftMatches != null) {
leftPosition = 0
rightPosition = 0
}
}
leftPosition > -1
rightMatches != null && rightMatches.size > 0
}
}
}
Expand Down

0 comments on commit 3af6ba5

Please sign in to comment.