Skip to content

Commit

Permalink
Restore rdd argument to getJRDD
Browse files Browse the repository at this point in the history
  • Loading branch information
Chris Freeman committed Feb 26, 2015
1 parent c652b4c commit 3294949
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,18 @@ PipelinedRDD <- function(prev, func) {


# The jrdd accessor function.
setGeneric("getJRDD", function(x, ...) { standardGeneric("getJRDD") })
setMethod("getJRDD", signature(x = "RDD"), function(x) x@jrdd )
setMethod("getJRDD", signature(x = "PipelinedRDD"),
function(x, dataSerialization = TRUE) {
if (!is.null(x@env$jrdd_val)) {
return(x@env$jrdd_val)
setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd )
setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
function(rdd, dataSerialization = TRUE) {
if (!is.null(rdd@env$jrdd_val)) {
return(rdd@env$jrdd_val)
}

# TODO: This is to handle anonymous functions. Find out a
# better way to do this.
computeFunc <- function(split, part) {
x@func(split, part)
rdd@func(split, part)
}
serializedFuncArr <- serialize("computeFunc", connection = NULL)

Expand All @@ -120,13 +120,13 @@ setMethod("getJRDD", signature(x = "PipelinedRDD"),

depsBin <- getDependencies(computeFunc)

prev_jrdd <- x@prev_jrdd
prev_jrdd <- rdd@prev_jrdd

if (dataSerialization) {
rddRef <- newJObject("edu.berkeley.cs.amplab.sparkr.RRDD",
callJMethod(prev_jrdd, "rdd"),
serializedFuncArr,
x@env$prev_serialized,
rdd@env$prev_serialized,
depsBin,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
Expand All @@ -136,17 +136,17 @@ setMethod("getJRDD", signature(x = "PipelinedRDD"),
rddRef <- newJObject("edu.berkeley.cs.amplab.sparkr.StringRRDD",
callJMethod(prev_jrdd, "rdd"),
serializedFuncArr,
x@env$prev_serialized,
rdd@env$prev_serialized,
depsBin,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
# Save the serialization flag after we create a RRDD
x@env$serialized <- dataSerialization
x@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
x@env$jrdd_val
rdd@env$serialized <- dataSerialization
rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD") # rddRef$asJavaRDD()
rdd@env$jrdd_val
})


Expand Down

0 comments on commit 3294949

Please sign in to comment.