Skip to content

Commit

Permalink
Merge pull request #246 from hlin09/fixCombineByKey
Browse files Browse the repository at this point in the history
Fixes combineByKey
  • Loading branch information
shivaram authored and Davies Liu committed Apr 14, 2015
1 parent ed66c81 commit 7e8caa3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
4 changes: 2 additions & 2 deletions R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ setMethod("combineByKey",
pred <- function(item) exists(item$hash, keys)
lapply(part,
function(item) {
item$hash <- as.character(item[[1]])
item$hash <- as.character(hashCode(item[[1]]))
updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
})
convertEnvsToList(keys, combiners)
Expand All @@ -441,7 +441,7 @@ setMethod("combineByKey",
pred <- function(item) exists(item$hash, keys)
lapply(part,
function(item) {
item$hash <- as.character(item[[1]])
item$hash <- as.character(hashCode(item[[1]]))
updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
})
convertEnvsToList(keys, combiners)
Expand Down
12 changes: 12 additions & 0 deletions R/pkg/inst/tests/test_shuffle.R
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ test_that("combineByKey for doubles", {
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
})

test_that("combineByKey for characters", {
  # String-keyed pairs, parallelized with 2L slices; the key "max" occurs twice.
  keyedPairs <- list(
    list("max", 1L),
    list("min", 2L),
    list("other", 3L),
    list("max", 4L)
  )
  stringKeyRDD <- parallelize(sc, keyedPairs, 2L)

  # The combiner starts as the value itself; "+" is used both to merge a value
  # into a combiner and to merge two combiners.
  combined <- combineByKey(stringKeyRDD, function(x) { x }, "+", "+", 2L)
  result <- collect(combined)

  # Values sharing a key are added together, so "max" ends up as 1L + 4L.
  wanted <- list(list("max", 5L), list("min", 2L), list("other", 3L))
  expect_equal(sortKeyValueList(result), sortKeyValueList(wanted))
})

test_that("aggregateByKey", {
# test aggregateByKey for int keys
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
Expand Down

0 comments on commit 7e8caa3

Please sign in to comment.