From 5292be71de95b4861a8eb31f59752ef949e98d2e Mon Sep 17 00:00:00 2001 From: hlin09 Date: Mon, 16 Feb 2015 16:05:11 -0500 Subject: [PATCH] Adds support of pipeRDD(). --- pkg/NAMESPACE | 1 + pkg/R/RDD.R | 37 +++++++++++++++++++++++++++++++++++++ pkg/inst/tests/test_rdd.R | 12 ++++++++++++ pkg/man/pipeRDD.Rd | 34 ++++++++++++++++++++++++++++++++++ 4 files changed, 84 insertions(+) create mode 100644 pkg/man/pipeRDD.Rd diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 1fa1336674750..dbe05ea3f8df2 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -41,6 +41,7 @@ exportMethods( "numPartitions", "partitionBy", "persist", + "pipeRDD", "reduce", "reduceByKey", "reduceByKeyLocally", diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index c54abae2666a2..8af5d3608f295 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -1275,6 +1275,43 @@ setMethod("aggregateRDD", Reduce(combOp, partitionList, zeroValue) }) +#' Pipes elements to a forked external process. +#' +#' The same as 'pipe()' in Spark. +#' +#' @param rdd The RDD whose elements are piped to the forked external process. +#' @param command The command to fork an external process. +#' @param env A named list to set environment variables of the external process. +#' @return A new RDD created by piping all elements to a forked external process. 
+#' @rdname pipeRDD +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, 1:10) +#' collect(pipeRDD(rdd, "more")) +#' # Output: c("1", "2", ..., "10") +#'} +setGeneric("pipeRDD", function(rdd, command, env = list()) { + standardGeneric("pipeRDD") +}) + +#' @rdname pipeRDD +#' @aliases pipeRDD,RDD,character-method +setMethod("pipeRDD", + signature(rdd = "RDD", command = "character"), + function(rdd, command, env = list()) { + func <- function(part) { + trim.trailing.func <- function(x) { + sub("[\r\n]*$", "", toString(x)) + } + input <- unlist(lapply(part, trim.trailing.func)) + res <- system2(command, stdout = TRUE, input = input, env = env) + lapply(res, trim.trailing.func) + } + lapplyPartition(rdd, func) + }) + # TODO: Consider caching the name in the RDD's environment #' Return an RDD's name. #' diff --git a/pkg/inst/tests/test_rdd.R b/pkg/inst/tests/test_rdd.R index 89d7890fbf685..fa2a2959c6913 100644 --- a/pkg/inst/tests/test_rdd.R +++ b/pkg/inst/tests/test_rdd.R @@ -336,6 +336,18 @@ test_that("values() on RDDs", { expect_equal(actual, lapply(intPairs, function(x) { x[[2]] })) }) +test_that("pipeRDD() on RDDs", { + actual <- collect(pipeRDD(rdd, "more")) + expected <- as.list(as.character(1:10)) + expect_equal(actual, expected) + + rev.nums <- 9:0 + rev.rdd <- parallelize(sc, rev.nums, 2L) + actual <- collect(pipeRDD(rev.rdd, "sort")) + expected <- as.list(as.character(c(5:9, 0:4))) + expect_equal(actual, expected) +}) + test_that("join() on pairwise RDDs", { rdd1 <- parallelize(sc, list(list(1,1), list(2,4))) rdd2 <- parallelize(sc, list(list(1,2), list(1,3))) diff --git a/pkg/man/pipeRDD.Rd b/pkg/man/pipeRDD.Rd new file mode 100644 index 0000000000000..e930fe0a65a84 --- /dev/null +++ b/pkg/man/pipeRDD.Rd @@ -0,0 +1,34 @@ +% Generated by roxygen2 (4.1.0): do not edit by hand +% Please edit documentation in R/RDD.R +\docType{methods} +\name{pipeRDD} +\alias{pipeRDD} +\alias{pipeRDD,RDD,character-method} +\title{Pipes elements 
to a forked external process.} +\usage{ +pipeRDD(rdd, command, env = list()) + +\S4method{pipeRDD}{RDD,character}(rdd, command, env = list()) +} +\arguments{ +\item{rdd}{The RDD whose elements are piped to the forked external process.} + +\item{command}{The command to fork an external process.} + +\item{env}{A named list to set environment variables of the external process.} +} +\value{ +A new RDD created by piping all elements to a forked external process. +} +\description{ +The same as 'pipe()' in Spark. +} +\examples{ +\dontrun{ +sc <- sparkR.init() +rdd <- parallelize(sc, 1:10) +collect(pipeRDD(rdd, "more")) +# Output: c("1", "2", ..., "10") +} +} +