Skip to content

Commit

Permalink
[SPARKR-150] phase 1: implement sortBy() and sortByKey().
Browse files Browse the repository at this point in the history
  • Loading branch information
Sun Rui committed Feb 4, 2015
1 parent 554bda0 commit d9da451
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ exportMethods(
"sampleRDD",
"saveAsTextFile",
"saveAsObjectFile",
"sortBy",
"sortByKey",
"take",
"takeSample",
"unionRDD",
Expand Down
106 changes: 106 additions & 0 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,40 @@ setMethod("flatMapValues",
flatMap(X, flatMapFunc)
})

#' Sort an RDD by the given key function.
#'
#' @param rdd An RDD to be sorted.
#' @param func A function used to compute the sort key for each element.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all elements are sorted.
#' @rdname sortBy
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(3, 2, 1))
#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
#'}
setGeneric("sortBy", function(rdd, func, ascending, numPartitions) { standardGeneric("sortBy") })

setClassUnion("missingOrLogical", c("missing", "logical"))
#' @rdname sortBy
#' @aliases sortBy,RDD,RDD-method
setMethod("sortBy",
signature(rdd = "RDD", func = "function",
ascending = "missingOrLogical", numPartitions = "missingOrInteger"),
function(rdd, func, ascending, numPartitions) {
if (missing(ascending)) {
ascending = TRUE
}
if (missing(numPartitions)) {
numPartitions = SparkR::numPartitions(rdd)
}

values(sortByKey(keyBy(rdd, func), ascending, numPartitions))
})

############ Shuffle Functions ############

#' Partition an RDD by key
Expand Down Expand Up @@ -1796,6 +1830,78 @@ setMethod("cogroup",
group.func)
})

#' Sort an (k, v) pair RDD by k.
#'
#' @param rdd An (k, v) pair RDD to be sorted.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all (k, v) pair elements are sorted.
#' @rdname sortByKey
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(3, 3), list(2, 2), list(1, 1)))
#' collect(sortByKey(rdd)) # list (list(1, 1), list(2, 2), list(3, 3))
#'}
setGeneric("sortByKey", function(rdd, ascending, numPartitions) { standardGeneric("sortByKey") })

#' @rdname sortByKey
#' @aliases sortByKey,RDD,RDD-method
setMethod("sortByKey",
signature(rdd = "RDD", ascending = "missingOrLogical", numPartitions = "missingOrInteger"),
function(rdd, ascending, numPartitions) {
if (missing(ascending)) {
ascending = TRUE
}
if (missing(numPartitions)) {
numPartitions = SparkR::numPartitions(rdd)
}

rangeBounds <- list()

if (numPartitions > 1) {
rddSize <- count(rdd)
# constant from Spark's RangePartitioner
maxSampleSize <- numPartitions * 20
fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)

samples <- collect(keys(sampleRDD(rdd, FALSE, fraction, 1L)))

# Note: the built-in R sort() function only atomic vectors
samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)

if (length(samples) > 0) {
rangeBounds <- lapply(seq_len(numPartitions - 1),
function(i) {
j <- ceiling(length(samples) * i / numPartitions)
samples[j]
})
}
}

rangePartitionFunc <- function(key) {
partition <- 0

while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
partition <- partition + 1
}

if (ascending) {
partition
} else {
numPartitions - partition - 1
}
}

partitionFunc <- function(part) {
sortKeyValueList(part, decreasing = !ascending)
}

newRDD <- partitionBy(rdd, numPartitions, rangePartitionFunc)
lapplyPartition(newRDD, partitionFunc)
})

# TODO: Consider caching the name in the RDD's environment
#' Return an RDD's name.
#'
Expand Down
4 changes: 2 additions & 2 deletions pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ initAccumulator <- function() {

# Utility function to sort a list of key value pairs
# Used in unit tests
sortKeyValueList <- function(kv_list) {
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
keys <- sapply(kv_list, function(x) x[[1]])
kv_list[order(keys)]
kv_list[order(keys, decreasing = decreasing)]
}

# Utility function to generate compact R lists from grouped rdd
Expand Down
14 changes: 14 additions & 0 deletions pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ test_that("keyBy on RDDs", {
expect_equal(actual, lapply(nums, function(x) { list(func(x), x) }))
})

test_that("sortBy() on RDDs", {
sortedRdd <- sortBy(rdd, function(x) { x }, ascending = FALSE)
actual <- collect(sortedRdd)
expect_equal(actual, as.list(sort(nums, decreasing = TRUE)))
})

test_that("keys() on RDDs", {
keys <- keys(intRdd)
actual <- collect(keys)
Expand Down Expand Up @@ -373,3 +379,11 @@ test_that("fullOuterJoin() on pairwise RDDs", {
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {
numPairsRdd <- map(rdd, function(x) { list (x, x) })
sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
actual <- collect(sortedRdd)
numPairs <- lapply(nums, function(x) { list (x, x) })
expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))
})
36 changes: 36 additions & 0 deletions pkg/man/sortBy.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{sortBy}
\alias{sortBy}
\alias{sortBy,RDD,RDD-method}
\alias{sortBy,RDD,function,missingOrLogical,missingOrInteger-method}
\title{Sort an RDD by the given key function.}
\usage{
sortBy(rdd, func, ascending, numPartitions)

\S4method{sortBy}{RDD,`function`,missingOrLogical,missingOrInteger}(rdd, func,
ascending, numPartitions)
}
\arguments{
\item{rdd}{An RDD to be sorted.}

\item{func}{A function used to compute the sort key for each element.}

\item{ascending}{A flag to indicate whether the sorting is ascending or descending.}

\item{numPartitions}{Number of partitions to create.}
}
\value{
An RDD where all elements are sorted.
}
\description{
Sort an RDD by the given key function.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(3, 2, 1))
collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
}
}

34 changes: 34 additions & 0 deletions pkg/man/sortByKey.Rd
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
% Generated by roxygen2 (4.0.2): do not edit by hand
\docType{methods}
\name{sortByKey}
\alias{sortByKey}
\alias{sortByKey,RDD,RDD-method}
\alias{sortByKey,RDD,missingOrLogical,missingOrInteger-method}
\title{Sort an (k, v) pair RDD by k.}
\usage{
sortByKey(rdd, ascending, numPartitions)

\S4method{sortByKey}{RDD,missingOrLogical,missingOrInteger}(rdd, ascending,
numPartitions)
}
\arguments{
\item{rdd}{An (k, v) pair RDD to be sorted.}

\item{ascending}{A flag to indicate whether the sorting is ascending or descending.}

\item{numPartitions}{Number of partitions to create.}
}
\value{
An RDD where all (k, v) pair elements are sorted.
}
\description{
Sort an (k, v) pair RDD by k.
}
\examples{
\dontrun{
sc <- sparkR.init()
rdd <- parallelize(sc, list(list(3, 3), list(2, 2), list(1, 1)))
collect(sortByKey(rdd)) # list (list(1, 1), list(2, 2), list(3, 3))
}
}

0 comments on commit d9da451

Please sign in to comment.