
Commit

Merge branch 'sparkr-sql' of github.com:amplab-extras/SparkR-pkg into merge
davies committed Mar 14, 2015
2 parents bc2ff38 + 3139325 commit 4b1628d
Showing 5 changed files with 39 additions and 23 deletions.
24 changes: 13 additions & 11 deletions pkg/R/SQLContext.R
@@ -67,6 +67,8 @@ tojson <- function(x) {
paste('"', x, '"', sep = '')
} else if (is.logical(x)) {
if (x) "true" else "false"
} else {
stop(paste("unexpected type:", class(x)))
}
}
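For context, the new else branch makes tojson() fail loudly on unsupported inputs instead of silently producing nothing. A minimal sketch of the intended behavior, assuming the elided branches above do not already handle the value:

tojson("Andy")   # returns the quoted JSON string "\"Andy\""
tojson(TRUE)     # returns "true"
tojson(sum)      # now stops with: unexpected type: function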

@@ -97,7 +99,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
# get rid of factor type
dropFactor <- function(x) {
if (is.factor(x)) {
levels(x)[x]
as.character(x)
} else {
x
}
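Replacing levels(x)[x] with as.character(x) keeps the same semantics while reading more clearly. A quick illustration with a hypothetical factor:

f <- factor(c("low", "high", "low"))
levels(f)[f]      # "low" "high" "low"
as.character(f)   # "low" "high" "low" -- identical result, clearer intent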
@@ -108,11 +110,13 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
}
if (is.list(data)) {
sc <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "getJavaSparkContext", sqlCtx)
data <- parallelize(sc, data)
rdd <- parallelize(sc, data)
} else if (inherits(data, "RDD")) {
rdd <- data
} else {
stop(paste("unexpected type:", class(data)))
}
stopifnot(inherits(data, "RDD"))

rdd <- data
if (is.null(schema) || is.null(names(schema))) {
row <- first(rdd)
names <- if (is.null(schema)) {
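With this branching, createDataFrame() accepts either a plain R list (which is first parallelized into an RDD) or an existing RDD, and rejects anything else. A usage sketch with hypothetical data:

localData <- list(list(1L, "a"), list(2L, "b"))
df1 <- createDataFrame(sqlCtx, localData)   # a list is parallelized internally
rdd <- parallelize(sc, localData)
df2 <- createDataFrame(sqlCtx, rdd)         # an existing RDD is used as-is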
@@ -136,7 +140,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
stopifnot(class(schema) == "list")
stopifnot(schema$type == "struct")
stopifnot(class(schema$fields) == "list")
schemaString <- as.character(tojson(schema))
schemaString <- tojson(schema)

jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
srdd <- callJMethod(jrdd, "rdd")
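Dropping as.character() relies on the assumption that tojson() already returns the JSON text as a character scalar (as its character and logical branches above do), so the wrapper added nothing. A one-line sanity check, with schema as defined in this function:

stopifnot(is.character(tojson(schema)))  # holds if tojson() returns the JSON string directly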
@@ -237,17 +241,15 @@ jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {
#' Loads a Parquet file, returning the result as a DataFrame.
#'
#' @param sqlCtx SQLContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed.
#' @param ... Path(s) of parquet file(s) to read.
#' @return DataFrame
#' @export

# TODO: Implement saveasParquetFile and write examples for both
parquetFile <- function(sqlCtx, path) {
parquetFile <- function(sqlCtx, ...) {
# Allow the user to have a more flexible definiton of the text file path
path <- normalizePath(path)
# Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",")
sdf <- callJMethod(sqlCtx, "parquetFile", path)
paths <- lapply(list(...), normalizePath)
sdf <- callJMethod(sqlCtx, "parquetFile", paths)
dataFrame(sdf)
}
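With the variadic signature, one or more Parquet paths can be passed directly and each is normalized before the JVM call. A usage sketch with hypothetical paths:

df  <- parquetFile(sqlCtx, "/tmp/people.parquet")
df2 <- parquetFile(sqlCtx, "/tmp/people1.parquet", "/tmp/people2.parquet")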

28 changes: 18 additions & 10 deletions pkg/inst/tests/test_sparkSQL.R
@@ -648,15 +648,23 @@ test_that("withColumn() and withColumnRenamed()", {
expect_true(columns(newDF2)[1] == "newerAge")
})

# TODO: Enable and test once the parquetFile PR has been merged
# test_that("saveAsParquetFile() on DataFrame and works with parquetFile", {
# df <- jsonFile(sqlCtx, jsonPath)
# parquetPath <- tempfile(pattern="spark-test", fileext=".tmp")
# saveAsParquetFile(df, parquetPath)
# parquetDF <- parquetFile(sqlCtx, parquetPath)
# expect_true(inherits(parquetDF, "DataFrame"))
# expect_equal(collect(df), collect(parquetDF))
# unlink(parquetPath)
# })
test_that("saveDF() on DataFrame and works with parquetFile", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath)
expect_true(inherits(parquetDF, "DataFrame"))
expect_equal(collect(df), collect(parquetDF))
})

test_that("parquetFile works with multiple input paths", {
df <- jsonFile(sqlCtx, jsonPath)
saveDF(df, parquetPath, "parquet", mode="overwrite")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
saveDF(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2)
expect_true(inherits(parquetDF, "DataFrame"))
expect_true(count(parquetDF) == count(df)*2)
})

unlink(parquetPath)
unlink(jsonPath)
3 changes: 1 addition & 2 deletions pkg/src/build.sbt
@@ -29,7 +29,7 @@ libraryDependencies ++= Seq(
val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
val sbtYarnFlag = scala.util.Properties.envOrElse("USE_YARN", "")
val defaultHadoopVersion = "1.0.4"
val defaultSparkVersion = "1.3.0-rc2"
val defaultSparkVersion = "1.3.0"
val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", defaultHadoopVersion)
val sparkVersion = scala.util.Properties.envOrElse("SPARK_VERSION", defaultSparkVersion)
libraryDependencies ++= Seq(
@@ -53,7 +53,6 @@ libraryDependencies ++= Seq(
}

resolvers ++= Seq(
"Apache Staging" at "https://repository.apache.org/content/repositories/staging/",
"Typesafe" at "http://repo.typesafe.com/typesafe/releases",
"Scala Tools Snapshots" at "http://scala-tools.org/repo-snapshots/",
"Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
@@ -18,6 +18,8 @@ object SerDe {
// logical -> Boolean
// double, numeric -> Double
// raw -> Array[Byte]
// Date -> Date
// POSIXlt/POSIXct -> Time
//
// list[T] -> Array[T], where T is one of above mentioned types
// environment -> Map[String, T], where T is a native type
@@ -79,6 +81,7 @@ object SerDe {
val intVal = in.readInt()
if (intVal == 0) false else true
}

def readDate(in: DataInputStream) = {
val d = in.readInt()
new Date(d.toLong * 24 * 3600 * 1000)
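The integer exchanged by readDate/writeDate is the number of days since the Unix epoch, which matches R's internal Date representation. An R-side illustration with a hypothetical date:

d <- as.Date("2015-03-14")
unclass(d)                              # 16508 -- days since 1970-01-01, the value writeDate sends
as.Date(16508, origin = "1970-01-01")   # "2015-03-14" -- the date readDate reconstructs on the JVM side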
@@ -154,6 +157,8 @@ object SerDe {
// Double -> double
// Long -> double
// Array[Byte] -> raw
// Date -> Date
// Time -> POSIXct
//
// Array[T] -> list()
// Object -> jobj
@@ -253,6 +258,7 @@ object SerDe {
val intValue = if (value) 1 else 0
out.writeInt(intValue)
}

def writeDate(out: DataOutputStream, value: Date) {
out.writeInt((value.getTime / 1000 / 3600 / 24).toInt)
}
1 change: 1 addition & 0 deletions sparkR
@@ -27,6 +27,7 @@ cat > /tmp/sparkR.profile << EOF
projecHome <- Sys.getenv("PROJECT_HOME")
Sys.setenv(NOAWT=1)
.libPaths(c(paste(projecHome,"/lib", sep=""), .libPaths()))
require(utils)
require(SparkR)
sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
assign("sc", sc, envir=.GlobalEnv)
