Skip to content

Commit

Permalink
Merge pull request #243 from hqzizania/master
Browse files Browse the repository at this point in the history
[SPARKR-199] Change takeOrdered, top to fetch one partition at a time
  • Loading branch information
concretevitamin authored and Davies Liu committed Apr 14, 2015
1 parent 136a07e commit b317aa7
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ setMethod("take",
index <- -1
jrdd <- getJRDD(x)
numPartitions <- numPartitions(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
# estimates similar to the scala version of `take`.
Expand All @@ -756,13 +757,14 @@ setMethod("take",
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = size,
serializedMode = getSerializedMode(x))
# TODO: Check if this append is O(n^2)?
serializedMode = serializedModeRDD)

resList <- append(resList, elems)
}
resList
})


#' First
#'
#' Return the first element of an RDD
Expand Down Expand Up @@ -1100,21 +1102,42 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
if (num < length(part)) {
# R limitation: order works only on primitive types!
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
list(part[ord[1:num]])
part[ord[1:num]]
} else {
list(part)
part
}
}

reduceFunc <- function(elems, part) {
newElems <- append(elems, part)
# R limitation: order works only on primitive types!
ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
newElems[ord[1:num]]
}

newRdd <- mapPartitions(x, partitionFunc)
reduce(newRdd, reduceFunc)

resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
numPartitions <- numPartitions(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
index <- index + 1

if (index >= numPartitions) {
ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
resList <- resList[ord[1:num]]
break
}

# a JList of byte arrays
partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
partition <- partitionArr[[1]]

# elems is capped to have at most `num` elements
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = num,
serializedMode = serializedModeRDD)

resList <- append(resList, elems)
}
resList
}

#' Returns the first N elements from an RDD in ascending order.
Expand Down

0 comments on commit b317aa7

Please sign in to comment.