Skip to content

Commit

Permalink
Merge pull request apache#171 from hlin09/hlin09
Browse files Browse the repository at this point in the history
[SPARKR-159] Adds support of pipeRDD().
  • Loading branch information
shivaram committed Feb 18, 2015
2 parents 2271030 + 1f5a6ac commit 384e6e2
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ exportMethods(
"numPartitions",
"partitionBy",
"persist",
"pipeRDD",
"reduce",
"reduceByKey",
"reduceByKeyLocally",
Expand Down
37 changes: 37 additions & 0 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,43 @@ setMethod("aggregateRDD",
Reduce(combOp, partitionList, zeroValue)
})

#' Pipes elements to a forked external process.
#'
#' The same as 'pipe()' in Spark.
#'
#' @param rdd The RDD whose elements are piped to the forked external process.
#' @param command The command to fork an external process.
#' @param env A named list to set environment variables of the external process.
#' @return A new RDD created by piping all elements to a forked external process.
#' @rdname pipeRDD
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' collect(pipeRDD(rdd, "more")
#' Output: c("1", "2", ..., "10")
#'}
setGeneric("pipeRDD", function(rdd, command, env = list()) {
standardGeneric("pipeRDD")
})

#' @rdname pipeRDD
#' @aliases pipeRDD,RDD,character-method
setMethod("pipeRDD",
signature(rdd = "RDD", command = "character"),
function(rdd, command, env = list()) {
func <- function(part) {
trim.trailing.func <- function(x) {
sub("[\r\n]*$", "", toString(x))
}
input <- unlist(lapply(part, trim.trailing.func))
res <- system2(command, stdout = TRUE, input = input, env = env)
lapply(res, trim.trailing.func)
}
lapplyPartition(rdd, func)
})

# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
Expand Down
17 changes: 17 additions & 0 deletions pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,23 @@ test_that("values() on RDDs", {
expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
})

test_that("pipeRDD() on RDDs", {
actual <- collect(pipeRDD(rdd, "more"))
expected <- as.list(as.character(1:10))
expect_equal(actual, expected)

trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
actual <- collect(pipeRDD(trailed.rdd, "sort"))
expected <- list("", "1", "2", "3")
expect_equal(actual, expected)

rev.nums <- 9:0
rev.rdd <- parallelize(sc, rev.nums, 2L)
actual <- collect(pipeRDD(rev.rdd, "sort"))
expected <- as.list(as.character(c(5:9, 0:4)))
expect_equal(actual, expected)
})

test_that("join() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
Expand Down
34 changes: 34 additions & 0 deletions pkg/man/pipeRDD.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
% Generated by roxygen2 (4.1.0): do not edit by hand
% Please edit documentation in R/RDD.R
\docType{methods}
\name{pipeRDD}
\alias{pipeRDD}
\alias{pipeRDD,RDD,character-method}
\title{Pipes elements to a forked external process.}
\usage{
pipeRDD(rdd, command, env = list())

\S4method{pipeRDD}{RDD,character}(rdd, command, env = list())
}
\arguments{
\item{rdd}{The RDD whose elements are piped to the forked external process.}

\item{command}{The command to fork an external process.}

\item{env}{A named list to set environment variables of the external process.}
}
\value{
A new RDD created by piping all elements to a forked external process.
}
\description{
The same as 'pipe()' in Spark.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, 1:10)
collect(pipeRDD(rdd, "more")
Output: c("1", "2", ..., "10")
}
}

0 comments on commit 384e6e2

Please sign in to comment.