NOTE(review): this patch was rebuilt from a copy whose line breaks and indentation had been
collapsed. Whitespace on context lines, and the position of the blank context line in the first
NAMESPACE hunk, are reconstructed and not verified against the tree; if the patch does not apply
cleanly, try `git apply --ignore-whitespace`. The `index` blob ids are those of the original patch.

diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE
index f876c1a717844..9a33fcd62382b 100644
--- a/pkg/NAMESPACE
+++ b/pkg/NAMESPACE
@@ -85,6 +85,7 @@ importFrom(methods, setGeneric, setMethod, setOldClass)
 exportClasses("DataFrame")
 
 exportMethods("columns",
+              "distinct",
               "dtypes",
               "first",
               "head",
@@ -92,6 +93,8 @@ exportMethods("columns",
               "names",
               "printSchema",
               "registerTempTable",
+              "repartition",
+              "sampleDF",
               "schema",
               "toRDD")
 
diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R
index 2519b0595eeb7..9773598db578e 100644
--- a/pkg/R/DataFrame.R
+++ b/pkg/R/DataFrame.R
@@ -253,6 +253,103 @@ setMethod("unpersist",
             x
           })
 
+#' Repartition
+#'
+#' Return a new DataFrame that has exactly numPartitions partitions.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param numPartitions The number of partitions to use. Any numeric is accepted and
+#'        coerced to integer (with a warning if the value is not a whole number).
+#' @return A new DataFrame
+#' @rdname repartition
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' newDF <- repartition(df, 2L)
+#'}
+
+setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
+
+#' @rdname repartition
+#' @export
+setMethod("repartition",
+          signature(x = "DataFrame", numPartitions = "numeric"),
+          function(x, numPartitions) {
+            sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
+#' Distinct
+#'
+#' Return a new DataFrame containing the distinct rows in this DataFrame.
+#'
+#' @param x A SparkSQL DataFrame
+#' @return A new DataFrame
+#' @rdname distinct
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' distinctDF <- distinct(df)
+#'}
+
+setMethod("distinct",
+          signature(x = "DataFrame"),
+          function(x) {
+            sdf <- callJMethod(x@sdf, "distinct")
+            dataFrame(sdf)
+          })
+
+#' SampleDF
+#'
+#' Return a sampled subset of this DataFrame using a random seed.
+#'
+#' @param x A SparkSQL DataFrame
+#' @param withReplacement Sampling with replacement or not
+#' @param fraction The (rough) sample target fraction; must not be negative
+#' @param seed Not yet supported by the DataFrame method (see the TODO on the method);
+#'        leave it unspecified
+#' @return A new DataFrame
+#' @rdname sampleDF
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' collect(sampleDF(df, FALSE, 0.5))
+#' collect(sampleDF(df, TRUE, 0.5))
+#'}
+
+setGeneric("sampleDF",
+           function(x, withReplacement, fraction, seed) {
+             standardGeneric("sampleDF")
+           })
+
+#' @rdname sampleDF
+#' @export
+
+setMethod("sampleDF",
+          # TODO : Figure out how to send integer as java.lang.Long to JVM so
+          # we can send seed as an argument through callJMethod
+          signature(x = "DataFrame", withReplacement = "logical",
+                    fraction = "numeric"),
+          function(x, withReplacement, fraction) {
+            # stop() needs the message as a string: cat() prints to stdout and returns
+            # NULL, which would leave the error condition with an empty message.
+            if (fraction < 0.0) stop(paste("Negative fraction value:", fraction))
+            sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
+            dataFrame(sdf)
+          })
+
 #' Count
 #'
 #' Returns the number of rows in a DataFrame
diff --git a/pkg/R/utils.R b/pkg/R/utils.R
index 3ceb928ea9cac..55c5d85234533 100644
--- a/pkg/R/utils.R
+++ b/pkg/R/utils.R
@@ -326,3 +326,14 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
                          "OFF_HEAP" = SparkR:::callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
 }
 
+# Utility function for functions where an argument needs to be integer but we want to allow
+# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
+# Returns `num` coerced to integer, warning when the coercion changes the value.
+numToInt <- function(num) {
+  intNum <- as.integer(num)
+  if (intNum != num) {
+    # deparse(substitute()) names the caller's expression as a single string
+    warning(paste("Coercing", deparse(substitute(num)), "to integer."))
+  }
+  intNum
+}
diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index 21032a78494f8..644dd1e5070e5 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -197,4 +197,31 @@ test_that("head() and first() return the correct data", {
   expect_true(nrow(testFirst) == 1)
 })
 
+test_that("distinct() on DataFrames", {
+  lines <- c("{\"name\":\"Michael\"}",
+             "{\"name\":\"Andy\", \"age\":30}",
+             "{\"name\":\"Justin\", \"age\":19}",
+             "{\"name\":\"Justin\", \"age\":19}")
+  jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  writeLines(lines, jsonPathWithDup)
+
+  df <- jsonFile(sqlCtx, jsonPathWithDup)
+  uniques <- distinct(df)
+  expect_true(inherits(uniques, "DataFrame"))
+  expect_true(count(uniques) == 3)
+  unlink(jsonPathWithDup)
+})
+
+test_that("sampleDF on a DataFrame", {
+  df <- jsonFile(sqlCtx, jsonPath)
+  sampled <- sampleDF(df, FALSE, 1.0)
+  expect_equal(nrow(collect(sampled)), count(df))
+  expect_true(inherits(sampled, "DataFrame"))
+  # No seed can be passed yet, so this draw is random and the bound is probabilistic
+  sampled2 <- sampleDF(df, FALSE, 0.1)
+  expect_true(count(sampled2) < 3)
+  # A negative fraction must fail with a readable message
+  expect_error(sampleDF(df, FALSE, -0.5), "Negative fraction value")
+})
+
 unlink(jsonPath)