Skip to content

Commit

Permalink
merge with upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
lythesia committed Feb 15, 2015
2 parents d968664 + 5836650 commit 59e2d54
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ work/
SparkR-pkg.Rproj
*.o
*.so
# Eclipse Meta Files
.project
.classpath
4 changes: 4 additions & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
exportClasses("RDD")
exportClasses("Broadcast")
exportMethods(
"aggregateRDD",
"cache",
"checkpoint",
"cogroup",
Expand All @@ -17,6 +18,7 @@ exportMethods(
"filterRDD",
"flatMap",
"flatMapValues",
"fold",
"foreach",
"foreachPartition",
"fullOuterJoin",
Expand Down Expand Up @@ -49,7 +51,9 @@ exportMethods(
"sortBy",
"sortByKey",
"take",
"takeOrdered",
"takeSample",
"top",
"unionRDD",
"unpersist",
"value",
Expand Down
230 changes: 230 additions & 0 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,236 @@ setMethod("sortBy",
values(sortByKey(keyBy(rdd, func), ascending, numPartitions))
})

# Helper function to get first N elements from an RDD in the specified order.
# Param:
# rdd An RDD.
# num Number of elements to return.
# ascending A flag to indicate whether the sorting is ascending or descending.
# Return:
# A list of the first N elements from the RDD in the specified order.
#
takeOrderedElem <- function(rdd, num, ascending = TRUE) {
if (num <= 0L) {
return(list())
}

partitionFunc <- function(part) {
if (num < length(part)) {
# R limitation: order works only on primitive types!
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
list(part[ord[1:num]])
} else {
list(part)
}
}

reduceFunc <- function(elems, part) {
newElems <- append(elems, part)
# R limitation: order works only on primitive types!
ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
newElems[ord[1:num]]
}

newRdd <- mapPartitions(rdd, partitionFunc)
reduce(newRdd, reduceFunc)
}

#' Returns the first N elements from an RDD in ascending order.
#'
#' @param rdd An RDD.
#' @param num Number of elements to return.
#' @return The first N elements from the RDD in ascending order.
#' @rdname takeOrdered
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
#'}
setGeneric("takeOrdered", function(rdd, num) { standardGeneric("takeOrdered") })

#' @rdname takeOrdered
#' @aliases takeOrdered,RDD,RDD-method
setMethod("takeOrdered",
signature(rdd = "RDD", num = "integer"),
function(rdd, num) {
takeOrderedElem(rdd, num)
})

#' Returns the top N elements from an RDD.
#'
#' @param rdd An RDD.
#' @param num Number of elements to return.
#' @return The top N elements from the RDD.
#' @rdname top
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#'}
setGeneric("top", function(rdd, num) { standardGeneric("top") })

#' @rdname top
#' @aliases top,RDD,RDD-method
setMethod("top",
signature(rdd = "RDD", num = "integer"),
function(rdd, num) {
takeOrderedElem(rdd, num, FALSE)
})

#' Fold an RDD using a given associative function and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using a given associative function and a neutral "zero value".
#'
#' @param rdd An RDD.
#' @param zeroValue A neutral "zero value".
#' @param op An associative function for the folding operation.
#' @return The folding result.
#' @rdname fold
#' @seealso reduce
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
#' fold(rdd, 0, "+") # 15
#'}
setGeneric("fold", function(rdd, zeroValue, op) { standardGeneric("fold") })

#' @rdname fold
#' @aliases fold,RDD,RDD-method
setMethod("fold",
signature(rdd = "RDD", zeroValue = "ANY", op = "ANY"),
function(rdd, zeroValue, op) {
aggregateRDD(rdd, zeroValue, op, op)
})

#' Aggregate an RDD using the given combine functions and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using given combine functions and a neutral "zero value".
#'
#' @param rdd An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the RDD elements. It may return a different
#' result type from the type of the RDD elements.
#' @param combOp A function to aggregate results of seqOp.
#' @return The aggregation result.
#' @rdname aggregateRDD
#' @seealso reduce
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4))
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#'}
setGeneric("aggregateRDD", function(rdd, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })

#' @rdname aggregateRDD
#' @aliases aggregateRDD,RDD,RDD-method
setMethod("aggregateRDD",
signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
function(rdd, zeroValue, seqOp, combOp) {
partitionFunc <- function(part) {
Reduce(seqOp, part, zeroValue)
}

partitionList <- collect(lapplyPartition(rdd, partitionFunc),
flatten = FALSE)
Reduce(combOp, partitionList, zeroValue)
})

############ Shuffle Functions ############

#' Partition an RDD by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
#' For each element of this RDD, the partitioner is used to compute a hash
#' function and the RDD is partitioned using this hash value.
#'
#' @param rdd The RDD to partition. Should be an RDD where each element is
#' list(K, V) or c(K, V).
#' @param numPartitions Number of partitions to create.
#' @param ... Other optional arguments to partitionBy.
#'
#' @param partitionFunc The partition function to use. Uses a default hashCode
#' function if not provided
#' @return An RDD partitioned using the specified partitioner.
#' @rdname partitionBy
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- partitionBy(rdd, 2L)
#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
#'}
setGeneric("partitionBy",
function(rdd, numPartitions, ...) {
standardGeneric("partitionBy")
})

#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
setMethod("partitionBy",
signature(rdd = "RDD", numPartitions = "integer"),
function(rdd, numPartitions, partitionFunc = hashCode) {

#if (missing(partitionFunc)) {
# partitionFunc <- hashCode
#}

depsBinArr <- getDependencies(partitionFunc)

serializedHashFuncBytes <- serialize(as.character(substitute(partitionFunc)),
connection = NULL,
ascii = TRUE)

packageNamesArr <- serialize(.sparkREnv$.packages,
connection = NULL,
ascii = TRUE)
broadcastArr <- lapply(ls(.broadcastNames), function(name) {
get(name, .broadcastNames) })
jrdd <- getJRDD(rdd)

# We create a PairwiseRRDD that extends RDD[(Array[Byte],
# Array[Byte])], where the key is the hashed split, the value is
# the content (key-val pairs).
pairwiseRRDD <- newJObject("edu.berkeley.cs.amplab.sparkr.PairwiseRRDD",
callJMethod(jrdd, "rdd"),
as.integer(numPartitions),
serializedHashFuncBytes,
rdd@env$serialized,
depsBinArr,
packageNamesArr,
as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))

# Create a corresponding partitioner.
rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
as.integer(numPartitions))

# Call partitionBy on the obtained PairwiseRDD.
javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
javaPairRDD <- callJMethod(javaPairRDD, "partitionBy", rPartitioner)

# Call .values() on the result to get back the final result, the
# shuffled acutal content key-val pairs.
r <- callJMethod(javaPairRDD, "values")

RDD(r, serialized = TRUE)
})

############ Binary Functions #############

#' Return the union RDD of two RDDs.
Expand Down
46 changes: 46 additions & 0 deletions pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,52 @@ test_that("sortBy() on RDDs", {
expect_equal(actual, as.list(nums))
})

test_that("takeOrdered() on RDDs", {
l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
rdd <- parallelize(sc, l)
actual <- takeOrdered(rdd, 6L)
expect_equal(actual, as.list(sort(unlist(l)))[1:6])

l <- list("e", "d", "c", "d", "a")
rdd <- parallelize(sc, l)
actual <- takeOrdered(rdd, 3L)
expect_equal(actual, as.list(sort(unlist(l)))[1:3])
})

test_that("top() on RDDs", {
l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
rdd <- parallelize(sc, l)
actual <- top(rdd, 6L)
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])

l <- list("e", "d", "c", "d", "a")
rdd <- parallelize(sc, l)
actual <- top(rdd, 3L)
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
})

test_that("fold() on RDDs", {
actual <- fold(rdd, 0, "+")
expect_equal(actual, Reduce("+", nums, 0))

rdd <- parallelize(sc, list())
actual <- fold(rdd, 0, "+")
expect_equal(actual, 0)
})

test_that("aggregateRDD() on RDDs", {
rdd <- parallelize(sc, list(1, 2, 3, 4))
zeroValue <- list(0, 0)
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
expect_equal(actual, list(10, 4))

rdd <- parallelize(sc, list())
actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp)
expect_equal(actual, list(0, 0))
})

test_that("keys() on RDDs", {
keys <- keys(intRdd)
actual <- collect(keys)
Expand Down
43 changes: 43 additions & 0 deletions pkg/man/aggregateRDD.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{aggregateRDD}
\alias{aggregateRDD}
\alias{aggregateRDD,RDD,RDD-method}
\alias{aggregateRDD,RDD-method}
\title{Aggregate an RDD using the given combine functions and a neutral "zero value".}
\usage{
aggregateRDD(rdd, zeroValue, seqOp, combOp)

\S4method{aggregateRDD}{RDD}(rdd, zeroValue, seqOp, combOp)
}
\arguments{
\item{rdd}{An RDD.}

\item{zeroValue}{A neutral "zero value".}

\item{seqOp}{A function to aggregate the RDD elements. It may return a different
result type from the type of the RDD elements.}

\item{combOp}{A function to aggregate results of seqOp.}
}
\value{
The aggregation result.
}
\description{
Aggregate the elements of each partition, and then the results for all the
partitions, using given combine functions and a neutral "zero value".
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(1, 2, 3, 4))
zeroValue <- list(0, 0)
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
}
}
\seealso{
reduce
}

Loading

0 comments on commit 59e2d54

Please sign in to comment.