diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 920e1b7f598de..cac2fa2efb6ca 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -54,6 +54,7 @@ exportMethods( "repartition", "rightOuterJoin", "sampleRDD", + "sampleByKey", "saveAsTextFile", "saveAsObjectFile", "sortBy", diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 99e377cda4817..b07581c8ca922 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -262,6 +262,12 @@ setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") }) #' @export setGeneric("values", function(x) { standardGeneric("values") }) +#' @rdname sampleByKey +#' @export +setGeneric("sampleByKey", + function(x, withReplacement, fractions, seed) { + standardGeneric("sampleByKey") + }) ############ Shuffle Functions ############ diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index c2396c32a7548..08406b90f907e 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -450,19 +450,19 @@ setMethod("combineByKey", }) #' Aggregate a pair RDD by each key. -#' +#' #' Aggregate the values of each key in an RDD, using given combine functions #' and a neutral "zero value". This function can return a different result type, #' U, than the type of the values in this RDD, V. Thus, we need one operation -#' for merging a V into a U and one operation for merging two U's, The former -#' operation is used for merging values within a partition, and the latter is -#' used for merging values between partitions. To avoid memory allocation, both -#' of these functions are allowed to modify and return their first argument +#' for merging a V into a U and one operation for merging two U's, The former +#' operation is used for merging values within a partition, and the latter is +#' used for merging values between partitions. To avoid memory allocation, both +#' of these functions are allowed to modify and return their first argument #' instead of creating a new U. -#' +#' #' @param x An RDD. #' @param zeroValue A neutral "zero value". -#' @param seqOp A function to aggregate the values of each key. It may return +#' @param seqOp A function to aggregate the values of each key. It may return #' a different result type from the type of the values. #' @param combOp A function to aggregate results of seqOp. #' @return An RDD containing the aggregation result. @@ -474,7 +474,7 @@ setMethod("combineByKey", #' zeroValue <- list(0, 0) #' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } #' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } -#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) +#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) #' # list(list(1, list(3, 2)), list(2, list(7, 2))) #'} #' @rdname aggregateByKey @@ -491,12 +491,12 @@ setMethod("aggregateByKey", }) #' Fold a pair RDD by each key. -#' +#' #' Aggregate the values of each key in an RDD, using an associative function "func" -#' and a neutral "zero value" which may be added to the result an arbitrary -#' number of times, and must not change the result (e.g., 0 for addition, or +#' and a neutral "zero value" which may be added to the result an arbitrary +#' number of times, and must not change the result (e.g., 0 for addition, or #' 1 for multiplication.). -#' +#' #' @param x An RDD. #' @param zeroValue A neutral "zero value". #' @param func An associative function for folding values of each key. @@ -546,11 +546,11 @@ setMethod("join", function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) - + doJoin <- function(v) { joinTaggedList(v, list(FALSE, FALSE)) } - + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)), doJoin) }) @@ -566,8 +566,8 @@ setMethod("join", #' @param y An RDD to be joined. Should be an RDD where each element is #' list(K, V). #' @param numPartitions Number of partitions to create. -#' @return For each element (k, v) in x, the resulting RDD will either contain -#' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL)) +#' @return For each element (k, v) in x, the resulting RDD will either contain +#' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL)) #' if no elements in rdd2 have key k. #' @examples #'\dontrun{ @@ -584,11 +584,11 @@ setMethod("leftOuterJoin", function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) - + doJoin <- function(v) { joinTaggedList(v, list(FALSE, TRUE)) } - + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) @@ -621,18 +621,18 @@ setMethod("rightOuterJoin", function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) - + doJoin <- function(v) { joinTaggedList(v, list(TRUE, FALSE)) } - + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) #' Full outer join two RDDs #' #' @description -#' \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V). +#' \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V). #' The key types of the two RDDs should be the same. #' #' @param x An RDD to be joined. Should be an RDD where each element is @@ -642,7 +642,7 @@ setMethod("rightOuterJoin", #' @param numPartitions Number of partitions to create. #' @return For each element (k, v) in x and (k, w) in y, the resulting RDD #' will contain all pairs (k, (v, w)) for both (k, v) in x and -#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements +#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements #' in x/y have key k. #' @examples #'\dontrun{ @@ -681,7 +681,7 @@ setMethod("fullOuterJoin", #' sc <- sparkR.init() #' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) #' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) -#' cogroup(rdd1, rdd2, numPartitions = 2L) +#' cogroup(rdd1, rdd2, numPartitions = 2L) #' # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list())) #'} #' @rdname cogroup @@ -692,7 +692,7 @@ setMethod("cogroup", rdds <- list(...) rddsLen <- length(rdds) for (i in 1:rddsLen) { - rdds[[i]] <- lapply(rdds[[i]], + rdds[[i]] <- lapply(rdds[[i]], function(x) { list(x[[1]], list(i, x[[2]])) }) # TODO(hao): As issue [SparkR-142] mentions, the right value of i # will not be captured into UDF if getJRDD is not invoked. @@ -721,7 +721,7 @@ setMethod("cogroup", } }) } - cogroup.rdd <- mapValues(groupByKey(union.rdd, numPartitions), + cogroup.rdd <- mapValues(groupByKey(union.rdd, numPartitions), group.func) }) @@ -743,18 +743,18 @@ setMethod("sortByKey", signature(x = "RDD"), function(x, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) { rangeBounds <- list() - + if (numPartitions > 1) { rddSize <- count(x) # constant from Spark's RangePartitioner maxSampleSize <- numPartitions * 20 fraction <- min(maxSampleSize / max(rddSize, 1), 1.0) - + samples <- collect(keys(sampleRDD(x, FALSE, fraction, 1L))) - + # Note: the built-in R sort() function only works on atomic vectors samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending) - + if (length(samples) > 0) { rangeBounds <- lapply(seq_len(numPartitions - 1), function(i) { @@ -766,24 +766,109 @@ setMethod("sortByKey", rangePartitionFunc <- function(key) { partition <- 0 - + # TODO: Use binary search instead of linear search, similar with Spark while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) { partition <- partition + 1 } - + if (ascending) { partition } else { numPartitions - partition - 1 } } - + partitionFunc <- function(part) { sortKeyValueList(part, decreasing = !ascending) } - + newRDD <- partitionBy(x, numPartitions, rangePartitionFunc) lapplyPartition(newRDD, partitionFunc) }) - + +#' @description +#' \code{sampleByKey} return a subset RDD of the given RDD sampled by key +#' +#' @param x The RDD to sample elements by key, where each element is +#' list(K, V) or c(K, V). +#' @param withReplacement Sampling with replacement or not +#' @param fraction The (rough) sample target fraction +#' @param seed Randomness seed value +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, 1:3000) +#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x) +#' else { if (x %% 3 == 1) list("b", x) else list("c", x) }}) +#' fractions <- list(a = 0.2, b = 0.1, c = 0.3) +#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) +#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE +#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE +#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE +#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE +#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE +#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE +#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE +#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE +#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE +#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4) +#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored +#' fractions <- list(a = 0.2, b = 0.1) +#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c" +#'} +#' @rdname sampleByKey +#' @aliases sampleByKey,RDD-method +setMethod("sampleByKey", + signature(x = "RDD", withReplacement = "logical", + fractions = "vector", seed = "integer"), + function(x, withReplacement, fractions, seed) { + + for (elem in fractions) { + if (elem < 0.0) { + stop(paste("Negative fraction value ", fractions[which(fractions == elem)])) + } + } + + # The sampler: takes a partition and returns its sampled version. + samplingFunc <- function(split, part) { + set.seed(bitwXor(seed, split)) + res <- vector("list", length(part)) + len <- 0 + + # mixing because the initial seeds are close to each other + runif(10) + + for (elem in part) { + if (elem[[1]] %in% names(fractions)) { + frac <- as.numeric(fractions[which(elem[[1]] == names(fractions))]) + if (withReplacement) { + count <- rpois(1, frac) + if (count > 0) { + res[(len + 1):(len + count)] <- rep(list(elem), count) + len <- len + count + } + } else { + if (runif(1) < frac) { + len <- len + 1 + res[[len]] <- elem + } + } + } else { + stop("KeyError: \"", elem[[1]], "\"") + } + } + + # TODO(zongheng): look into the performance of the current + # implementation. Look into some iterator package? Note that + # Scala avoids many calls to creating an empty list and PySpark + # similarly achieves this using `yield'. (duplicated from sampleRDD) + if (len > 0) { + res[1:len] + } else { + list() + } + } + + lapplyPartitionsWithIndex(x, samplingFunc) + }) diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R index fc0607ae1de60..c2acf15514f54 100644 --- a/R/pkg/inst/tests/test_rdd.R +++ b/R/pkg/inst/tests/test_rdd.R @@ -35,7 +35,7 @@ test_that("get number of partitions in RDD", { test_that("first on RDD", { expect_true(first(rdd) == 1) newrdd <- lapply(rdd, function(x) x + 1) - expect_true(first(newrdd) == 2) + expect_true(first(newrdd) == 2) }) test_that("count and length on RDD", { @@ -48,7 +48,7 @@ test_that("count by values and keys", { actual <- countByValue(mods) expected <- list(list(0, 3L), list(1, 4L), list(2, 3L)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) - + actual <- countByKey(intRdd) expected <- list(list(2L, 2L), list(1L, 2L)) expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) @@ -82,11 +82,11 @@ test_that("filterRDD on RDD", { filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 }) actual <- collect(filtered.rdd) expect_equal(actual, list(2, 4, 6, 8, 10)) - + filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd) actual <- collect(filtered.rdd) expect_equal(actual, list(list(1L, -1))) - + # Filter out all elements. filtered.rdd <- filterRDD(rdd, function(x) { x > 10 }) actual <- collect(filtered.rdd) @@ -96,7 +96,7 @@ test_that("filterRDD on RDD", { test_that("lookup on RDD", { vals <- lookup(intRdd, 1L) expect_equal(vals, list(-1, 200)) - + vals <- lookup(intRdd, 3L) expect_equal(vals, list()) }) @@ -110,7 +110,7 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", { }) rdd2 <- lapply(rdd2, function(x) x + x) actual <- collect(rdd2) - expected <- list(24, 24, 24, 24, 24, + expected <- list(24, 24, 24, 24, 24, 168, 170, 172, 174, 176) expect_equal(actual, expected) }) @@ -247,10 +247,10 @@ test_that("flatMapValues() on pairwise RDDs", { l <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) actual <- collect(flatMapValues(l, function(x) { x })) expect_equal(actual, list(list(1,1), list(1,2), list(2,3), list(2,4))) - + # Generate x to x+1 for every value actual <- collect(flatMapValues(intRdd, function(x) { x:(x + 1) })) - expect_equal(actual, + expect_equal(actual, list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101), list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201))) }) @@ -347,7 +347,7 @@ test_that("top() on RDDs", { rdd <- parallelize(sc, l) actual <- top(rdd, 6L) expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6]) - + l <- list("e", "d", "c", "d", "a") rdd <- parallelize(sc, l) actual <- top(rdd, 3L) @@ -357,7 +357,7 @@ test_that("top() on RDDs", { test_that("fold() on RDDs", { actual <- fold(rdd, 0, "+") expect_equal(actual, Reduce("+", nums, 0)) - + rdd <- parallelize(sc, list()) actual <- fold(rdd, 0, "+") expect_equal(actual, 0) @@ -370,7 +370,7 @@ test_that("aggregateRDD() on RDDs", { combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp) expect_equal(actual, list(10, 4)) - + rdd <- parallelize(sc, list()) actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp) expect_equal(actual, list(0, 0)) @@ -379,13 +379,13 @@ test_that("aggregateRDD() on RDDs", { test_that("zipWithUniqueId() on RDDs", { rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) actual <- collect(zipWithUniqueId(rdd)) - expected <- list(list("a", 0), list("b", 3), list("c", 1), + expected <- list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2)) expect_equal(actual, expected) - + rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L) actual <- collect(zipWithUniqueId(rdd)) - expected <- list(list("a", 0), list("b", 1), list("c", 2), + expected <- list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4)) expect_equal(actual, expected) }) @@ -393,13 +393,13 @@ test_that("zipWithUniqueId() on RDDs", { test_that("zipWithIndex() on RDDs", { rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) actual <- collect(zipWithIndex(rdd)) - expected <- list(list("a", 0), list("b", 1), list("c", 2), + expected <- list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4)) expect_equal(actual, expected) - + rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L) actual <- collect(zipWithIndex(rdd)) - expected <- list(list("a", 0), list("b", 1), list("c", 2), + expected <- list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4)) expect_equal(actual, expected) }) @@ -426,12 +426,12 @@ test_that("pipeRDD() on RDDs", { actual <- collect(pipeRDD(rdd, "more")) expected <- as.list(as.character(1:10)) expect_equal(actual, expected) - + trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n")) actual <- collect(pipeRDD(trailed.rdd, "sort")) expected <- list("", "1", "2", "3") expect_equal(actual, expected) - + rev.nums <- 9:0 rev.rdd <- parallelize(sc, rev.nums, 2L) actual <- collect(pipeRDD(rev.rdd, "sort")) @@ -445,11 +445,11 @@ test_that("zipRDD() on RDDs", { actual <- collect(zipRDD(rdd1, rdd2)) expect_equal(actual, list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))) - + mockFile = c("Spark is pretty.", "Spark is awesome.") fileName <- tempfile(pattern="spark-test", fileext=".tmp") writeLines(mockFile, fileName) - + rdd <- textFile(sc, fileName, 1) actual <- collect(zipRDD(rdd, rdd)) expected <- lapply(mockFile, function(x) { list(x ,x) }) @@ -464,7 +464,7 @@ test_that("zipRDD() on RDDs", { actual <- collect(zipRDD(rdd, rdd1)) expected <- lapply(mockFile, function(x) { list(x, x) }) expect_equal(actual, expected) - + unlink(fileName) }) @@ -611,9 +611,9 @@ test_that("sortByKey() on pairwise RDDs", { sortedRdd3 <- sortByKey(rdd3) actual <- collect(sortedRdd3) expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) - + # test on the boundary cases - + # boundary case 1: the RDD to be sorted has only 1 partition rdd4 <- parallelize(sc, l, 1L) sortedRdd4 <- sortByKey(rdd4) @@ -638,7 +638,7 @@ test_that("sortByKey() on pairwise RDDs", { rdd7 <- parallelize(sc, l3, 2L) sortedRdd7 <- sortByKey(rdd7) actual <- collect(sortedRdd7) - expect_equal(actual, l3) + expect_equal(actual, l3) }) test_that("collectAsMap() on a pairwise RDD", { @@ -649,12 +649,36 @@ test_that("collectAsMap() on a pairwise RDD", { rdd <- parallelize(sc, list(list("a", 1), list("b", 2))) vals <- collectAsMap(rdd) expect_equal(vals, list(a = 1, b = 2)) - + rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4))) vals <- collectAsMap(rdd) expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4)) - + rdd <- parallelize(sc, list(list(1, "a"), list(2, "b"))) vals <- collectAsMap(rdd) expect_equal(vals, list(`1` = "a", `2` = "b")) }) + +test_that("sampleByKey() on pairwise RDDs", { + rdd <- parallelize(sc, 1:2000) + pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) }) + fractions <- list(a = 0.2, b = 0.1) + sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L) + expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE) + expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE) + expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE) + expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE) + expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE) + expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE) + + rdd <- parallelize(sc, 1:2000) + pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) }) + fractions <- list(`2` = 0.2, `3` = 0.1) + sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L) + expect_equal(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)), TRUE) + expect_equal(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)), TRUE) + expect_equal(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0, TRUE) + expect_equal(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000, TRUE) + expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE) + expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE) +})