Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
davies committed Mar 14, 2015
1 parent 4b1628d commit 70f620c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 162 deletions.
152 changes: 2 additions & 150 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -762,15 +762,11 @@ setMethod("first",
#' rdd <- parallelize(sc, c(1,2,2,3,3,3))
#' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3)
#'}
setClassUnion("missingOrInteger", c("missing", "integer"))
#' @rdname distinct
#' @aliases distinct,RDD-method
setMethod("distinct",
signature(x = "RDD", numPartitions = "missingOrInteger"),
function(x, numPartitions) {
if (missing(numPartitions)) {
numPartitions <- SparkR::numPartitions(x)
}
signature(x = "RDD"),
function(x, numPartitions = SparkR::numPartitions(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
Expand Down Expand Up @@ -1273,149 +1269,6 @@ setMethod("name",
#'}
# S4 generic for setName; dispatch is delegated to the registered methods.
setGeneric("setName",
           function(x, name) {
             standardGeneric("setName")
           })

#' @rdname setName
#' @aliases setName,RDD
setMethod("setName",
          signature(x = "RDD", name = "character"),
          function(x, name) {
            # Handle returned by getJRDD() (presumably the backing Java RDD).
            jrdd <- getJRDD(x)
            # Invoke the JVM-side "setName" method with the requested name.
            callJMethod(jrdd, "setName", name)
            # Return the same R object that was passed in.
            x
          })

#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
#'}
#' @rdname top
#' @aliases top,RDD,RDD-method
setMethod("top",
          signature(x = "RDD", num = "integer"),
          function(x, num) {
            # Third argument of takeOrderedElem(); FALSE yields the largest
            # elements first, as in the example for this method (presumably an
            # "ascending" switch -- confirm against takeOrderedElem).
            orderFlag <- FALSE
            takeOrderedElem(x, num, orderFlag)
          })

#' Fold an RDD using a given associative function and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using a given associative function and a neutral "zero value".
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param op An associative function for the folding operation.
#' @return The folding result.
#' @seealso reduce
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
#' fold(rdd, 0, "+") # 15
#'}
#' @rdname fold
#' @aliases fold,RDD,RDD-method
setMethod("fold",
          signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
          function(x, zeroValue, op) {
            # A fold is an aggregation in which the same function is used both
            # within each partition and to merge the per-partition results.
            aggregateRDD(x, zeroValue, seqOp = op, combOp = op)
          })

#' Aggregate an RDD using the given combine functions and a neutral "zero value".
#'
#' Aggregate the elements of each partition, and then the results for all the
#' partitions, using given combine functions and a neutral "zero value".
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the RDD elements. It may return a different
#' result type from the type of the RDD elements.
#' @param combOp A function to aggregate results of seqOp.
#' @return The aggregation result.
#' @seealso reduce
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4))
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
#'}
#' @rdname aggregateRDD
#' @aliases aggregateRDD,RDD,RDD-method
setMethod("aggregateRDD",
          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
          function(x, zeroValue, seqOp, combOp) {
            # Applied to each partition: fold its elements with seqOp,
            # starting from zeroValue.
            foldPartition <- function(part) {
              Reduce(seqOp, part, zeroValue)
            }

            # Collect the per-partition results, then merge them with combOp,
            # again starting from zeroValue.
            partials <- collect(lapplyPartition(x, foldPartition), flatten = FALSE)
            Reduce(combOp, partials, zeroValue)
          })

#' Pipes elements to a forked external process.
#'
#' The same as 'pipe()' in Spark.
#'
#' @param x The RDD whose elements are piped to the forked external process.
#' @param command The command to fork an external process.
#' @param env A named list to set environment variables of the external process.
#' @return A new RDD created by piping all elements to a forked external process.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' collect(pipeRDD(rdd, "more"))
#' # Output: c("1", "2", ..., "10")
#'}
#' @rdname pipeRDD
#' @aliases pipeRDD,RDD,character-method
setMethod("pipeRDD",
          signature(x = "RDD", command = "character"),
          function(x, command, env = list()) {
            # system2() documents `env` as a character vector of "NAME=value"
            # strings; element names are not part of that contract. This
            # method documents `env` as a named list, so turn every named
            # entry into "NAME=value" here. Unnamed entries are passed through
            # unchanged, so callers that already supply "NAME=value" strings
            # keep working.
            envVars <- as.character(unlist(env, use.names = FALSE))
            envNames <- names(env)
            if (!is.null(envNames) && length(envNames) == length(envVars)) {
              hasName <- !is.na(envNames) & nzchar(envNames)
              envVars[hasName] <- paste0(envNames[hasName], "=", envVars[hasName])
            }

            # Applied to each partition: feed its elements to the external
            # command on stdin and return the command's output lines.
            func <- function(part) {
              # Render a value as text and strip trailing CR/LF characters.
              trim.trailing.func <- function(x) {
                sub("[\r\n]*$", "", toString(x))
              }
              # One input line per element of the partition.
              input <- unlist(lapply(part, trim.trailing.func))
              # stdout = TRUE makes system2() return the captured output as a
              # character vector.
              res <- system2(command, stdout = TRUE, input = input, env = envVars)
              lapply(res, trim.trailing.func)
            }
            lapplyPartition(x, func)
          })

# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
#' @param x The RDD whose name is returned.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' name(rdd) # NULL (if not set before)
#'}
#' @rdname name
#' @aliases name,RDD
setMethod("name",
          signature(x = "RDD"),
          function(x) {
            # Handle returned by getJRDD() (presumably the backing Java RDD).
            jrdd <- getJRDD(x)
            # Result of the JVM-side "name" method is returned as-is.
            callJMethod(jrdd, "name")
          })

#' Set an RDD's name.
#'
#' @param x The RDD whose name is to be set.
#' @param name The RDD name to be set.
#' @return a new RDD renamed.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1,2,3))
#' setName(rdd, "myRDD")
#' name(rdd) # "myRDD"
#'}
#' @rdname setName
#' @aliases setName,RDD
setMethod("setName",
Expand Down Expand Up @@ -1518,7 +1371,6 @@ setMethod("zipWithIndex",

############ Binary Functions #############


#' Return the union RDD of two RDDs.
#' The same as union() in Spark.
#'
Expand Down
6 changes: 3 additions & 3 deletions pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ setGeneric("join", function(x, y, ...) { standardGeneric("join") })

#' @rdname join-methods
#' @export
setGeneric("leftOuterJoin", function(x, y, ...) { standardGeneric("leftOuterJoin") })
setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })

#' @rdname join-methods
#' @export
setGeneric("rightOuterJoin", function(x, y, ...) { standardGeneric("rightOuterJoin") })
setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") })

#' @rdname join-methods
#' @export
setGeneric("fullOuterJoin", function(x, y, ...) { standardGeneric("fullOuterJoin") })
setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") })

#' @rdname cogroup
#' @export
Expand Down
15 changes: 6 additions & 9 deletions pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ setMethod("join",
#' @rdname join-methods
#' @aliases leftOuterJoin,RDD,RDD-method
setMethod("leftOuterJoin",
signature(x = "RDD", y = "RDD"),
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand All @@ -572,8 +572,7 @@ setMethod("leftOuterJoin",
joinTaggedList(v, list(FALSE, TRUE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
doJoin)
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})

#' Right outer join two RDDs
Expand Down Expand Up @@ -601,7 +600,7 @@ setMethod("leftOuterJoin",
#' @rdname join-methods
#' @aliases rightOuterJoin,RDD,RDD-method
setMethod("rightOuterJoin",
signature(x = "RDD", y = "RDD"),
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand All @@ -610,8 +609,7 @@ setMethod("rightOuterJoin",
joinTaggedList(v, list(TRUE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
doJoin)
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})

#' Full outer join two RDDs
Expand Down Expand Up @@ -642,7 +640,7 @@ setMethod("rightOuterJoin",
#' @rdname join-methods
#' @aliases fullOuterJoin,RDD,RDD-method
setMethod("fullOuterJoin",
signature(x = "RDD", y = "RDD"),
signature(x = "RDD", y = "RDD", numPartitions = "integer"),
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })
Expand All @@ -651,8 +649,7 @@ setMethod("fullOuterJoin",
joinTaggedList(v, list(TRUE, TRUE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
doJoin)
joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})

#' For each key k in several RDDs, return a resulting RDD that
Expand Down

0 comments on commit 70f620c

Please sign in to comment.