Merge pull request #234 from hqzizania/assist
[SPARKR-163] Support sampleByKey()
Conflicts:
	pkg/R/pairRDD.R
concretevitamin authored and Davies Liu committed Apr 14, 2015
1 parent ba54e34 commit 20b97a6
Showing 4 changed files with 177 additions and 61 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -54,6 +54,7 @@ exportMethods(
"repartition",
"rightOuterJoin",
"sampleRDD",
"sampleByKey",
"saveAsTextFile",
"saveAsObjectFile",
"sortBy",
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -262,6 +262,12 @@ setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") })
#' @export
setGeneric("values", function(x) { standardGeneric("values") })

#' @rdname sampleByKey
#' @export
setGeneric("sampleByKey",
function(x, withReplacement, fractions, seed) {
standardGeneric("sampleByKey")
})
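
For readers new to S4, the setGeneric/setMethod pair used throughout these files dispatches on argument classes. A minimal local sketch with a toy generic (not part of SparkR, purely illustrative):

# Toy S4 generic and method, analogous in shape to sampleByKey's declaration.
setGeneric("describe", function(x) { standardGeneric("describe") })
setMethod("describe", signature(x = "numeric"),
          function(x) { sprintf("numeric of length %d", length(x)) })
describe(c(1, 2, 3))   # "numeric of length 3"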


############ Shuffle Functions ############
153 changes: 119 additions & 34 deletions R/pkg/R/pairRDD.R
@@ -450,19 +450,19 @@ setMethod("combineByKey",
})

#' Aggregate a pair RDD by each key.
#'
#' Aggregate the values of each key in an RDD, using given combine functions
#' and a neutral "zero value". This function can return a different result type,
#' U, than the type of the values in this RDD, V. Thus, we need one operation
#' for merging a V into a U and one operation for merging two U's. The former
#' operation is used for merging values within a partition, and the latter is
#' used for merging values between partitions. To avoid memory allocation, both
#' of these functions are allowed to modify and return their first argument
#' instead of creating a new U.
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the values of each key. It may return
#' a different result type from the type of the values.
#' @param combOp A function to aggregate results of seqOp.
#' @return An RDD containing the aggregation result.
@@ -474,7 +474,7 @@ setMethod("combineByKey",
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
#'}
#' @rdname aggregateByKey
@@ -491,12 +491,12 @@ setMethod("aggregateByKey",
})
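
To make the seqOp/combOp contract concrete, here is a small local sketch (plain R, no Spark; the two "partitions" are hypothetical) computing a per-key (sum, count) pair:

# seqOp folds a value V into an accumulator U; combOp merges two U's.
zeroValue <- list(0, 0)                                  # (sum, count)
seqOp  <- function(acc, v) { list(acc[[1]] + v, acc[[2]] + 1) }
combOp <- function(a, b) { list(a[[1]] + b[[1]], a[[2]] + b[[2]]) }

part1 <- c(1, 2)                        # values of one key in partition 1
part2 <- c(4)                           # values of the same key in partition 2
u1 <- Reduce(seqOp, part1, zeroValue)   # within partition 1: list(3, 2)
u2 <- Reduce(seqOp, part2, zeroValue)   # within partition 2: list(4, 1)
combOp(u1, u2)                          # across partitions:  list(7, 3)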

#' Fold a pair RDD by each key.
#'
#' Aggregate the values of each key in an RDD, using an associative function "func"
#' and a neutral "zero value" which may be added to the result an arbitrary
#' number of times, and must not change the result (e.g., 0 for addition, or
#' 1 for multiplication).
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param func An associative function for folding values of each key.
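#'
foldByKey is the special case of aggregation where the accumulator has the same type as the values; a quick local sketch of the fold semantics (plain R, no Spark):

# Folding one key's values with the neutral zero value 0:
Reduce(`+`, c(1, 2, 3), 0)   # 6; on an RDD: foldByKey(rdd, 0, "+", 2L)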
@@ -546,11 +546,11 @@ setMethod("join",
function(x, y, numPartitions) {
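# Tag each value with its source RDD (1L = x, 2L = y) so the two
# sides can be told apart after the union is grouped by key.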
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

doJoin <- function(v) {
joinTaggedList(v, list(FALSE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)),
doJoin)
})
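
The tag-group-flatten pattern above is shared by all four join variants; only the handling of missing sides differs. A local sketch of what the inner-join step does with one key's grouped, tagged values (joinTaggedList is internal to SparkR; this stand-in only illustrates the idea, not its actual implementation):

# One key's values after groupByKey: a mixed list of list(tag, value).
grouped <- list(list(1L, 10), list(2L, 100), list(2L, 200))
left  <- lapply(Filter(function(t) { t[[1]] == 1L }, grouped), function(t) t[[2]])
right <- lapply(Filter(function(t) { t[[1]] == 2L }, grouped), function(t) t[[2]])
# Inner join: emit the cross product of left and right values.
do.call(c, lapply(left, function(v) { lapply(right, function(w) { list(v, w) }) }))
# list(list(10, 100), list(10, 200))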
@@ -566,8 +566,8 @@ setMethod("join",
#' @param y An RDD to be joined. Should be an RDD where each element is
#' list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, v) in x, the resulting RDD will either contain
#'         all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL))
#' if no elements in rdd2 have key k.
#' @examples
#'\dontrun{
@@ -584,11 +584,11 @@ setMethod("leftOuterJoin",
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

doJoin <- function(v) {
joinTaggedList(v, list(FALSE, TRUE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})

@@ -621,18 +621,18 @@ setMethod("rightOuterJoin",
function(x, y, numPartitions) {
xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

doJoin <- function(v) {
joinTaggedList(v, list(TRUE, FALSE))
}

joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
})

#' Full outer join two RDDs
#'
#' @description
#' \code{fullOuterJoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
#' The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
@@ -642,7 +642,7 @@ setMethod("rightOuterJoin",
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
#' will contain all pairs (k, (v, w)) for both (k, v) in x and
#'         (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
#' in x/y have key k.
#' @examples
#'\dontrun{
@@ -681,7 +681,7 @@ setMethod("fullOuterJoin",
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' cogroup(rdd1, rdd2, numPartitions = 2L)
#' # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list())))
#'}
#' @rdname cogroup
@@ -692,7 +692,7 @@ setMethod("cogroup",
rdds <- list(...)
rddsLen <- length(rdds)
for (i in 1:rddsLen) {
rdds[[i]] <- lapply(rdds[[i]],
function(x) { list(x[[1]], list(i, x[[2]])) })
# TODO(hao): As issue [SparkR-142] mentions, the right value of i
# will not be captured into UDF if getJRDD is not invoked.
@@ -721,7 +721,7 @@ setMethod("cogroup",
}
})
}
cogroup.rdd <- mapValues(groupByKey(union.rdd, numPartitions),
group.func)
})
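
A local sketch of the per-RDD tagging step above: values from the i-th input RDD are wrapped as list(i, value) so they can later be routed into the i-th slot of each key's group (plain R, no Spark; the inputs are stand-ins):

rdds <- list(list(list("k", 10)),            # stand-in for rdd1's elements
             list(list("k", 20)))            # stand-in for rdd2's elements
tagged <- lapply(seq_along(rdds), function(i) {
  lapply(rdds[[i]], function(x) { list(x[[1]], list(i, x[[2]])) })
})
tagged[[2]]   # list(list("k", list(2, 20)))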

Expand All @@ -743,18 +743,18 @@ setMethod("sortByKey",
signature(x = "RDD"),
function(x, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) {
rangeBounds <- list()

if (numPartitions > 1) {
rddSize <- count(x)
# constant from Spark's RangePartitioner
maxSampleSize <- numPartitions * 20
fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)

samples <- collect(keys(sampleRDD(x, FALSE, fraction, 1L)))

# Note: the built-in R sort() function only works on atomic vectors
samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)

if (length(samples) > 0) {
rangeBounds <- lapply(seq_len(numPartitions - 1),
function(i) {
@@ -766,24 +766,109 @@

rangePartitionFunc <- function(key) {
partition <- 0

# TODO: Use binary search instead of linear search, similar to Spark
# (a findInterval sketch follows this method)
while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
partition <- partition + 1
}

if (ascending) {
partition
} else {
numPartitions - partition - 1
}
}

partitionFunc <- function(part) {
sortKeyValueList(part, decreasing = !ascending)
}

newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
lapplyPartition(newRDD, partitionFunc)
})
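
On the linear-search TODO above: for numeric keys, base R's findInterval performs the same bound lookup with a binary search. A sketch under those assumptions (numeric keys; left.open = TRUE reproduces the loop's strict "key > bound" comparison):

bounds <- c(10, 20, 30)                       # rangeBounds as a numeric vector
findInterval(5,  bounds, left.open = TRUE)    # 0 -> partition 0
findInterval(20, bounds, left.open = TRUE)    # 1 -> matches the strict > loop
findInterval(25, bounds, left.open = TRUE)    # 2 -> partition 2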


#' @description
#' \code{sampleByKey} returns a subset of the given RDD, sampled by key
#'
#' @param x The RDD to sample elements by key, where each element is
#' list(K, V) or c(K, V).
#' @param withReplacement Sampling with replacement or not
#' @param fractions The (rough) target sample fraction for each key, as a named list or vector
#' @param seed Randomness seed value
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:3000)
#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
#' else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
#' fractions <- list(a = 0.2, b = 0.1, c = 0.3)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
#' fractions <- list(a = 0.2, b = 0.1)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
#'}
#' @rdname sampleByKey
#' @aliases sampleByKey,RDD-method
setMethod("sampleByKey",
signature(x = "RDD", withReplacement = "logical",
fractions = "vector", seed = "integer"),
function(x, withReplacement, fractions, seed) {

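# Validate up front: every per-key sampling fraction must be non-negative.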
for (elem in fractions) {
if (elem < 0.0) {
stop(paste("Negative fraction value ", fractions[which(fractions == elem)]))
}
}

# The sampler: takes a partition and returns its sampled version.
samplingFunc <- function(split, part) {
set.seed(bitwXor(seed, split))
res <- vector("list", length(part))
len <- 0

# mix the generator state first because the per-partition seeds are close to each other
runif(10)

for (elem in part) {
if (elem[[1]] %in% names(fractions)) {
frac <- as.numeric(fractions[which(elem[[1]] == names(fractions))])
if (withReplacement) {
count <- rpois(1, frac)
if (count > 0) {
res[(len + 1):(len + count)] <- rep(list(elem), count)
len <- len + count
}
} else {
if (runif(1) < frac) {
len <- len + 1
res[[len]] <- elem
}
}
} else {
stop("KeyError: \"", elem[[1]], "\"")
}
}

# TODO(zongheng): look into the performance of the current
# implementation; perhaps use an iterator package? Note that Scala
# avoids repeatedly creating empty lists, and PySpark achieves the
# same using `yield'. (duplicated from sampleRDD)
if (len > 0) {
res[1:len]
} else {
list()
}
}

lapplyPartitionsWithIndex(x, samplingFunc)
})
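
The per-element draws above follow Spark's samplers: without replacement an element survives a Bernoulli trial with probability frac, and with replacement its copy count is drawn from Poisson(frac), so both schemes keep roughly frac of the data in expectation. A quick local check (plain R, no Spark):

set.seed(42)
frac <- 0.3
n <- 10000
mean(runif(n) < frac)   # ~0.3: Bernoulli keep rate (without replacement)
mean(rpois(n, frac))    # ~0.3: mean copies per element (with replacement)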