[SPARK-6807] [SparkR] Merge recent SparkR-pkg changes #5436

Closed · wants to merge 35 commits

Changes from 33 commits

Commits (35)
0e2a94f
Define functions for schema and fields
Mar 30, 2015
7dd81b7
Documentation
Mar 30, 2015
8b76e81
Merge pull request #233 from redbaron/fail-early-on-missing-dep
shivaram Mar 30, 2015
cd66603
new line at EOF
Mar 30, 2015
136a07e
Merge pull request #242 from hqzizania/stats
concretevitamin Apr 3, 2015
b317aa7
Merge pull request #243 from hqzizania/master
concretevitamin Apr 4, 2015
c9497a3
Merge pull request #208 from lythesia/master
shivaram Apr 7, 2015
ba54e34
Merge pull request #238 from sun-rui/SPARKR-154_4
shivaram Apr 7, 2015
20b97a6
Merge pull request #234 from hqzizania/assist
concretevitamin Apr 8, 2015
40338a4
Merge pull request #244 from sun-rui/SPARKR-154_5
concretevitamin Apr 8, 2015
be5d5c1
refactor schema functions
Apr 8, 2015
836c4bf
Update `createDataFrame` and `toDF`
Apr 8, 2015
1a3b63d
new version of `CreateDF`
Apr 8, 2015
275deb4
Update `NAMESPACE` and tests
Apr 8, 2015
f3ba785
Fixed duplicate export
Apr 8, 2015
ed66c81
Update `subtract` to work with `generics.R`
Apr 8, 2015
7e8caa3
Merge pull request #246 from hlin09/fixCombineByKey
shivaram Apr 9, 2015
07d0dbc
[SPARKR-244] Fix test failure after integration of subtract() and sub…
Apr 9, 2015
40199eb
Move except into sorted position
shivaram Apr 9, 2015
9387402
fix style
Apr 9, 2015
141efd8
Merge pull request #245 from hqzizania/upstream
shivaram Apr 9, 2015
7741d66
Rename the SQL DataType function
Apr 10, 2015
6ef5f2d
Fix spacing
Apr 10, 2015
8526d2e
Remove `tojson` functions
Apr 10, 2015
71372d9
Update docs and examples
Apr 10, 2015
5a553e7
Use object attribute instead of argument
Apr 10, 2015
1bdcb63
Updates to README.md.
concretevitamin Apr 11, 2015
ae78312
Merge pull request #237 from sun-rui/SPARKR-154_3
Apr 14, 2015
41f8184
rm man
Apr 14, 2015
4f5ac09
Merge branch 'master' of github.com:apache/spark into R5
Apr 14, 2015
e74c04e
fix schema.R
Apr 14, 2015
b1fe460
fix conflict in README.md
Apr 14, 2015
168b7fe
sort generics
Apr 14, 2015
a5a02f2
Merge branch 'master' of github.com:apache/spark into R3
Apr 16, 2015
c2b09be
SQLTypes -> schema
Apr 16, 2015
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -17,11 +17,11 @@ License: Apache License (== 2.0)
Collate:
'generics.R'
'jobj.R'
'SQLTypes.R'
'RDD.R'
'pairRDD.R'
'column.R'
'group.R'
'schema.R'
'DataFrame.R'
'SQLContext.R'
'broadcast.R'
20 changes: 17 additions & 3 deletions R/pkg/NAMESPACE
@@ -5,6 +5,7 @@ exportMethods(
"aggregateByKey",
"aggregateRDD",
"cache",
"cartesian",
"checkpoint",
"coalesce",
"cogroup",
@@ -28,6 +29,7 @@ exportMethods(
"fullOuterJoin",
"glom",
"groupByKey",
"intersection",
"join",
"keyBy",
"keys",
@@ -52,11 +54,14 @@ exportMethods(
"reduceByKeyLocally",
"repartition",
"rightOuterJoin",
"sampleByKey",
"sampleRDD",
"saveAsTextFile",
"saveAsObjectFile",
"sortBy",
"sortByKey",
"subtract",
"subtractByKey",
"sumRDD",
"take",
"takeOrdered",
@@ -95,6 +100,7 @@ exportClasses("DataFrame")
exportMethods("columns",
"distinct",
"dtypes",
"except",
"explain",
"filter",
"groupBy",
@@ -118,7 +124,6 @@ exportMethods("columns",
"show",
"showDF",
"sortDF",
"subtract",
"toJSON",
"toRDD",
"unionAll",
@@ -178,5 +183,14 @@ export("cacheTable",
"toDF",
"uncacheTable")

export("print.structType",
"print.structField")
export("sparkRSQL.init",
"sparkRHive.init")

export("structField",
"structField.jobj",
"structField.character",
"print.structField",
"structType",
"structType.jobj",
"structType.structField",
"print.structType")
16 changes: 9 additions & 7 deletions R/pkg/R/DataFrame.R
@@ -1141,29 +1141,31 @@ setMethod("intersect",
dataFrame(intersected)
})

#' Subtract
#' except
#'
#' Return a new DataFrame containing rows in this DataFrame
#' but not in another DataFrame. This is equivalent to `EXCEPT` in SQL.
#'
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the subtract operation.
#' @rdname subtract
#' @return A DataFrame containing the result of the except operation.
#' @rdname except
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df1 <- jsonFile(sqlCtx, path)
#' df2 <- jsonFile(sqlCtx, path2)
#' subtractDF <- subtract(df, df2)
#' exceptDF <- except(df, df2)
#' }
setMethod("subtract",
#' @rdname except
#' @export
setMethod("except",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y) {
subtracted <- callJMethod(x@sdf, "except", y@sdf)
dataFrame(subtracted)
excepted <- callJMethod(x@sdf, "except", y@sdf)
dataFrame(excepted)
})

#' Save the contents of the DataFrame to a data source
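
The DataFrame.R hunk above renames the DataFrame set-difference method from subtract to except, matching SQL EXCEPT and freeing the subtract name for the RDD API below. A hedged usage sketch follows; the file paths are placeholders, not files from this PR.

# Hedged sketch: callers that previously wrote subtract(df1, df2) now call except(df1, df2).
sc <- sparkR.init()
sqlCtx <- sparkRSQL.init(sc)
df1 <- jsonFile(sqlCtx, "people-a.json")   # placeholder paths
df2 <- jsonFile(sqlCtx, "people-b.json")
onlyInDf1 <- except(df1, df2)              # rows of df1 absent from df2, like SQL EXCEPT
showDF(onlyInDf1)
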
205 changes: 133 additions & 72 deletions R/pkg/R/RDD.R
@@ -730,6 +730,7 @@ setMethod("take",
index <- -1
jrdd <- getJRDD(x)
numPartitions <- numPartitions(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
# estimates similar to the scala version of `take`.
@@ -748,13 +749,14 @@
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = size,
serializedMode = getSerializedMode(x))
# TODO: Check if this append is O(n^2)?
serializedMode = serializedModeRDD)

resList <- append(resList, elems)
}
resList
})


#' First
#'
#' Return the first element of an RDD
@@ -1092,21 +1094,42 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
if (num < length(part)) {
# R limitation: order works only on primitive types!
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
list(part[ord[1:num]])
part[ord[1:num]]
} else {
list(part)
part
}
}

reduceFunc <- function(elems, part) {
newElems <- append(elems, part)
# R limitation: order works only on primitive types!
ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
newElems[ord[1:num]]
}

newRdd <- mapPartitions(x, partitionFunc)
reduce(newRdd, reduceFunc)

resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
numPartitions <- numPartitions(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
index <- index + 1

if (index >= numPartitions) {
ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
resList <- resList[ord[1:num]]
break
}

# a JList of byte arrays
partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
partition <- partitionArr[[1]]

# elems is capped to have at most `num` elements
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = num,
serializedMode = serializedModeRDD)

resList <- append(resList, elems)
}
resList
}

#' Returns the first N elements from an RDD in ascending order.
Expand Down Expand Up @@ -1465,67 +1488,105 @@ setMethod("zipRDD",
stop("Can only zip RDDs which have the same number of partitions.")
}

if (getSerializedMode(x) != getSerializedMode(other) ||
getSerializedMode(x) == "byte") {
# Append the number of elements in each partition to that partition so that we can later
# check if corresponding partitions of both RDDs have the same number of elements.
#
# Note that this appending also serves the purpose of reserialization, because even if
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
# may be encoded as multiple byte arrays.
appendLength <- function(part) {
part[[length(part) + 1]] <- length(part) + 1
part
}
x <- lapplyPartition(x, appendLength)
other <- lapplyPartition(other, appendLength)
}
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
# The jrdd's elements are of scala Tuple2 type. The serialized
# flag here is used for the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
# The zippedRDD's elements are of scala Tuple2 type. The serialized
# flag Here is used for the elements inside the tuples.
serializerMode <- getSerializedMode(x)
zippedRDD <- RDD(zippedJRDD, serializerMode)
mergePartitions(rdd, TRUE)
})

#' Cartesian product of this RDD and another one.
#'
#' Return the Cartesian product of this RDD and another one,
#' that is, the RDD of all pairs of elements (a, b) where a
#' is in this and b is in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @return A new RDD which is the Cartesian product of these two RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2)
#' sortByKey(cartesian(rdd, rdd))
#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
#'}
#' @rdname cartesian
#' @aliases cartesian,RDD,RDD-method
setMethod("cartesian",
signature(x = "RDD", other = "RDD"),
function(x, other) {
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
# The jrdd's elements are of scala Tuple2 type. The serialized
# flag here is used for the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

partitionFunc <- function(split, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
lengthOfValues <- part[[len]]
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# check if corresponding partitions of both RDDs have the same number of elements.
if (lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
}

if (lengthOfKeys > 1) {
keys <- part[1 : (lengthOfKeys - 1)]
values <- part[(lengthOfKeys + 1) : (len - 1)]
} else {
keys <- list()
values <- list()
}
} else {
# Keys, values must have same length here, because this has
# been validated inside the JavaRDD.zip() function.
keys <- part[c(TRUE, FALSE)]
values <- part[c(FALSE, TRUE)]
}
mapply(
function(k, v) {
list(k, v)
},
keys,
values,
SIMPLIFY = FALSE,
USE.NAMES = FALSE)
} else {
part
}
mergePartitions(rdd, FALSE)
})

#' Subtract an RDD with another RDD.
#'
#' Return an RDD with the elements from this that are not in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions Number of the partitions in the result RDD.
#' @return An RDD with the elements from this that are not in other.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
#' rdd2 <- parallelize(sc, list(2, 4))
#' collect(subtract(rdd1, rdd2))
#' # list(1, 1, 3)
#'}
#' @rdname subtract
#' @aliases subtract,RDD
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR::numPartitions(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
keys(subtractByKey(rdd1, rdd2, numPartitions))
})

#' Intersection of this RDD and another one.
#'
#' Return the intersection of this RDD and another one.
#' The output will not contain any duplicate elements,
#' even if the input RDDs did. Performs a hash partition
#' across the cluster.
#' Note that this method performs a shuffle internally.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions The number of partitions in the result RDD.
#' @return An RDD which is the intersection of these two RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
#' # list(1, 2, 3)
#'}
#' @rdname intersection
#' @aliases intersection,RDD
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR::numPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

filterFunction <- function(elem) {
iters <- elem[[2]]
all(as.vector(
lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
}
PipelinedRDD(zippedRDD, partitionFunc)

keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
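
Taken together, the RDD.R changes above add cartesian for plain RDDs, plus set-style subtract (built on subtractByKey) and intersection (built on cogroup). A hedged end-to-end sketch, mirroring the roxygen examples in the diff; the ordering of collect() output may vary because these operations shuffle.

# Hedged sketch exercising the new RDD operations from this PR.
sc <- sparkR.init()
rdd1 <- parallelize(sc, list(1, 1, 2, 3, 4))
rdd2 <- parallelize(sc, list(2, 4, 6))

collect(subtract(rdd1, rdd2))       # elements of rdd1 not in rdd2: 1, 1, 3
collect(intersection(rdd1, rdd2))   # distinct common elements: 2, 4
collect(cartesian(rdd1, rdd2))      # all pairs list(a, b) with a from rdd1, b from rdd2
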