From 7b9348c8556fedbba4929cbb1484a5476e1f5de7 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 26 Feb 2014 19:12:39 -0800 Subject: [PATCH 1/2] Add tests for partition with string keys Add two tests one with a string array and one from a textFile to test both codepaths --- pkg/inst/tests/test_shuffle.R | 23 +++++++++++++++++++---- pkg/inst/tests/test_textFile.R | 16 ++++++++++++++++ 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/pkg/inst/tests/test_shuffle.R b/pkg/inst/tests/test_shuffle.R index 6d1ea81b1a149..7bd413c3f9b9d 100644 --- a/pkg/inst/tests/test_shuffle.R +++ b/pkg/inst/tests/test_shuffle.R @@ -14,10 +14,8 @@ numPairs <- list(list(1L, 100), list(2L, 200), list(4L, -1), list(3L, 1), list(3L, 0)) numPairsRdd <- parallelize(sc, numPairs, length(numPairs)) -strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ", - "other times it helps me control the chaos.", - "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ", - "raising me. But they're both dead now. I didn't kill them. Honest.") +strList <- list("Dexter Morgan: Blood. 
Sometimes it sets my teeth on edge and ", + "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ") strListRDD <- parallelize(sc, strList, 4) test_that("groupByKey for integers", { @@ -104,3 +102,20 @@ test_that("partitionBy works with dependencies", { expect_equal(actual_first, expected_first) expect_equal(actual_second, expected_second) }) + +test_that("test partitionBy with string keys", { + words <- flatMap(strListRDD, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + resultRDD <- partitionBy(wordCount, 2L) + expected_first <- list(list("Dexter", 1), list("Dexter", 1)) + expected_second <- list(list("and", 1), list("and", 1)) + + actual_first <- Filter(function(item) { item[[1]] == "Dexter" }, + collectPartition(resultRDD, 0L)) + actual_second <- Filter(function(item) { item[[1]] == "and" }, + collectPartition(resultRDD, 1L)) + + expect_equal(actual_first, expected_first) + expect_equal(actual_second, expected_second) +}) diff --git a/pkg/inst/tests/test_textFile.R b/pkg/inst/tests/test_textFile.R index 409caa9a8b544..b5a575452bfbc 100644 --- a/pkg/inst/tests/test_textFile.R +++ b/pkg/inst/tests/test_textFile.R @@ -26,3 +26,19 @@ test_that("textFile() followed by a collect() returns the same content", { unlink(fileName) }) + +test_that("textFile() word count works as expected", { + fileName <- tempfile(pattern="spark-test", fileext=".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName) + + words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + counts <- reduceByKey(wordCount, "+", 2L) + output <- collect(counts) + expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1), + list("Spark", 2)) + expect_equal(output, expected) +}) From 21fa2d8551c6d03957492ce79b49ec7856ce074a Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 26 Feb 2014 19:46:00 -0800 Subject: [PATCH 
2/2] Fix bug where serialized was not changed for RRDD Reason: When an RRDD is created in getJRDD, we have converted any possibly unserialized RDD to a serialized RDD. --- pkg/R/RDD.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index 01fa671c70532..95c0b30bb2a5b 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -125,6 +125,8 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), as.character(.sparkREnv[["libname"]]), broadcastArr, prev_jrdd$classTag()) + # The RDD is serialized after we create a RRDD + rdd@env$serialized <- TRUE rdd@env$jrdd_val <- rddRef$asJavaRDD() rdd@env$jrdd_val })