#' @include utils.R
######################################################################
# dplyr generic
######################################################################
#' @title S3 implementation of \code{db_compute} for Athena
#'
#' @description This is a backend function for dplyr's \code{compute} function. Users won't be required to access and run this function.
#' @param con A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @param table Table name; if left at the default, RAthena will use the name generated by \code{dplyr}'s \code{compute} function.
#' @param sql SQL code to be sent to AWS Athena
#' @param ... passes \code{RAthena} table creation parameters: [\code{file_type}, \code{s3_location}, \code{partition}]
#' \itemize{
#' \item{\code{file_type:} What file type to store data.frame on s3, RAthena currently supports ["NULL", "csv", "tsv", "parquet", "json", "orc"].
#' \code{"NULL"} will let Athena set the file_type for you.}
#' \item{\code{s3_location:} s3 bucket to store Athena table, must be set as an s3 uri, for example ("s3://mybucket/data/")}
#' \item{\code{partition:} Partition Athena table, requires a partition variable from the previous table.}}
#' @name db_compute
#' @return
#' \code{db_compute} returns the table name
#' @seealso \code{\link{AthenaWriteTables}} \code{\link{backend_dbplyr_v2}} \code{\link{backend_dbplyr_v1}}
#' @examples
#' \dontrun{
#' # Note:
#' # - An AWS account is required to run the example below.
#' # - Different connection methods can be used; please see `RAthena::dbConnect` documentation.
#'
#' library(DBI)
#' library(dplyr)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' # Write data.frame to Athena table
#' copy_to(con, mtcars,
#' s3_location = "s3://mybucket/data/")
#'
#' # Write Athena table from tbl_sql
#' athena_mtcars <- tbl(con, "mtcars")
#' mtcars_filter <- athena_mtcars %>% filter(gear >= 4)
#'
#' # Create Athena table with a unique table name
#' mtcars_filter %>%
#'   compute()
#'
#' # Create Athena table with a specified name and s3 location
#' mtcars_filter %>%
#'   compute("mtcars_filter",
#'     s3_location = "s3://mybucket/mtcars_filter/")
#'
#' # Disconnect from Athena
#' dbDisconnect(con)
#' }
db_compute.AthenaConnection <- function(con,
                                        table,
                                        sql,
                                        ...) {
  in_schema <- pkg_method("in_schema", "dbplyr")
  if (athena_unload()) {
    stop(
      "Unable to create table when `RAthena_options(unload = TRUE)`. Please run `RAthena_options(unload = FALSE)` and try again.",
      call. = FALSE
    )
  }
  table <- athena_query_save(con, sql, table, ...)
  ll <- db_detect(con, table)
  in_schema(ll[["dbms.name"]], ll[["table"]])
}
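# Illustrative note (not executed): `db_compute()` hands dbplyr back an
# `in_schema()` reference, e.g. in_schema("mydb", "mtcars_filter"), so the
# computed table is addressed as "mydb"."mtcars_filter" in subsequent queries.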
athena_query_save <- function(con, sql, name,
                              file_type = c("NULL", "csv", "tsv", "parquet", "json", "orc"),
                              s3_location = NULL,
                              partition = NULL,
                              compress = TRUE,
                              ...) {
  stopifnot(is.null(s3_location) || is.s3_uri(s3_location))
  file_type <- match.arg(file_type)
  tt_sql <- SQL(paste0(
    "CREATE TABLE ", paste0('"', unlist(strsplit(name, "\\.")), '"', collapse = "."),
    " ", ctas_sql_with(partition, s3_location, file_type, compress), "AS ",
    sql, ";"
  ))
  res <- dbExecute(con, tt_sql, unload = FALSE)
  dbClearResult(res)
  return(name)
}
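# Minimal sketch (not executed) of the CTAS statement `athena_query_save()`
# builds; the exact WITH clause comes from `ctas_sql_with()`, and the names,
# location and query below are placeholders:
#
#   CREATE TABLE "mydb"."mtcars_filter"
#   WITH (format = 'PARQUET',
#         external_location = 's3://mybucket/mtcars_filter/')
#   AS SELECT * FROM "mtcars" WHERE "gear" >= 4.0;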
#' S3 implementation of \code{db_copy_to} for Athena
#'
#' This is an Athena method for dbplyr function \code{db_copy_to} to create an Athena table from a \code{data.frame}.
#' @param con A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @param table A character string specifying a table name. Names will be
#' automatically quoted so you can use any sequence of characters, not
#' just any valid bare table name.
#' @param values A data.frame to write to the database.
#' @param overwrite Allows overwriting the destination table. Cannot be \code{TRUE} if \code{append} is also \code{TRUE}.
#' @param append Allow appending to the destination table. Cannot be \code{TRUE} if \code{overwrite} is also \code{TRUE}. The existing Athena DDL file type will be retained
#' and used when uploading data to AWS Athena. If parameter \code{file_type} doesn't match the AWS Athena DDL file type, a warning message will be created
#' notifying the user and \code{RAthena} will use the file type from the Athena DDL.
#' @param types Additional field types used to override derived types.
#' @param s3_location s3 bucket to store Athena table, must be set as an s3 uri, for example ("s3://mybucket/data/")
#' @param partition Partition Athena table (needs to be a named list or vector) for example: \code{c(var1 = "2019-20-13")}
#' @param file_type What file type to store data.frame on s3, RAthena currently supports ["tsv", "csv", "parquet"]. The default delimited file type is "tsv"; in previous versions
#' of \code{RAthena (<= 1.4.0)} file type "csv" was used as default. The reason for the change is that columns containing \code{Array/JSON} format cannot be written to
#' Athena due to the separating value ",", which would cause issues with AWS Athena.
#' \strong{Note:} "parquet" format is supported by the \code{arrow} package and it will need to be installed to utilise the "parquet" format.
#' @param compress \code{FALSE | TRUE} To determine whether to compress the file. If the file type is ["csv", "tsv"] then "gzip" compression is used; for file type "parquet",
#' "snappy" compression is used.
#' @param max_batch Split the data frame by a maximum number of rows, e.g. 100,000, so that multiple files can be uploaded into AWS S3. By default, when compression
#' is set to \code{TRUE} and \code{file_type} is "csv" or "tsv", \code{max_batch} will split the data.frame into 20 batches. This is to help the
#' performance of AWS Athena when working with files compressed in "gzip" format. \code{max_batch} will not split the data.frame
#' when loading files in parquet format. For more information please see the \href{https://github.com/DyfanJones/RAthena/issues/36}{GitHub issue}.
#' @param ... other parameters currently not supported in RAthena
#' @name db_copy_to
#' @seealso \code{\link{AthenaWriteTables}}
#' @return
#' \code{db_copy_to} returns the table name
#' @examples
#' \dontrun{
#' # Note:
#' # - An AWS account is required to run the example below.
#' # - Different connection methods can be used; please see `RAthena::dbConnect` documentation.
#'
#' library(DBI)
#' library(dplyr)
#'
#' # Demo connection to Athena using profile name
#' con <- dbConnect(RAthena::athena())
#'
#' # List existing tables in Athena
#' dbListTables(con)
#'
#' # Write data.frame to Athena table
#' copy_to(con, mtcars,
#' s3_location = "s3://mybucket/data/")
#'
#' # Checking if uploaded table exists in Athena
#' dbExistsTable(con, "mtcars")
#'
#' # Write Athena table from tbl_sql
#' athena_mtcars <- tbl(con, "mtcars")
#' mtcars_filter <- athena_mtcars %>% filter(gear >= 4)
#'
#' copy_to(con, mtcars_filter)
#'
#' # Checking if uploaded table exists in Athena
#' dbExistsTable(con, "mtcars_filter")
#'
#' # Disconnect from Athena
#' dbDisconnect(con)
#' }
db_copy_to.AthenaConnection <- function(con, table, values,
                                        overwrite = FALSE, append = FALSE,
                                        types = NULL, partition = NULL,
                                        s3_location = NULL,
                                        file_type = c("csv", "tsv", "parquet"),
                                        compress = FALSE,
                                        max_batch = Inf, ...) {
  types <- types %||% dbDataType(con, values)
  names(types) <- names(values)
  file_type <- match.arg(file_type)
  dbWriteTable(
    conn = con, name = table, value = values,
    overwrite = overwrite, append = append,
    field.types = types, partition = partition,
    s3.location = s3_location, file.type = file_type,
    compress = compress,
    max.batch = max_batch
  )
  return(table)
}
######################################################################
# dplyr v2 api support
######################################################################
#' Declare which version of dbplyr API is being called.
#'
#' @param con A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @name dbplyr_edition
#' @return
#' Integer indicating which version of the `dbplyr` API will be used.
#' @export
dbplyr_edition.AthenaConnection <- function(con) 2L
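# dbplyr calls `dbplyr_edition()` once per connection; returning 2L makes it
# dispatch to the v2 generics below (`db_connection_describe()`, `sql_query_*()`)
# instead of the v1 equivalents (`db_desc()`, `db_query_*()`).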
#' S3 implementation of \code{db_connection_describe} for Athena (api version 2).
#'
#' This is a backend function for dplyr to retrieve metadata about Athena queries. Users won't be required to access and run this function.
#' @param con A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @name db_connection_describe
#' @return
#' Character variable containing metadata about the query sent to Athena. The metadata is returned in the following format:
#'
#' \code{"Athena <boto3 version> [<profile_name>@region/database]"}
NULL
athena_conn_desc <- function(con) {
  info <- dbGetInfo(con)
  profile <- if (!is.null(info$profile_name)) paste0(info$profile_name, "@")
  paste0("Athena ", info$boto, " [", profile, info$region_name, "/", info$dbms.name, "]")
}
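# Example of the description string (boto3 version, profile, region and
# database below are placeholders):
#   athena_conn_desc(con)
#   #> "Athena 1.26.47 [default@eu-west-1/default]"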
#' @rdname db_connection_describe
db_connection_describe.AthenaConnection <- function(con) {
  athena_conn_desc(con)
}
#' Athena S3 implementation of dbplyr backend functions (api version 2).
#'
#' These functions are used to build the different types of SQL queries.
#' The AWS Athena implementation gives extra parameters to allow access to the standard DBI Athena methods. They also
#' utilise AWS Glue to speed up sql query execution.
#' @param con A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @param sql SQL code to be sent to AWS Athena
#' @param x R object to be transformed into athena equivalent
#' @param format returning format for explain queries, default set to `"text"`. Other formats can be found: \url{https://docs.aws.amazon.com/athena/latest/ug/athena-explain-statement.html}
#' @param type return plan for explain queries, default set to `NULL`. Other types can be found: \url{https://docs.aws.amazon.com/athena/latest/ug/athena-explain-statement.html}
#' @param ... other parameters, currently not implemented
#' @name backend_dbplyr_v2
#' @return
#' \describe{
#' \item{sql_query_explain}{Returns sql query for \href{https://docs.aws.amazon.com/athena/latest/ug/athena-explain-statement.html}{AWS Athena explain statement}}
#' \item{sql_query_fields}{Returns sql query column names}
#' \item{sql_escape_date}{Returns sql escaping from dates}
#' \item{sql_escape_datetime}{Returns sql escaping from date times}
#' }
NULL
athena_explain <- function(con, sql, format = "text", type = NULL, ...) {
  if (athena_unload()) {
    stop(
      "Unable to explain query when `RAthena_options(unload = TRUE)`. Please run `RAthena_options(unload = FALSE)` and try again.",
      call. = FALSE
    )
  }
  # AWS Athena now supports explain: https://docs.aws.amazon.com/athena/latest/ug/athena-explain-statement.html
  format <- match.arg(format, c("text", "json"))
  if (!is.null(type)) {
    type <- match.arg(type, c("LOGICAL", "DISTRIBUTED", "VALIDATE", "IO"))
    format <- NULL
  }
  build_sql <- pkg_method("build_sql", "dbplyr")
  dplyr_sql <- pkg_method("sql", "dbplyr")
  build_sql(
    "EXPLAIN ",
    if (!is.null(format)) dplyr_sql(paste0("(FORMAT ", format, ") ")),
    if (!is.null(type)) dplyr_sql(paste0("(TYPE ", type, ") ")),
    dplyr_sql(sql),
    con = con
  )
}
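# Minimal sketch (not executed) of the SQL `athena_explain()` builds; the
# query text is a placeholder:
#   athena_explain(con, dbplyr::sql("SELECT * FROM mtcars"))
#   #> <SQL> EXPLAIN (FORMAT text) SELECT * FROM mtcars
#   athena_explain(con, dbplyr::sql("SELECT * FROM mtcars"), type = "VALIDATE")
#   #> <SQL> EXPLAIN (TYPE VALIDATE) SELECT * FROM mtcars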
#' @rdname backend_dbplyr_v2
sql_query_explain.AthenaConnection <- athena_explain
#' @rdname backend_dbplyr_v2
sql_query_fields.AthenaConnection <- function(con, sql, ...) {
  # pass ident class to dbGetQuery to continue same functionality as dbplyr v1 api.
  if (inherits(sql, "ident")) {
    return(sql)
  } else {
    # Non-ident classes use the dbplyr:::sql_query_fields.DBIConnection method
    dbplyr_query_select <- pkg_method("dbplyr_query_select", "dbplyr")
    sql_subquery <- pkg_method("sql_subquery", "dplyr")
    dplyr_sql <- pkg_method("sql", "dplyr")
    return(dbplyr_query_select(con, dplyr_sql("*"), sql_subquery(con, sql), where = dplyr_sql("0 = 1")))
  }
}
#' @rdname backend_dbplyr_v2
#' @export
sql_escape_date.AthenaConnection <- function(con, x) {
  dbQuoteString(con, x)
}
#' @rdname backend_dbplyr_v2
#' @export
sql_escape_datetime.AthenaConnection <- function(con, x) {
  str <- dbQuoteString(con, x)
  return(gsub("^date ", "timestamp ", str))
}
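# Sketch of the escape behaviour, assuming RAthena's `dbQuoteString()` renders
# POSIXct values as `date '...'` literals:
#   sql_escape_datetime(con, as.POSIXct("2020-01-01 12:30:00"))
#   #> timestamp '2020-01-01 12:30:00'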
######################################################################
# dplyr v1 api support
######################################################################
#' S3 implementation of \code{db_desc} for Athena (api version 1).
#'
#' This is a backend function for dplyr to retrieve metadata about Athena queries. Users won't be required to access and run this function.
#' @param x A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @name db_desc
#' @return
#' Character variable containing metadata about the query sent to Athena. The metadata is returned in the following format:
#'
#' \code{"Athena <boto3 version> [<profile_name>@region/database]"}
db_desc.AthenaConnection <- function(x) {
  return(athena_conn_desc(x))
}
#' Athena S3 implementation of dbplyr backend functions (api version 1).
#'
#' These functions are used to build the different types of SQL queries.
#' The AWS Athena implementation gives extra parameters to allow access to the standard DBI Athena methods. They also
#' utilise AWS Glue to speed up sql query execution.
#' @param con A \code{\link{dbConnect}} object, as returned by \code{dbConnect()}
#' @param sql SQL code to be sent to AWS Athena
#' @param ... other parameters, currently not implemented
#' @name backend_dbplyr_v1
#' @return
#' \describe{
#' \item{db_explain}{Returns \href{https://docs.aws.amazon.com/athena/latest/ug/athena-explain-statement.html}{AWS Athena explain statement}}
#' \item{db_query_fields}{Returns sql query column names}
#' }
#' @rdname backend_dbplyr_v1
db_explain.AthenaConnection <- function(con, sql, ...) {
  sql <- athena_explain(con, sql, ...)
  expl <- dbGetQuery(con, sql, unload = FALSE)
  out <- utils::capture.output(print(expl))
  paste(out, collapse = "\n")
}
# NOTE: dbplyr v2 integration has to use this in dbGetQuery
athena_query_fields_ident <- function(con, sql) {
  if (str_count(sql, "\\.") < 2) {
    if (grepl("\\.", sql)) {
      schema_parts <- gsub('"', "", strsplit(sql, "\\.")[[1]])
    } else {
      schema_parts <- c(con@info$dbms.name, gsub('"', "", sql))
    }
    # If dbplyr schema, get the fields from AWS Glue; fail with a clear
    # message if the table cannot be found in the Glue catalogue
    output <- tryCatch(
      py_to_r(con@ptr$glue$get_table(
        DatabaseName = schema_parts[1],
        Name = schema_parts[2]
      ))$Table,
      error = function(e) {
        stop("Unable to find table `", paste(schema_parts, collapse = "."),
          "` in AWS Glue catalogue.",
          call. = FALSE
        )
      }
    )
    col_names <- vapply(output$StorageDescriptor$Columns, function(y) y$Name, FUN.VALUE = character(1))
    partitions <- vapply(output$PartitionKeys, function(y) y$Name, FUN.VALUE = character(1))
    return(c(col_names, partitions))
  } else {
    # If a subquery, query Athena for the fields
    # (fetch dplyr generics lazily via pkg_method)
    sql_select <- pkg_method("sql_select", "dplyr")
    sql_subquery <- pkg_method("sql_subquery", "dplyr")
    dplyr_sql <- pkg_method("sql", "dplyr")
    sql <- sql_select(con, dplyr_sql("*"), sql_subquery(con, sql), where = dplyr_sql("0 = 1"))
    qry <- dbSendQuery(con, sql)
    on.exit(dbClearResult(qry))
    res <- dbFetch(qry, 0)
    return(names(res))
  }
}
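# Illustrative note (not executed): for an ident such as "mydb"."mtcars" the
# field names come straight from the AWS Glue catalogue, avoiding a query.
# For subqueries the generated SQL takes roughly the shape
#   SELECT * FROM (<sql>) "q01" WHERE (0 = 1)
# which returns zero rows but still exposes the full set of column names
# through `dbFetch(qry, 0)`.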
#' @rdname backend_dbplyr_v1
db_query_fields.AthenaConnection <- function(con, sql, ...) {
  # check if sql is dbplyr schema
  if (inherits(sql, "ident")) {
    return(athena_query_fields_ident(con, sql))
  } else {
    # If a subquery, query Athena for the fields
    # (fetch dplyr generics lazily via pkg_method)
    sql_select <- pkg_method("sql_select", "dplyr")
    sql_subquery <- pkg_method("sql_subquery", "dplyr")
    dplyr_sql <- pkg_method("sql", "dplyr")
    sql <- sql_select(con, dplyr_sql("*"), sql_subquery(con, sql), where = dplyr_sql("0 = 1"))
    qry <- dbSendQuery(con, sql)
    on.exit(dbClearResult(qry))
    res <- dbFetch(qry, 0)
    return(names(res))
  }
}