Merge pull request apache#212 from davies/toDF
[SPARKR-192]  createDataFrame from rdd
shivaram committed Mar 14, 2015
2 parents dd52cbc + 6122e0e commit 3139325
Showing 14 changed files with 511 additions and 86 deletions.
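In short, this change adds createDataFrame() and a toDF() generic so an RDD (or a local list / data.frame) can be turned into a DataFrame, with column names and types inferred from the first element. A quick usage sketch, lifted from the roxygen examples added in this PR:

```r
sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)

# an RDD of named lists; column names and types are inferred from the first element
rdd <- lapply(parallelize(sc, 1:10), function(x) list(a = x, b = as.character(x)))

df  <- createDataFrame(sqlCtx, rdd)  # pass the SQLContext explicitly
df2 <- toDF(rdd)                     # or let toDF() pick up the registered SQLContext
```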
2 changes: 2 additions & 0 deletions pkg/NAMESPACE
@@ -147,6 +147,7 @@ exportMethods("agg")

export("cacheTable",
"clearCache",
"createDataFrame",
"createExternalTable",
"dropTempTable",
"jsonFile",
@@ -157,6 +158,7 @@ export("cacheTable",
"table",
"tableNames",
"tables",
"toDF",
"uncacheTable")

export("sparkRSQL.init",
8 changes: 3 additions & 5 deletions pkg/R/DataFrame.R
@@ -168,7 +168,7 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
setMethod("showDF",
signature(x = "DataFrame"),
function(x, numRows = 20) {
cat(callJMethod(x@sdf, "showString", numToInt(numRows)))
cat(callJMethod(x@sdf, "showString", numToInt(numRows)), "\n")
})

setMethod("show", "DataFrame",
@@ -569,10 +569,8 @@ setMethod("collect",
close(objRaw)
col
})
colNames <- callJMethod(x@sdf, "columns")
names(cols) <- colNames
dfOut <- do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
dfOut
names(cols) <- columns(x)
do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors))
})

#' Limit
1 change: 1 addition & 0 deletions pkg/R/RDD.R
@@ -160,6 +160,7 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
callJMethod(prev_jrdd, "rdd"),
serializedFuncArr,
rdd@env$prev_serializedMode,
serializedMode,
depsBin,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
179 changes: 179 additions & 0 deletions pkg/R/SQLContext.R
@@ -1,5 +1,184 @@
# SQLcontext.R: SQLContext-driven functions

#' infer the SQL type
infer_type <- function(x) {
if (is.null(x)) {
stop("can not infer type from NULL")
}

# class of POSIXlt is c("POSIXlt" "POSIXt")
type <- switch(class(x)[[1]],
integer = "integer",
character = "string",
logical = "boolean",
double = "double",
numeric = "double",
raw = "binary",
list = "array",
environment = "map",
Date = "date",
POSIXlt = "timestamp",
POSIXct = "timestamp",
stop(paste("Unsupported type for DataFrame:", class(x))))

if (type == "map") {
stopifnot(length(x) > 0)
key <- ls(x)[[1]]
list(type = "map",
keyType = "string",
valueType = infer_type(get(key, x)),
valueContainsNull = TRUE)
} else if (type == "array") {
stopifnot(length(x) > 0)
names <- names(x)
if (is.null(names)) {
list(type = "array", elementType = infer_type(x[[1]]), containsNull = TRUE)
} else {
# StructType
types <- lapply(x, infer_type)
fields <- lapply(1:length(x), function(i) {
list(name = names[[i]], type = types[[i]], nullable = TRUE)
})
list(type = "struct", fields = fields)
}
} else if (length(x) > 1) {
list(type = "array", elementType = type, containsNull = TRUE)
} else {
type
}
}

#' dump the schema into JSON string
tojson <- function(x) {
if (is.list(x)) {
names <- names(x)
if (!is.null(names)) {
items <- lapply(names, function(n) {
safe_n <- gsub('"', '\\"', n)
paste(tojson(safe_n), ':', tojson(x[[n]]), sep = '')
})
d <- paste(items, collapse = ', ')
paste('{', d, '}', sep = '')
} else {
l <- paste(lapply(x, tojson), collapse = ', ')
paste('[', l, ']', sep = '')
}
} else if (is.character(x)) {
paste('"', x, '"', sep = '')
} else if (is.logical(x)) {
if (x) "true" else "false"
} else {
stop(paste("unexpected type:", class(x)))
}
}

#' Create a DataFrame from an RDD
#'
#' Converts an RDD to a DataFrame by inferring the types.
#'
#' @param sqlCtx A SQLContext
#' @param data An RDD or list or data.frame
#' @param schema a list of column names or a named list (StructType); optional
#' @return a DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
#' df <- createDataFrame(sqlCtx, rdd)
#' }

# TODO(davies): support sampling and infer type from NA
createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
schema <- names(data)
n <- nrow(data)
m <- ncol(data)
# get rid of factor type
dropFactor <- function(x) {
if (is.factor(x)) {
as.character(x)
} else {
x
}
}
data <- lapply(1:n, function(i) {
lapply(1:m, function(j) { dropFactor(data[i,j]) })
})
}
if (is.list(data)) {
sc <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "getJavaSparkContext", sqlCtx)
rdd <- parallelize(sc, data)
} else if (inherits(data, "RDD")) {
rdd <- data
} else {
stop(paste("unexpected type:", class(data)))
}

if (is.null(schema) || is.null(names(schema))) {
row <- first(rdd)
names <- if (is.null(schema)) {
names(row)
} else {
as.list(schema)
}
if (is.null(names)) {
names <- lapply(1:length(row), function(x) {
paste("_", as.character(x), sep = "")
})
}

types <- lapply(row, infer_type)
fields <- lapply(1:length(row), function(i) {
list(name = names[[i]], type = types[[i]], nullable = TRUE)
})
schema <- list(type = "struct", fields = fields)
}

stopifnot(class(schema) == "list")
stopifnot(schema$type == "struct")
stopifnot(class(schema$fields) == "list")
schemaString <- tojson(schema)

jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
srdd <- callJMethod(jrdd, "rdd")
sdf <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "createDF",
srdd, schemaString, sqlCtx)
dataFrame(sdf)
}

#' toDF()
#'
#' Converts an RDD to a DataFrame by inferring the types.
#'
#' @param x An RDD
#'
#' @rdname DataFrame
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
#' df <- toDF(rdd)
#' }

setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })

setMethod("toDF", signature(x = "RDD"),
function(x, ...) {
sqlCtx <- if (exists(".sparkRHivesc", envir = .sparkREnv)) {
get(".sparkRHivesc", envir = .sparkREnv)
} else if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
get(".sparkRSQLsc", envir = .sparkREnv)
} else {
stop("no SQL context available")
}
createDataFrame(sqlCtx, x, ...)
})

#' Create a DataFrame from a JSON file.
#'
#' Loads a JSON file (one object per line), returning the result as a DataFrame
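To make the schema inference above concrete: a named list is treated as a struct, and the field types come from infer_type on each element. A rough standalone sketch of what createDataFrame derives for a simple row, assuming the infer_type and tojson helpers from the diff above are in scope (the JSON in the comment is approximate):

```r
row <- list(a = 1L, b = "x")           # the first element of the RDD
types <- lapply(row, infer_type)       # list("integer", "string")
fields <- lapply(seq_along(row), function(i) {
  list(name = names(row)[[i]], type = types[[i]], nullable = TRUE)
})
schema <- list(type = "struct", fields = fields)
tojson(schema)
# '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true},
#                              {"name":"b", "type":"string", "nullable":true}]}'
```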
58 changes: 38 additions & 20 deletions pkg/R/deserialize.R
@@ -9,6 +9,8 @@
# Double -> double
# Long -> double
# Array[Byte] -> raw
# Date -> Date
# Time -> POSIXct
#
# Array[T] -> list()
# Object -> jobj
@@ -26,10 +28,12 @@ readTypedObject <- function(con, type) {
"b" = readBoolean(con),
"d" = readDouble(con),
"r" = readRaw(con),
"D" = readDate(con),
"t" = readTime(con),
"l" = readList(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop("Unsupported type for deserialization"))
stop(paste("Unsupported type for deserialization", type)))
}

readString <- function(con) {
@@ -54,6 +58,15 @@ readType <- function(con) {
rawToChar(readBin(con, "raw", n = 1L))
}

readDate <- function(con) {
as.Date(readInt(con), origin = "1970-01-01")
}

readTime <- function(con) {
t <- readDouble(con)
as.POSIXct(t, origin = "1970-01-01")
}

# We only support lists where all elements are of same type
readList <- function(con) {
type <- readType(con)
@@ -107,11 +120,12 @@ readDeserializeRows <- function(inputCon) {
# the number of rows varies, we put the readRow function in a while loop
# that terminates when the next row is empty.
data <- list()
numCols <- readInt(inputCon)
# We write a length for each row out
while(length(numCols) > 0 && numCols > 0) {
data[[length(data) + 1L]] <- readRow(inputCon, numCols)
numCols <- readInt(inputCon)
while(TRUE) {
row <- readRow(inputCon, numCols)
if (length(row) == 0) {
break
}
data[[length(data) + 1L]] <- row
}
data # this is a list of named lists now
}
@@ -122,28 +136,32 @@ readRowList <- function(obj) {
# the numCols bytes inside the read function in order to correctly
# deserialize the row.
rawObj <- rawConnection(obj, "r+")
numCols <- SparkR:::readInt(rawObj)
rowOut <- SparkR:::readRow(rawObj, numCols)
close(rawObj)
rowOut
on.exit(close(rawObj))
SparkR:::readRow(rawObj, numCols)
}

readRow <- function(inputCon, numCols) {
lapply(1:numCols, function(x) {
obj <- readObject(inputCon)
if (is.null(obj)) {
NA
} else {
obj
}
}) # each row is a list now
numCols <- readInt(inputCon)
if (length(numCols) > 0 && numCols > 0) {
lapply(1:numCols, function(x) {
obj <- readObject(inputCon)
if (is.null(obj)) {
NA
} else {
obj
}
}) # each row is a list now
} else {
list()
}
}

# Take a single column as Array[Byte] and deserialize it into an atomic vector
readCol <- function(inputCon, numRows) {
sapply(1:numRows, function(x) {
# sapply cannot work with POSIXlt
do.call(c, lapply(1:numRows, function(x) {
value <- readObject(inputCon)
# Replace NULL with NA so we can coerce to vectors
if (is.null(value)) NA else value
}) # each column is an atomic vector now
}))
}
