diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index 45e0f83dcbd0a..a2632973134b9 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -55,6 +55,7 @@ Collate:
     'array.R'
     'buffer.R'
     'compute.R'
+    'csv.R'
     'dictionary.R'
     'feather.R'
     'io.R'
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 65d60d846f4cb..8846defbd8e65 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -39,6 +39,11 @@ S3method(buffer,default)
 S3method(buffer,integer)
 S3method(buffer,numeric)
 S3method(buffer,raw)
+S3method(csv_table_reader,"arrow::csv::TableReader")
+S3method(csv_table_reader,"arrow::io::InputStream")
+S3method(csv_table_reader,character)
+S3method(csv_table_reader,default)
+S3method(csv_table_reader,fs_path)
 S3method(length,"arrow::Array")
 S3method(names,"arrow::RecordBatch")
 S3method(print,"arrow-enum")
@@ -92,6 +97,10 @@ export(boolean)
 export(buffer)
 export(cast_options)
 export(chunked_array)
+export(csv_convert_options)
+export(csv_parse_options)
+export(csv_read_options)
+export(csv_table_reader)
 export(date32)
 export(date64)
 export(decimal)
@@ -111,6 +120,7 @@ export(mmap_open)
 export(null)
 export(print.integer64)
 export(read_arrow)
+export(read_csv_arrow)
 export(read_feather)
 export(read_message)
 export(read_record_batch)
@@ -141,6 +151,7 @@ importFrom(glue,glue)
 importFrom(purrr,map)
 importFrom(purrr,map2)
 importFrom(purrr,map_int)
+importFrom(rlang,abort)
 importFrom(rlang,dots_n)
 importFrom(rlang,list2)
 importFrom(rlang,warn)
diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R
index 0310eab2027b9..55b9ab33ebf98 100644
--- a/r/R/RcppExports.R
+++ b/r/R/RcppExports.R
@@ -193,6 +193,26 @@ Table__cast <- function(table, schema, options) {
     .Call(`_arrow_Table__cast`, table, schema, options)
 }
 
+csv___ReadOptions__initialize <- function(options) {
+    .Call(`_arrow_csv___ReadOptions__initialize`, options)
+}
+
+csv___ParseOptions__initialize <- function(options) {
+    .Call(`_arrow_csv___ParseOptions__initialize`, options)
+}
+
+csv___ConvertOptions__initialize <- function(options) {
+    .Call(`_arrow_csv___ConvertOptions__initialize`, options)
+}
+
+csv___TableReader__Make <- function(input, read_options, parse_options, convert_options) {
+    .Call(`_arrow_csv___TableReader__Make`, input, read_options, parse_options, convert_options)
+}
+
+csv___TableReader__Read <- function(table_reader) {
+    .Call(`_arrow_csv___TableReader__Read`, table_reader)
+}
+
 shared_ptr_is_null <- function(xp) {
     .Call(`_arrow_shared_ptr_is_null`, xp)
 }
diff --git a/r/R/csv.R b/r/R/csv.R
new file mode 100644
index 0000000000000..bad87559c05e5
--- /dev/null
+++ b/r/R/csv.R
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include R6.R
+
+`arrow::csv::TableReader` <- R6Class("arrow::csv::TableReader", inherit = `arrow::Object`,
+  public = list(
+    Read = function() shared_ptr(`arrow::Table`, csv___TableReader__Read(self))
+  )
+)
+
+`arrow::csv::ReadOptions` <- R6Class("arrow::csv::ReadOptions", inherit = `arrow::Object`)
+`arrow::csv::ParseOptions` <- R6Class("arrow::csv::ParseOptions", inherit = `arrow::Object`)
+`arrow::csv::ConvertOptions` <- R6Class("arrow::csv::ConvertOptions", inherit = `arrow::Object`)
+
+#' read options for the csv reader
+#'
+#' @param use_threads Whether to use the global CPU thread pool
+#' @param block_size Block size we request from the IO layer; also determines the size of chunks when use_threads is `TRUE`
+#'
+#' @export
+csv_read_options <- function(use_threads = TRUE, block_size = 1048576L) {
+  shared_ptr(`arrow::csv::ReadOptions`, csv___ReadOptions__initialize(
+    list(
+      use_threads = use_threads,
+      block_size = block_size
+    )
+  ))
+}
+
+#' Parsing options
+#'
+#' @param delimiter Field delimiter
+#' @param quoting Whether quoting is used
+#' @param quote_char Quoting character (if `quoting` is `TRUE`)
+#' @param double_quote Whether a quote inside a value is double-quoted
+#' @param escaping Whether escaping is used
+#' @param escape_char Escaping character (if `escaping` is `TRUE`)
+#' @param newlines_in_values Whether values are allowed to contain CR (`0x0d`) and LF (`0x0a`) characters
+#' @param ignore_empty_lines Whether empty lines are ignored.  If `FALSE`, an empty line represents a single empty value (assuming a one-column CSV file)
+#' @param header_rows Number of header rows to skip (including the first row containing column names)
+#'
+#' @export
+csv_parse_options <- function(
+  delimiter = ",", quoting = TRUE, quote_char = '"',
+  double_quote = TRUE, escaping = FALSE, escape_char = '\\',
+  newlines_in_values = FALSE, ignore_empty_lines = TRUE,
+  header_rows = 1L
+){
+  shared_ptr(`arrow::csv::ParseOptions`, csv___ParseOptions__initialize(
+    list(
+      delimiter = delimiter,
+      quoting = quoting,
+      quote_char = quote_char,
+      double_quote = double_quote,
+      escaping = escaping,
+      escape_char = escape_char,
+      newlines_in_values = newlines_in_values,
+      ignore_empty_lines = ignore_empty_lines,
+      header_rows = header_rows
+    )
+  ))
+}
+
+#' Conversion Options for the csv reader
+#'
+#' @param check_utf8 Whether to check UTF8 validity of string columns
+#'
+#' @export
+csv_convert_options <- function(check_utf8 = TRUE){
+  shared_ptr(`arrow::csv::ConvertOptions`, csv___ConvertOptions__initialize(
+    list(
+      check_utf8 = check_utf8
+    )
+  ))
+}
+
+#' CSV table reader
+#'
+#' @param file A file path (character or `fs_path`), an `arrow::io::InputStream` or an `arrow::csv::TableReader`
+#' @param read_options see [csv_read_options()]
+#' @param parse_options see [csv_parse_options()]
+#' @param convert_options see [csv_convert_options()]
+#' @param ... additional parameters.
+#'
+#' @export
+csv_table_reader <- function(file,
+  read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(),
+  ...
+){
+  UseMethod("csv_table_reader")
+}
+
+#' @importFrom rlang abort
+#' @export
+csv_table_reader.default <- function(file,
+  read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(),
+  ...
+) {
+  abort("unsupported")
+}
+
+#' @export
+`csv_table_reader.character` <- function(file,
+  read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(),
+  ...
+){
+  csv_table_reader(fs::path_abs(file),
+    read_options = read_options,
+    parse_options = parse_options,
+    convert_options = convert_options,
+    ...
+  )
+}
+
+#' @export
+`csv_table_reader.fs_path` <- function(file,
+  read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(),
+  ...
+){
+  csv_table_reader(ReadableFile(file),
+    read_options = read_options,
+    parse_options = parse_options,
+    convert_options = convert_options,
+    ...
+  )
+}
+
+#' @export
+`csv_table_reader.arrow::io::InputStream` <- function(file,
+  read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(),
+  ...
+){
+  shared_ptr(`arrow::csv::TableReader`,
+    csv___TableReader__Make(file, read_options, parse_options, convert_options)
+  )
+}
+
+#' @export
+`csv_table_reader.arrow::csv::TableReader` <- function(file,
+  read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(),
+  ...
+){
+  file
+}
+
+#' Read csv file into an arrow::Table
+#'
+#' Use arrow::csv::TableReader from [csv_table_reader()]
+#'
+#' @param ... Used to construct an arrow::csv::TableReader
+#' @export
+read_csv_arrow <- function(...) {
+  csv_table_reader(...)$Read()
+}
+
diff --git a/r/man/csv_convert_options.Rd b/r/man/csv_convert_options.Rd
new file mode 100644
index 0000000000000..323c6e01970ca
--- /dev/null
+++ b/r/man/csv_convert_options.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/csv.R
+\name{csv_convert_options}
+\alias{csv_convert_options}
+\title{Conversion Options for the csv reader}
+\usage{
+csv_convert_options(check_utf8 = TRUE)
+}
+\arguments{
+\item{check_utf8}{Whether to check UTF8 validity of string columns}
+}
+\description{
+Conversion Options for the csv reader
+}
diff --git a/r/man/csv_parse_options.Rd b/r/man/csv_parse_options.Rd
new file mode 100644
index 0000000000000..9540771437f75
--- /dev/null
+++ b/r/man/csv_parse_options.Rd
@@ -0,0 +1,33 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/csv.R
+\name{csv_parse_options}
+\alias{csv_parse_options}
+\title{Parsing options}
+\usage{
+csv_parse_options(delimiter = ",", quoting = TRUE,
+  quote_char = "\\"", double_quote = TRUE, escaping = FALSE,
+  escape_char = "\\\\", newlines_in_values = FALSE,
+  ignore_empty_lines = TRUE, header_rows = 1L)
+}
+\arguments{
+\item{delimiter}{Field delimiter}
+
+\item{quoting}{Whether quoting is used}
+
+\item{quote_char}{Quoting character (if \code{quoting} is \code{TRUE})}
+
+\item{double_quote}{Whether a quote inside a value is double-quoted}
+
+\item{escaping}{Whether escaping is used}
+
+\item{escape_char}{Escaping character (if \code{escaping} is \code{TRUE})}
+
+\item{newlines_in_values}{Whether values are allowed to contain CR (\code{0x0d}) and LF (\code{0x0a}) characters}
+
+\item{ignore_empty_lines}{Whether empty lines are ignored.  If \code{FALSE}, an empty line represents a single empty value (assuming a one-column CSV file)}
+
+\item{header_rows}{Number of header rows to skip (including the first row containing column names)}
+}
+\description{
+Parsing options
+}
diff --git a/r/man/csv_read_options.Rd b/r/man/csv_read_options.Rd
new file mode 100644
index 0000000000000..3fa2d8ccbf2f2
--- /dev/null
+++ b/r/man/csv_read_options.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/csv.R
+\name{csv_read_options}
+\alias{csv_read_options}
+\title{read options for the csv reader}
+\usage{
+csv_read_options(use_threads = TRUE, block_size = 1048576L)
+}
+\arguments{
+\item{use_threads}{Whether to use the global CPU thread pool}
+
+\item{block_size}{Block size we request from the IO layer; also determines the size of chunks when use_threads is \code{TRUE}}
+}
+\description{
+read options for the csv reader
+}
diff --git a/r/man/csv_table_reader.Rd b/r/man/csv_table_reader.Rd
new file mode 100644
index 0000000000000..029cd0b5923c2
--- /dev/null
+++ b/r/man/csv_table_reader.Rd
@@ -0,0 +1,24 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/csv.R
+\name{csv_table_reader}
+\alias{csv_table_reader}
+\title{CSV table reader}
+\usage{
+csv_table_reader(file, read_options = csv_read_options(),
+  parse_options = csv_parse_options(),
+  convert_options = csv_convert_options(), ...)
+}
+\arguments{
+\item{file}{A file path (character or \code{fs_path}), an \code{arrow::io::InputStream} or an \code{arrow::csv::TableReader}}
+
+\item{read_options}{see \code{\link[=csv_read_options]{csv_read_options()}}}
+
+\item{parse_options}{see \code{\link[=csv_parse_options]{csv_parse_options()}}}
+
+\item{convert_options}{see \code{\link[=csv_convert_options]{csv_convert_options()}}}
+
+\item{...}{additional parameters.}
+}
+\description{
+CSV table reader
+}
diff --git a/r/man/read_csv_arrow.Rd b/r/man/read_csv_arrow.Rd
new file mode 100644
index 0000000000000..4cdca91246b5b
--- /dev/null
+++ b/r/man/read_csv_arrow.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/csv.R
+\name{read_csv_arrow}
+\alias{read_csv_arrow}
+\title{Read csv file into an arrow::Table}
+\usage{
+read_csv_arrow(...)
+}
+\arguments{
+\item{...}{Used to construct an arrow::csv::TableReader}
+}
+\description{
+Use arrow::csv::TableReader from \code{\link[=csv_table_reader]{csv_table_reader()}}
+}
diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp
index e5a784eb70c23..c752afba1c258 100644
--- a/r/src/RcppExports.cpp
+++ b/r/src/RcppExports.cpp
@@ -558,6 +558,64 @@ BEGIN_RCPP
     return rcpp_result_gen;
 END_RCPP
 }
+// csv___ReadOptions__initialize
+std::shared_ptr<arrow::csv::ReadOptions> csv___ReadOptions__initialize(List_ options);
+RcppExport SEXP _arrow_csv___ReadOptions__initialize(SEXP optionsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< List_ >::type options(optionsSEXP);
+    rcpp_result_gen = Rcpp::wrap(csv___ReadOptions__initialize(options));
+    return rcpp_result_gen;
+END_RCPP
+}
+// csv___ParseOptions__initialize
+std::shared_ptr<arrow::csv::ParseOptions> csv___ParseOptions__initialize(List_ options);
+RcppExport SEXP _arrow_csv___ParseOptions__initialize(SEXP optionsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< List_ >::type options(optionsSEXP);
+    rcpp_result_gen = Rcpp::wrap(csv___ParseOptions__initialize(options));
+    return rcpp_result_gen;
+END_RCPP
+}
+// csv___ConvertOptions__initialize
+std::shared_ptr<arrow::csv::ConvertOptions> csv___ConvertOptions__initialize(List_ options);
+RcppExport SEXP _arrow_csv___ConvertOptions__initialize(SEXP optionsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< List_ >::type options(optionsSEXP);
+    rcpp_result_gen = Rcpp::wrap(csv___ConvertOptions__initialize(options));
+    return rcpp_result_gen;
+END_RCPP
+}
+// csv___TableReader__Make
+std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(const std::shared_ptr<arrow::io::InputStream>& input, const std::shared_ptr<arrow::csv::ReadOptions>& read_options, const std::shared_ptr<arrow::csv::ParseOptions>& parse_options, const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options);
+RcppExport SEXP _arrow_csv___TableReader__Make(SEXP inputSEXP, SEXP read_optionsSEXP, SEXP parse_optionsSEXP, SEXP convert_optionsSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::io::InputStream>& >::type input(inputSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::csv::ReadOptions>& >::type read_options(read_optionsSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::csv::ParseOptions>& >::type parse_options(parse_optionsSEXP);
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::csv::ConvertOptions>& >::type convert_options(convert_optionsSEXP);
+    rcpp_result_gen = Rcpp::wrap(csv___TableReader__Make(input, read_options, parse_options, convert_options));
+    return rcpp_result_gen;
+END_RCPP
+}
+// csv___TableReader__Read
+std::shared_ptr<arrow::Table> csv___TableReader__Read(const std::shared_ptr<arrow::csv::TableReader>& table_reader);
+RcppExport SEXP _arrow_csv___TableReader__Read(SEXP table_readerSEXP) {
+BEGIN_RCPP
+    Rcpp::RObject rcpp_result_gen;
+    Rcpp::RNGScope rcpp_rngScope_gen;
+    Rcpp::traits::input_parameter< const std::shared_ptr<arrow::csv::TableReader>& >::type table_reader(table_readerSEXP);
+    rcpp_result_gen = Rcpp::wrap(csv___TableReader__Read(table_reader));
+    return rcpp_result_gen;
+END_RCPP
+}
 // shared_ptr_is_null
 bool shared_ptr_is_null(SEXP xp);
 RcppExport SEXP _arrow_shared_ptr_is_null(SEXP xpSEXP) {
@@ -2200,6 +2258,11 @@ static const R_CallMethodDef CallEntries[] = {
     {"_arrow_ChunkedArray__cast", (DL_FUNC) &_arrow_ChunkedArray__cast, 3},
     {"_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3},
     {"_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3},
+    {"_arrow_csv___ReadOptions__initialize", (DL_FUNC) &_arrow_csv___ReadOptions__initialize, 1},
+    {"_arrow_csv___ParseOptions__initialize", (DL_FUNC) &_arrow_csv___ParseOptions__initialize, 1},
+    {"_arrow_csv___ConvertOptions__initialize", (DL_FUNC) &_arrow_csv___ConvertOptions__initialize, 1},
+    {"_arrow_csv___TableReader__Make", (DL_FUNC) &_arrow_csv___TableReader__Make, 4},
+    {"_arrow_csv___TableReader__Read", (DL_FUNC) &_arrow_csv___TableReader__Read, 1},
     {"_arrow_shared_ptr_is_null", (DL_FUNC) &_arrow_shared_ptr_is_null, 1},
     {"_arrow_unique_ptr_is_null", (DL_FUNC) &_arrow_unique_ptr_is_null, 1},
     {"_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0},
diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h
index dba7a91c21e33..6fef7997dbfa7 100644
--- a/r/src/arrow_types.h
+++ b/r/src/arrow_types.h
@@ -22,6 +22,7 @@
 #undef Free
 #include <arrow/api.h>
 #include <arrow/compute/api.h>
+#include <arrow/csv/reader.h>
 #include <arrow/io/file.h>
 #include <arrow/io/memory.h>
 #include <arrow/ipc/feather.h>
diff --git a/r/src/csv.cpp b/r/src/csv.cpp
new file mode 100644
index 0000000000000..0e1d09fb65e8b
--- /dev/null
+++ b/r/src/csv.cpp
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow_types.h"
+
+using namespace Rcpp;
+
+// Builds arrow::csv::ReadOptions from an R list holding `use_threads` and `block_size`.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::csv::ReadOptions> csv___ReadOptions__initialize(List_ options) {
+  auto res =
+      std::make_shared<arrow::csv::ReadOptions>(arrow::csv::ReadOptions::Defaults());
+  res->use_threads = options["use_threads"];
+  res->block_size = options["block_size"];
+  return res;
+}
+
+// Returns the first byte of the first string of `x`. An empty vector is rejected up
+// front because STRING_ELT(x, 0) would otherwise read out of bounds.
+inline char get_char(CharacterVector x) {
+  if (x.size() == 0) {
+    Rcpp::stop("expected a single character, got an empty character vector");
+  }
+  return CHAR(STRING_ELT(x, 0))[0];
+}
+
+// Builds arrow::csv::ParseOptions from the R list assembled by csv_parse_options().
+// [[Rcpp::export]]
+std::shared_ptr<arrow::csv::ParseOptions> csv___ParseOptions__initialize(List_ options) {
+  auto res =
+      std::make_shared<arrow::csv::ParseOptions>(arrow::csv::ParseOptions::Defaults());
+  res->delimiter = get_char(options["delimiter"]);
+  res->quoting = options["quoting"];
+  res->quote_char = get_char(options["quote_char"]);
+  res->double_quote = options["double_quote"];
+  // Forward `escaping` as well; the R side passes it alongside `escape_char`.
+  res->escaping = options["escaping"];
+  res->escape_char = get_char(options["escape_char"]);
+  res->newlines_in_values = options["newlines_in_values"];
+  res->header_rows = options["header_rows"];
+  res->ignore_empty_lines = options["ignore_empty_lines"];
+  return res;
+}
+
+// Builds arrow::csv::ConvertOptions from an R list holding `check_utf8`.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::csv::ConvertOptions> csv___ConvertOptions__initialize(
+    List_ options) {
+  auto res = std::make_shared<arrow::csv::ConvertOptions>(
+      arrow::csv::ConvertOptions::Defaults());
+  res->check_utf8 = options["check_utf8"];
+  return res;
+}
+
+// Creates a CSV TableReader over `input`; a failed Status goes through STOP_IF_NOT_OK.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::csv::TableReader> csv___TableReader__Make(
+    const std::shared_ptr<arrow::io::InputStream>& input,
+    const std::shared_ptr<arrow::csv::ReadOptions>& read_options,
+    const std::shared_ptr<arrow::csv::ParseOptions>& parse_options,
+    const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options) {
+  std::shared_ptr<arrow::csv::TableReader> table_reader;
+  STOP_IF_NOT_OK(arrow::csv::TableReader::Make(arrow::default_memory_pool(), input,
+                                               *read_options, *parse_options,
+                                               *convert_options, &table_reader));
+  return table_reader;
+}
+
+// Reads the reader's input into an arrow::Table; a failed Status goes through
+// STOP_IF_NOT_OK.
+// [[Rcpp::export]]
+std::shared_ptr<arrow::Table> csv___TableReader__Read(
+    const std::shared_ptr<arrow::csv::TableReader>& table_reader) {
+  std::shared_ptr<arrow::Table> table;
+  STOP_IF_NOT_OK(table_reader->Read(&table));
+  return table;
+}
diff --git a/r/tests/testthat/test-arrow-csv-.R b/r/tests/testthat/test-arrow-csv-.R
new file mode 100644
index 0000000000000..2afd0622821ae
--- /dev/null
+++ b/r/tests/testthat/test-arrow-csv-.R
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+context("arrow::csv::TableReader")
+
+test_that("Can read csv file", {
+  tf <- local_tempfile()
+  write.csv(iris, tf, row.names = FALSE, quote = FALSE)
+
+  tab1 <- read_csv_arrow(tf)
+  tab2 <- read_csv_arrow(mmap_open(tf))
+  tab3 <- read_csv_arrow(ReadableFile(tf))
+
+  iris$Species <- as.character(iris$Species)
+  tab0 <- table(iris)
+  expect_equal(tab0, tab1)
+  expect_equal(tab0, tab2)
+  expect_equal(tab0, tab3)
+})