Skip to content

Commit

Permalink
Merge pull request apache#187 from cafreeman/sparkr-sql
Browse files Browse the repository at this point in the history
Three more DataFrame methods
  • Loading branch information
shivaram committed Feb 28, 2015
2 parents 020bce8 + b0e7f73 commit 6134649
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@ importFrom(methods, setGeneric, setMethod, setOldClass)
# Export the S4 class wrapping a Spark SQL DataFrame.
exportClasses("DataFrame")

# S4 methods on DataFrame available to package users (alphabetical order).
exportMethods("columns",
              "distinct",
              "dtypes",
              "first",
              "head",
              "limit",
              "names",
              "printSchema",
              "registerTempTable",
              "repartition",
              "sampleDF",
              "schema",
              "toRDD")

Expand Down
89 changes: 89 additions & 0 deletions pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,95 @@ setMethod("unpersist",
x
})

#' Repartition
#'
#' Returns a new DataFrame containing exactly numPartitions partitions.
#'
#' @param x A SparkSQL DataFrame
#' @param numPartitions The number of partitions to use.
#' @rdname repartition
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newDF <- repartition(df, 2L)
#'}
setGeneric("repartition",
           function(x, numPartitions) {
             standardGeneric("repartition")
           })

#' @rdname repartition
#' @export
setMethod("repartition",
          signature(x = "DataFrame", numPartitions = "numeric"),
          function(x, numPartitions) {
            # Coerce the partition count to integer (numToInt warns on
            # truncation) and delegate to the JVM-side DataFrame.
            repartitioned <- callJMethod(x@sdf, "repartition",
                                         numToInt(numPartitions))
            dataFrame(repartitioned)
          })

#' Distinct
#'
#' Returns a new DataFrame holding only the distinct rows of this DataFrame.
#'
#' @param x A SparkSQL DataFrame
#' @rdname distinct
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' distinctDF <- distinct(df)
#'}
# NOTE(review): no setGeneric("distinct") appears in this chunk, unlike
# repartition/sampleDF — presumably the generic is declared elsewhere in the
# package (e.g. for RDDs); confirm before release.
setMethod("distinct",
          signature(x = "DataFrame"),
          function(x) {
            # Delegate deduplication to the JVM-side DataFrame.
            deduped <- callJMethod(x@sdf, "distinct")
            dataFrame(deduped)
          })

#' SampleDF
#'
#' Returns a randomly sampled subset of this DataFrame.
#'
#' @param x A SparkSQL DataFrame
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @rdname sampleDF
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' collect(sampleDF(df, FALSE, 0.5))
#' collect(sampleDF(df, TRUE, 0.5))
#'}
setGeneric("sampleDF", function(x, withReplacement, fraction, seed) {
  standardGeneric("sampleDF")
})

#' @rdname sampleDF
#' @export
setMethod("sampleDF",
          # TODO : Figure out how to send integer as java.lang.Long to JVM so
          # we can send seed as an argument through callJMethod
          signature(x = "DataFrame", withReplacement = "logical",
                    fraction = "numeric"),
          function(x, withReplacement, fraction) {
            # Bug fix: the original used stop(cat(...)). cat() prints to
            # stdout and returns NULL, so the raised error carried an empty
            # message. stop() concatenates its ... arguments itself.
            if (fraction < 0.0) {
              stop("Negative fraction value: ", fraction)
            }
            sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
            dataFrame(sdf)
          })

#' Count
#'
#' Returns the number of rows in a DataFrame
Expand Down
8 changes: 8 additions & 0 deletions pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,11 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"OFF_HEAP" = SparkR:::callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
}

# Coerce a numeric argument to integer so callers may write `5` instead of
# `5L` without hitting a confusing JVM-side error. Emits a warning naming the
# caller's expression when the value is truncated.
numToInt <- function(num) {
  coerced <- as.integer(num)
  if (coerced != num) {
    # sys.call() is the numToInt(...) call; element 2 is the argument
    # expression, so the warning names what the caller actually wrote.
    warning(paste("Coercing", as.list(sys.call())[[2]], "to integer."))
  }
  coerced
}
23 changes: 23 additions & 0 deletions pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,27 @@ test_that("head() and first() return the correct data", {
expect_true(nrow(testFirst) == 1)
})

# distinct() should drop duplicate rows and return a new DataFrame.
test_that("distinct() on DataFrames", {
  lines <- c("{\"name\":\"Michael\"}",
             "{\"name\":\"Andy\", \"age\":30}",
             "{\"name\":\"Justin\", \"age\":19}",
             "{\"name\":\"Justin\", \"age\":19}")
  jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp")
  writeLines(lines, jsonPathWithDup)

  df <- jsonFile(sqlCtx, jsonPathWithDup)
  uniques <- distinct(df)
  expect_true(inherits(uniques, "DataFrame"))
  # expect_equal reports the actual count on failure, unlike
  # expect_true(count(...) == 3), which only reports FALSE.
  expect_equal(count(uniques), 3)
})

# Smoke tests for sampleDF(): result type and effect of the fraction argument.
test_that("sampleDF on a DataFrame", {
  df <- jsonFile(sqlCtx, jsonPath)
  # fraction = 1.0 is expected to retain every row — TODO confirm the
  # JVM sampler guarantees this rather than sampling probabilistically.
  sampled <- sampleDF(df, FALSE, 1.0)
  expect_equal(nrow(collect(sampled)), count(df))
  expect_true(inherits(sampled, "DataFrame"))
  # NOTE(review): with only 3 rows and fraction = 0.1 this assertion can
  # (rarely) fail if every row happens to be selected; consider passing a
  # seed once the seed argument is supported.
  sampled2 <- sampleDF(df, FALSE, 0.1)
  expect_true(count(sampled2) < 3)
})

unlink(jsonPath)

0 comments on commit 6134649

Please sign in to comment.