diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index b9c17a31003a9..f5ca720b8d5c5 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -67,6 +67,8 @@ tojson <- function(x) {
     paste('"', x, '"', sep = '')
   } else if (is.logical(x)) {
     if (x) "true" else "false"
+  } else {
+    stop(paste("unexpected type:", class(x)))
   }
 }
 
@@ -97,7 +99,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
     # get rid of factor type
     dropFactor <- function(x) {
       if (is.factor(x)) {
-        levels(x)[x]
+        as.character(x)
       } else {
         x
       }
@@ -108,11 +110,13 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
   }
   if (is.list(data)) {
     sc <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "getJavaSparkContext", sqlCtx)
-    data <- parallelize(sc, data)
+    rdd <- parallelize(sc, data)
+  } else if (inherits(data, "RDD")) {
+    rdd <- data
+  } else {
+    stop(paste("unexpected type:", class(data)))
   }
-  stopifnot(inherits(data, "RDD"))
-  rdd <- data
 
   if (is.null(schema) || is.null(names(schema))) {
     row <- first(rdd)
     names <- if (is.null(schema)) {
@@ -136,7 +140,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
   stopifnot(class(schema) == "list")
   stopifnot(schema$type == "struct")
   stopifnot(class(schema$fields) == "list")
-  schemaString <- as.character(tojson(schema))
+  schemaString <- tojson(schema)
 
   jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
   srdd <- callJMethod(jrdd, "rdd")
@@ -237,17 +241,15 @@ jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
 
 #' Loads a Parquet file, returning the result as a DataFrame.
 #'
 #' @param sqlCtx SQLContext to use
-#' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... Path(s) of parquet file(s) to read.
 #' @return DataFrame
 #' @export
 # TODO: Implement saveasParquetFile and write examples for both
-parquetFile <- function(sqlCtx, path) {
+parquetFile <- function(sqlCtx, ...) {
   # Allow the user to have a more flexible definiton of the text file path
-  path <- normalizePath(path)
-  # Convert a string vector of paths to a string containing comma separated paths
-  path <- paste(path, collapse = ",")
-  sdf <- callJMethod(sqlCtx, "parquetFile", path)
+  paths <- lapply(list(...), normalizePath)
+  sdf <- callJMethod(sqlCtx, "parquetFile", paths)
   dataFrame(sdf)
 }
 
diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index 37d0ecb0b9e8c..d8096ddcd9678 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -648,15 +648,23 @@ test_that("withColumn() and withColumnRenamed()", {
   expect_true(columns(newDF2)[1] == "newerAge")
 })
 
-# TODO: Enable and test once the parquetFile PR has been merged
-# test_that("saveAsParquetFile() on DataFrame and works with parquetFile", {
-#   df <- jsonFile(sqlCtx, jsonPath)
-#   parquetPath <- tempfile(pattern="spark-test", fileext=".tmp")
-#   saveAsParquetFile(df, parquetPath)
-#   parquetDF <- parquetFile(sqlCtx, parquetPath)
-#   expect_true(inherits(parquetDF, "DataFrame"))
-#   expect_equal(collect(df), collect(parquetDF))
-#   unlink(parquetPath)
-# })
+test_that("saveDF() on DataFrame and works with parquetFile", {
+  df <- jsonFile(sqlCtx, jsonPath)
+  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  parquetDF <- parquetFile(sqlCtx, parquetPath)
+  expect_true(inherits(parquetDF, "DataFrame"))
+  expect_equal(collect(df), collect(parquetDF))
+})
+
+test_that("parquetFile works with multiple input paths", {
+  df <- jsonFile(sqlCtx, jsonPath)
+  saveDF(df, parquetPath, "parquet", mode="overwrite")
+  parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
+  saveDF(df, parquetPath2, "parquet", mode="overwrite")
+  parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
+  expect_true(inherits(parquetDF, "DataFrame"))
+  expect_true(count(parquetDF) == count(df)*2)
+})
 
+unlink(parquetPath)
 unlink(jsonPath)
diff --git a/pkg/src/build.sbt b/pkg/src/build.sbt
index e750af13d16de..a1fb240d8d81a 100644
--- a/pkg/src/build.sbt
+++ b/pkg/src/build.sbt
@@ -29,7 +29,7 @@ libraryDependencies ++= Seq(
 val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
 val sbtYarnFlag = scala.util.Properties.envOrElse("USE_YARN", "")
 val defaultHadoopVersion = "1.0.4"
-val defaultSparkVersion = "1.3.0-rc2"
+val defaultSparkVersion = "1.3.0"
 val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", defaultHadoopVersion)
 val sparkVersion = scala.util.Properties.envOrElse("SPARK_VERSION", defaultSparkVersion)
 libraryDependencies ++= Seq(
@@ -53,7 +53,6 @@ libraryDependencies ++= Seq(
 }
 
 resolvers ++= Seq(
-  "Apache Staging" at "https://repository.apache.org/content/repositories/staging/",
   "Typesafe" at "http://repo.typesafe.com/typesafe/releases",
   "Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots/",
   "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SerDe.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SerDe.scala
index 1de10b2a5053b..02b9c1dfe82d6 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SerDe.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SerDe.scala
@@ -18,6 +18,8 @@ object SerDe {
   // logical -> Boolean
   // double, numeric -> Double
   // raw -> Array[Byte]
+  // Date -> Date
+  // POSIXlt/POSIXct -> Time
   //
   // list[T] -> Array[T], where T is one of above mentioned types
   // environment -> Map[String, T], where T is a native type
@@ -79,6 +81,7 @@ object SerDe {
     val intVal = in.readInt()
     if (intVal == 0) false else true
   }
+
   def readDate(in: DataInputStream) = {
     val d = in.readInt()
     new Date(d.toLong * 24 * 3600 * 1000)
@@ -154,6 +157,8 @@ object SerDe {
   // Double -> double
   // Long -> double
   // Array[Byte] -> raw
+  // Date -> Date
+  // Time -> POSIXct
   //
   // Array[T] -> list()
   // Object -> jobj
@@ -253,6 +258,7 @@ object SerDe {
     val intValue = if (value) 1 else 0
     out.writeInt(intValue)
   }
+
   def writeDate(out: DataOutputStream, value: Date) {
     out.writeInt((value.getTime / 1000 / 3600 / 24).toInt)
   }
diff --git a/sparkR b/sparkR
index 5987b58a7effa..ee25123041046 100755
--- a/sparkR
+++ b/sparkR
@@ -27,6 +27,7 @@ cat > /tmp/sparkR.profile << EOF
 projecHome <- Sys.getenv("PROJECT_HOME")
 Sys.setenv(NOAWT=1)
 .libPaths(c(paste(projecHome,"/lib", sep=""), .libPaths()))
+require(utils)
 require(SparkR)
 sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
 assign("sc", sc, envir=.GlobalEnv)
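
For reference, a minimal usage sketch of the reworked parquetFile() API, mirroring the new tests in test_sparkSQL.R; it assumes an initialized SparkR session with sqlCtx and a JSON input file jsonPath, as set up in that test file (the temp-path names below are illustrative, not part of the patch):

# Assumes sqlCtx and jsonPath exist, as in pkg/inst/tests/test_sparkSQL.R.
df <- jsonFile(sqlCtx, jsonPath)

# Write the same DataFrame to two Parquet locations.
parquetPath1 <- tempfile(pattern = "sparkr-test1", fileext = ".parquet")
parquetPath2 <- tempfile(pattern = "sparkr-test2", fileext = ".parquet")
saveDF(df, parquetPath1, "parquet", mode="overwrite")
saveDF(df, parquetPath2, "parquet", mode="overwrite")

# parquetFile() now accepts one or more paths via '...' (each normalized
# with normalizePath) instead of a single comma-separated string.
parquetDF <- parquetFile(sqlCtx, parquetPath1, parquetPath2)
stopifnot(inherits(parquetDF, "DataFrame"))
stopifnot(count(parquetDF) == count(df) * 2)

unlink(parquetPath1)
unlink(parquetPath2)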