Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
AzureQ committed Dec 1, 2018
2 parents 149bed4 + 6be272b commit ff8d0ba
Show file tree
Hide file tree
Showing 785 changed files with 22,290 additions and 15,995 deletions.
2 changes: 1 addition & 1 deletion R/WINDOWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
To build SparkR on Windows, the following steps are required

1. Install R (>= 3.1) and [Rtools](http://cran.r-project.org/bin/windows/Rtools/). Make sure to
include Rtools and R in `PATH`.
include Rtools and R in `PATH`. Note that support for R prior to version 3.4 is deprecated as of Spark 3.0.0.

2. Install
[JDK8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) and set
Expand Down
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ URL: http://www.apache.org/ http://spark.apache.org/
BugReports: http://spark.apache.org/contributing.html
SystemRequirements: Java (== 8)
Depends:
R (>= 3.0),
R (>= 3.1),
methods
Suggests:
knitr,
Expand Down
3 changes: 3 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ exportMethods("arrange",
"toJSON",
"transform",
"union",
"unionAll",
"unionByName",
"unique",
"unpersist",
Expand Down Expand Up @@ -350,6 +351,8 @@ exportMethods("%<=>%",
"row_number",
"rpad",
"rtrim",
"schema_of_csv",
"schema_of_json",
"second",
"sha1",
"sha2",
Expand Down
22 changes: 21 additions & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,13 @@ setMethod("repartition",
#' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' At least one partition-by expression must be specified.
#' When no explicit sort order is specified, "ascending nulls first" is assumed.
#'
#' Note that due to performance reasons this method uses sampling to estimate the ranges.
#' Hence, the output may not be consistent, since sampling can return different values.
#' The sample size can be controlled by the config
#' \code{spark.sql.execution.rangeExchange.sampleSizePerPartition}.
#'
#' @param x a SparkDataFrame.
#' @param numPartitions the number of partitions to use.
Expand Down Expand Up @@ -820,7 +827,6 @@ setMethod("repartitionByRange",
#' toJSON
#'
#' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
#'
#' Each row is turned into a JSON document with columns as different fields.
#' The returned SparkDataFrame has a single character column with the name \code{value}
#'
Expand Down Expand Up @@ -2724,6 +2730,20 @@ setMethod("union",
dataFrame(unioned)
})

#' Return a new SparkDataFrame containing the union of rows
#'
#' This is an alias for `union`.
#'
#' @rdname union
#' @name unionAll
#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
#' @note unionAll since 1.4.0
setMethod("unionAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
union(x, y)
})

#' Return a new SparkDataFrame containing the union of rows, matched by column names
#'
#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
Expand Down
79 changes: 71 additions & 8 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,18 @@ NULL
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' }
#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
#' this contains additional named properties to control how it is converted, accepts
#' the same options as the JSON/CSV data source. Additionally \code{to_json} supports
#' the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
#' this contains additional Columns of arrays to be merged.
#' @param ... additional argument(s).
#' \itemize{
#' \item \code{to_json}, \code{from_json} and \code{schema_of_json}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the JSON data source.
#' \item \code{to_json}: it supports the "pretty" option which enables pretty
#' JSON generation.
#' \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the CSV data source.
#' \item \code{arrays_zip}, this contains additional Columns of arrays to be merged.
#' }
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
Expand Down Expand Up @@ -1771,12 +1778,16 @@ setMethod("to_date",
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a JSON object
#' df2 <- sql("SELECT map('name', 'Bob')) as people")
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts an array of maps into a JSON array
#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))}
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a pretty JSON object
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
Expand Down Expand Up @@ -2285,6 +2296,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
column(jc)
})

#' @details
#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_json schema_of_json,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' json <- "{\"name\":\"Bob\"}"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_json(json)))}
#' @note schema_of_json since 3.0.0
setMethod("schema_of_json", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_json",
col, options)
column(jc)
})

#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
#' with the specified \code{schema}.
Expand Down Expand Up @@ -2315,6 +2352,32 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
column(jc)
})

#' @details
#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' csv <- "Amsterdam,2018"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_csv(csv)))}
#' @note schema_of_csv since 3.0.0
setMethod("schema_of_csv", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_csv",
col, options)
column(jc)
})

#' @details
#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
Expand Down Expand Up @@ -3370,7 +3433,7 @@ setMethod("flatten",
#'
#' @rdname column_collection_functions
#' @aliases map_entries map_entries,Column-method
#' @note map_entries since 2.4.0
#' @note map_entries since 3.0.0
setMethod("map_entries",
signature(x = "Column"),
function(x) {
Expand Down
11 changes: 11 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,9 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
#' @rdname union
setGeneric("union", function(x, y) { standardGeneric("union") })

#' @rdname union
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

#' @rdname unionByName
setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })

Expand Down Expand Up @@ -1203,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") })
#' @name NULL
setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_csv", function(x, ...) { standardGeneric("schema_of_csv") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_json", function(x, ...) { standardGeneric("schema_of_json") })

#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ setMethod("corr",
#'
#' Finding frequent items for columns, possibly with false positives.
#' Using the frequent element count algorithm described in
#' \url{http://dx.doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
#' \url{https://doi.org/10.1145/762471.762473}, proposed by Karp, Schenker, and Papadimitriou.
#'
#' @param x A SparkDataFrame.
#' @param cols A vector column names to search frequent items in.
Expand Down Expand Up @@ -143,7 +143,7 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
#' *exact* rank of x is close to (p * N). More precisely,
#' floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
#' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
#' optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670
#' optimizations). The algorithm was first present in [[https://doi.org/10.1145/375663.375670
#' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
#' Note that NA values will be ignored in numerical columns before calculation. For
#' columns only containing NA values, an empty list is returned.
Expand Down
4 changes: 4 additions & 0 deletions R/pkg/inst/profile/general.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#

.First <- function() {
if (utils::compareVersion(paste0(R.version$major, ".", R.version$minor), "3.4.0") == -1) {
warning("Support for R prior to version 3.4 is deprecated since Spark 3.0.0")
}

packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
dirs <- strsplit(packageDir, ",")[[1]]
.libPaths(c(dirs, .libPaths()))
Expand Down
4 changes: 4 additions & 0 deletions R/pkg/inst/profile/shell.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
#

.First <- function() {
if (utils::compareVersion(paste0(R.version$major, ".", R.version$minor), "3.4.0") == -1) {
warning("Support for R prior to version 3.4 is deprecated since Spark 3.0.0")
}

home <- Sys.getenv("SPARK_HOME")
.libPaths(c(file.path(home, "R", "lib"), .libPaths()))
Sys.setenv(NOAWT = 1)
Expand Down
19 changes: 16 additions & 3 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1620,14 +1620,20 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test from_csv()
# Test from_csv(), schema_of_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)

# Test to_json(), from_json()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")

# Test to_json(), from_json(), schema_of_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
Expand All @@ -1654,6 +1660,12 @@ test_that("column functions", {
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
}

df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
expect_equal(c[[1]], "struct<name:string>")
c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
expect_equal(c[[1]], "struct<name:string>")

# Test to_json() supports arrays of primitive types and arrays
df <- sql("SELECT array(19, 42, 70) as age")
j <- collect(select(df, alias(to_json(df$age), "json")))
Expand All @@ -1674,7 +1686,7 @@ test_that("column functions", {

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)

# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
Expand Down Expand Up @@ -2458,6 +2470,7 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
expect_equal(count(unioned), 6)
expect_equal(first(unioned)$name, "Michael")
expect_equal(count(arrange(suppressWarnings(union(df, df2)), df$age)), 6)
expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)

df1 <- select(df2, "age", "name")
unioned1 <- arrange(unionByName(df1, df), df1$age)
Expand Down
1 change: 1 addition & 0 deletions R/pkg/tests/fulltests/test_streaming.R
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ test_that("Specify a schema by using a DDL-formatted string when reading", {
expect_false(awaitTermination(q, 5 * 1000))
callJMethod(q@ssq, "processAllAvailable")
expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
stopQuery(q)

expect_error(read.stream(path = parquetPath, schema = "name stri"),
"DataType stri is not supported.")
Expand Down
2 changes: 1 addition & 1 deletion assembly/README
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ This module is off by default. To activate it specify the profile in the command

If you need to build an assembly for a different version of Hadoop the
hadoop-version system property needs to be set as in this example:
-Dhadoop.version=2.7.3
-Dhadoop.version=2.7.4
4 changes: 2 additions & 2 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>spark-assembly_2.11</artifactId>
<artifactId>spark-assembly_2.12</artifactId>
<name>Spark Project Assembly</name>
<url>http://spark.apache.org/</url>
<packaging>pom</packaging>
Expand Down
Loading

0 comments on commit ff8d0ba

Please sign in to comment.