From eef5735620e6e824eaba4fd7952e26a17a68308b Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 1 May 2017 08:47:37 +0200 Subject: [PATCH 1/5] Initial implementation --- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 20 ++++++++++++++++++++ R/pkg/R/generics.R | 6 ++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 ++++++++++++ 4 files changed, 39 insertions(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 7ecd168137e8d..d720037958f46 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -257,6 +257,7 @@ exportMethods("%<=>%", "hypot", "ifelse", "initcap", + "input_file_name", "instr", "isNaN", "isNotNull", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 38384a89919a2..2fca9925a9d05 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3974,3 +3974,23 @@ setMethod("grouping_id", jc <- callJStatic("org.apache.spark.sql.functions", "grouping_id", jcols) column(jc) }) + +#' input_file_name +#' +#' Creates a string column for the file name of the current Spark task. +#' +#' @rdname input_file_name +#' @name input_file_name +#' @aliases input_file_name,missing-method +#' @export +#' @examples \dontrun{ +#' df <- read.text("README.md") +#' +#' head(select(df, input_file_name())) +#' } +#' @note input_file_name since 2.3.0 +setMethod("input_file_name", signature("missing"), + function() { + jc <- callJStatic("org.apache.spark.sql.functions", "input_file_name") + column(jc) + }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index e02d46426a5a6..b232f54e1041e 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1076,6 +1076,12 @@ setGeneric("hypot", function(y, x) { standardGeneric("hypot") }) #' @export setGeneric("initcap", function(x) { standardGeneric("initcap") }) +#' @param x empty. Should be used with no argument. 
+#' @rdname input_file_name +#' @export +setGeneric("input_file_name", + function(x = "missing") { standardGeneric("input_file_name") }) + #' @rdname instr #' @export setGeneric("instr", function(y, x) { standardGeneric("instr") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 12867c15d1f95..30bfd9ffd098e 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1656,6 +1656,18 @@ test_that("greatest() and least() on a DataFrame", { expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) }) +test_that("input_file_name()", { + path <- tempfile(pattern = "input_file_name_test", fileext = ".txt") + write.table(iris[1:50, ], path, row.names = FALSE, col.names = FALSE) + + df <- read.text(path) + actual_names <- sort(collect(distinct(select(df, input_file_name())))) + expect_equal(length(actual_names), 1) + expect_true(gsub("^.*?://", "", actual_names) == path) + + unlink(path) +}) + test_that("time windowing (window()) with all inputs", { df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") From cb7f2d650e6f13805bd626359ee5a4b6b2bd9539 Mon Sep 17 00:00:00 2001 From: zero323 Date: Mon, 1 May 2017 20:41:04 +0200 Subject: [PATCH 2/5] Make test Windows friendlier --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 30bfd9ffd098e..8bcc1a2dc1fd7 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1663,7 +1663,7 @@ test_that("input_file_name()", { df <- read.text(path) actual_names <- sort(collect(distinct(select(df, input_file_name())))) expect_equal(length(actual_names), 1) - expect_true(gsub("^.*?://", "", actual_names) == path) + expect_equal(basename(actual_names[1, 
1]), basename(path)) unlink(path) }) From 38f43d058df9a52eb1c1adc523b0a90c6e291ceb Mon Sep 17 00:00:00 2001 From: zero323 Date: Tue, 2 May 2017 08:09:23 +0200 Subject: [PATCH 3/5] Reuse existing tests for input_file_name --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 8bcc1a2dc1fd7..02161c9481ee8 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1366,6 +1366,11 @@ test_that("column functions", { expect_equal(collect(df2)[[3, 1]], FALSE) expect_equal(collect(df2)[[3, 2]], TRUE) + # Test input_file_name() + actual_names <- sort(collect(distinct(select(df, input_file_name())))) + expect_equal(length(actual_names), 1) + expect_equal(basename(actual_names[1, 1]), basename(jsonPath)) + df3 <- select(df, between(df$name, c("Apache", "Spark"))) expect_equal(collect(df3)[[1, 1]], TRUE) expect_equal(collect(df3)[[2, 1]], FALSE) @@ -1656,18 +1661,6 @@ test_that("greatest() and least() on a DataFrame", { expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) }) -test_that("input_file_name()", { - path <- tempfile(pattern = "input_file_name_test", fileext = ".txt") - write.table(iris[1:50, ], path, row.names = FALSE, col.names = FALSE) - - df <- read.text(path) - actual_names <- sort(collect(distinct(select(df, input_file_name())))) - expect_equal(length(actual_names), 1) - expect_equal(basename(actual_names[1, 1]), basename(path)) - - unlink(path) -}) - test_that("time windowing (window()) with all inputs", { df <- createDataFrame(data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") From ca49c248e10105b1ceea848cb4c1c9eda45e0451 Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 3 May 2017 01:37:46 +0200 Subject: [PATCH 4/5] Adjust description --- R/pkg/R/functions.R | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 2fca9925a9d05..739d2693cefb7 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3977,7 +3977,7 @@ setMethod("grouping_id", #' input_file_name #' -#' Creates a string column for the file name of the current Spark task. +#' Creates a string column with the input file name for a given row. #' #' @rdname input_file_name #' @name input_file_name From 72f3fb739240b9f27fcab47cbb9d82aff3272f93 Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 3 May 2017 01:38:31 +0200 Subject: [PATCH 5/5] Add @family --- R/pkg/R/functions.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 739d2693cefb7..1b2338c63d902 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3981,6 +3981,7 @@ setMethod("grouping_id", #' #' @rdname input_file_name #' @name input_file_name +#' @family normal_funcs #' @aliases input_file_name,missing-method #' @export #' @examples \dontrun{