ARROW-12290: [Rust][DataFusion] Add input_file_name function #9944

Closed · wants to merge 1 commit
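No PR description survives in this capture, but the diff tells the story: an optional source file name is threaded through the Arrow CSV and JSON readers and recorded in each emitted batch's schema metadata under the "filename" key, the hook on which a DataFusion `input_file_name()` function can be built. A minimal sketch of reading that metadata back (the helper name is illustrative, not part of this PR):

```rust
use arrow::record_batch::RecordBatch;

/// Return the source file name recorded by the patched readers, if any.
/// Assumes the "filename" schema-metadata key used throughout this diff.
fn input_file_name(batch: &RecordBatch) -> Option<String> {
    batch.schema().metadata().get("filename").cloned()
}
```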
3 changes: 2 additions & 1 deletion rust/arrow/examples/read_csv.rs
@@ -34,7 +34,8 @@ fn main() {

let file = File::open("test/data/uk_cities.csv").unwrap();

-let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None);
+let mut csv =
+csv::Reader::new(None, file, Arc::new(schema), false, None, 1024, None, None);
let _batch = csv.next().unwrap().unwrap();
#[cfg(feature = "prettyprint")]
{
45 changes: 41 additions & 4 deletions rust/arrow/src/csv/reader.rs
@@ -36,14 +36,14 @@
//!
//! let file = File::open("test/data/uk_cities.csv").unwrap();
//!
-//! let mut csv = csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None);
+//! let mut csv = csv::Reader::new(None, file, Arc::new(schema), false, None, 1024, None, None);
//! let batch = csv.next().unwrap().unwrap();
//! ```

use core::cmp::min;
use lazy_static::lazy_static;
use regex::{Regex, RegexBuilder};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
@@ -251,6 +251,8 @@ pub struct Reader<R: Read> {
projection: Option<Vec<usize>>,
/// File reader
reader: csv_crate::Reader<R>,
+/// Current file
+filename: Option<String>,
/// Current line number
line_number: usize,
/// Maximum number of rows to read
@@ -280,7 +282,9 @@ impl<R: Read> Reader<R> {
/// If reading a `File` or an input that supports `std::io::Read` and `std::io::Seek`,
/// and you want to customise the Reader (for example, to enable schema inference),
/// use `ReaderBuilder`.
+#[allow(clippy::too_many_arguments)]
pub fn new(
+filename: Option<String>,
reader: R,
schema: SchemaRef,
has_header: bool,
@@ -290,7 +294,8 @@
projection: Option<Vec<usize>>,
) -> Self {
Self::from_reader(
-reader, schema, has_header, delimiter, batch_size, bounds, projection,
+filename, reader, schema, has_header, delimiter, batch_size, bounds,
+projection,
)
}

@@ -313,7 +318,9 @@
///
/// This constructor allows you more flexibility in what records are processed by the
/// csv reader.
+#[allow(clippy::too_many_arguments)]
pub fn from_reader(
+filename: Option<String>,
reader: R,
schema: SchemaRef,
has_header: bool,
@@ -359,6 +366,7 @@ impl<R: Read> Reader<R> {
schema,
projection,
reader: csv_reader,
+filename,
line_number: if has_header { start + 1 } else { start },
batch_size,
end,
@@ -406,6 +414,19 @@ impl<R: Read> Iterator for Reader<R> {

self.line_number += read_records;

+let result = result.map(|batch| match self.filename.clone() {
+Some(filename) => {
+let mut metadata = HashMap::new();
+metadata.insert("filename".to_string(), filename);
+let schema = Arc::new(Schema::new_with_metadata(
+batch.schema().fields().clone(),
+metadata,
+));
+RecordBatch::try_new(schema, batch.columns().to_vec()).unwrap()
+}
+None => batch,
+});

Some(result)
}
}
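Taken together, the change above means a caller passes `Some(path)` as the new leading argument and every batch yielded by the iterator carries that path in its schema metadata. A sketch of end-to-end usage under the patched signature; the schema and path are borrowed from the repo's `read_csv` example:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::csv;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let schema = Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lng", DataType::Float64, false),
    ]);
    let path = "test/data/uk_cities.csv";
    let file = File::open(path).unwrap();

    // The new leading argument is the optional source file name.
    let mut csv = csv::Reader::new(
        Some(path.to_string()),
        file,
        Arc::new(schema),
        false, // has_header
        None,  // delimiter
        1024,  // batch_size
        None,  // bounds
        None,  // projection
    );
    let batch = csv.next().unwrap().unwrap();

    // With this PR, the batch schema carries the source path as metadata.
    assert_eq!(
        batch.schema().metadata().get("filename"),
        Some(&path.to_string())
    );
}
```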
@@ -670,6 +691,8 @@ fn build_boolean_array(
/// CSV file reader builder
#[derive(Debug)]
pub struct ReaderBuilder {
+/// Optional filename
+filename: Option<String>,
/// Optional schema for the CSV file
///
/// If the schema is not supplied, the reader will try to infer the schema
@@ -699,6 +722,7 @@ pub struct ReaderBuilder {
impl Default for ReaderBuilder {
fn default() -> Self {
Self {
+filename: None,
schema: None,
has_header: false,
delimiter: None,
@@ -738,6 +762,12 @@ impl ReaderBuilder {
ReaderBuilder::default()
}

+/// Set the CSV file's name
+pub fn with_file_name(mut self, file_name: String) -> Self {
+self.filename = Some(file_name);
+self
+}

/// Set the CSV file's schema
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
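The builder route looks like this; a sketch assuming the pre-existing `ReaderBuilder` methods (`infer_schema`, `has_header`, `build`) alongside the new setter, with an illustrative path:

```rust
use std::fs::File;

use arrow::csv::ReaderBuilder;

fn main() {
    let path = "test/data/uk_cities.csv";
    let file = File::open(path).unwrap();

    let mut csv = ReaderBuilder::new()
        .with_file_name(path.to_string()) // the setter added in this PR
        .infer_schema(Some(100))
        .has_header(false)
        .build(file)
        .unwrap();
    let batch = csv.next().unwrap().unwrap();
    assert_eq!(
        batch.schema().metadata().get("filename"),
        Some(&path.to_string())
    );
}
```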
@@ -794,6 +824,7 @@
}
};
Ok(Reader::from_reader(
+self.filename,
reader,
schema,
self.has_header,
@@ -827,6 +858,7 @@ mod tests {
let file = File::open("test/data/uk_cities.csv").unwrap();

let mut csv = Reader::new(
+None,
file,
Arc::new(schema.clone()),
false,
@@ -874,6 +906,7 @@
let file = File::open("test/data/uk_cities.csv").unwrap();

let mut csv = Reader::new(
+None,
file,
Arc::new(schema.clone()),
false,
@@ -905,6 +938,7 @@
.chain(Cursor::new("\n".to_string()))
.chain(file_without_headers);
let mut csv = Reader::from_reader(
+None,
both_files,
Arc::new(schema),
true,
@@ -1002,6 +1036,7 @@ mod tests {
let file = File::open("test/data/uk_cities.csv").unwrap();

let mut csv = Reader::new(
+None,
file,
Arc::new(schema),
false,
@@ -1031,7 +1066,8 @@

let file = File::open("test/data/null_test.csv").unwrap();

-let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
+let mut csv =
+Reader::new(None, file, Arc::new(schema), true, None, 1024, None, None);
let batch = csv.next().unwrap().unwrap();

assert_eq!(false, batch.column(1).is_null(0));
@@ -1227,6 +1263,7 @@ mod tests {
let reader = std::io::Cursor::new(data);

let mut csv = Reader::new(
+None,
reader,
Arc::new(schema),
false,
1 change: 1 addition & 0 deletions rust/arrow/src/csv/writer.rs
@@ -628,6 +628,7 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03\n";
buf.set_position(0);

let mut reader = Reader::new(
+None,
buf,
Arc::new(schema),
false,
57 changes: 47 additions & 10 deletions rust/arrow/src/json/reader.rs
@@ -36,12 +36,14 @@
//! Field::new("c", DataType::Float64, false),
//! ]);
//!
//! let file = File::open("test/data/basic.json").unwrap();
//! let filename = "test/data/basic.json";
//! let file = File::open(filename).unwrap();
//!
-//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
+//! let mut json = json::Reader::new(Some(filename.to_string()), BufReader::new(file), Arc::new(schema), 1024, None);
//! let batch = json.next().unwrap().unwrap();
//! ```

+use std::collections::HashMap as CollectionsHashMap;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::iter::FromIterator;
use std::sync::Arc;
@@ -559,11 +561,12 @@ where
/// use std::io::{BufReader, Seek, SeekFrom};
/// use std::sync::Arc;
///
+/// let filename = "test/data/mixed_arrays.json";
/// let mut reader =
-/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// BufReader::new(File::open(filename).unwrap());
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
/// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None, Some(filename.to_string()));
///
/// // seek back to start so that the original file is usable again
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -580,6 +583,8 @@ pub struct Decoder {
projection: Option<Vec<String>>,
/// Batch size (number of records to load each time)
batch_size: usize,
+/// Optional filename
+filename: Option<String>,
}

impl Decoder {
@@ -589,11 +594,13 @@ impl Decoder {
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
+filename: Option<String>,
) -> Self {
Self {
schema,
projection,
batch_size,
+filename,
}
}

@@ -661,7 +668,25 @@

let projected_schema = Arc::new(Schema::new(projected_fields));

-arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+arrays.and_then(|arr| {
+RecordBatch::try_new(projected_schema, arr).map(|batch| {
+match self.filename.clone() {
+Some(filename) => {
+let mut metadata = CollectionsHashMap::new();
+metadata.insert("filename".to_string(), filename);
+let schema = Arc::new(Schema::new_with_metadata(
+batch.schema().fields().clone(),
+metadata,
+));
+Some(
+RecordBatch::try_new(schema, batch.columns().to_vec())
+.unwrap(),
+)
+}
+None => Some(batch),
+}
+})
+})
}

fn build_wrapped_list_array(
@@ -1422,26 +1447,34 @@ impl<R: Read> Reader<R> {
/// If reading a `File` and you want to customise the Reader, for example to enable
/// schema inference, use `ReaderBuilder`.
pub fn new(
+filename: Option<String>,
reader: R,
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
-Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+Self::from_buf_reader(
+filename,
+BufReader::new(reader),
+schema,
+batch_size,
+projection,
+)
}

/// Create a new JSON Reader from a `BufReader<R: Read>`
///
/// To customize the schema, such as to enable schema inference, use `ReaderBuilder`
pub fn from_buf_reader(
+filename: Option<String>,
reader: BufReader<R>,
schema: SchemaRef,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
Self {
reader,
-decoder: Decoder::new(schema, batch_size, projection),
+decoder: Decoder::new(schema, batch_size, projection, filename),
}
}
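The JSON reader mirrors the CSV change: the optional file name is handed down to the inner `Decoder`, which stamps it onto each batch's schema metadata. A usage sketch under the patched signature, following the module docs above:

```rust
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::json;

fn main() {
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Float64, false),
        Field::new("c", DataType::Float64, false),
    ]);
    let filename = "test/data/basic.json";
    let file = File::open(filename).unwrap();

    let mut json = json::Reader::new(
        Some(filename.to_string()),
        BufReader::new(file),
        Arc::new(schema),
        1024, // batch_size
        None, // projection
    );
    let batch = json.next().unwrap().unwrap();
    assert_eq!(
        batch.schema().metadata().get("filename"),
        Some(&filename.to_string())
    );
}
```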

@@ -1561,6 +1594,7 @@ impl ReaderBuilder {
};

Ok(Reader::from_buf_reader(
+None,
buf_reader,
schema,
self.batch_size,
@@ -1708,6 +1742,7 @@ mod tests {
]);

let mut reader: Reader<File> = Reader::new(
+None,
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
1024,
@@ -1760,6 +1795,7 @@
]);

let mut reader: Reader<File> = Reader::new(
+None,
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
1024,
@@ -1929,7 +1965,8 @@ mod tests {
file.seek(SeekFrom::Start(0)).unwrap();

let reader = BufReader::new(GzDecoder::new(&file));
-let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
+let mut reader =
+Reader::from_buf_reader(None, reader, Arc::new(schema), 64, None);
let batch_gz = reader.next().unwrap().unwrap();

for batch in vec![batch, batch_gz] {
@@ -2886,7 +2923,7 @@ mod tests {
true,
)]);

-let decoder = Decoder::new(Arc::new(schema), 1024, None);
+let decoder = Decoder::new(Arc::new(schema), 1024, None, None);
let batch = decoder
.next_batch(
&mut vec![
@@ -2921,7 +2958,7 @@
true,
)]);

-let decoder = Decoder::new(Arc::new(schema), 1024, None);
+let decoder = Decoder::new(Arc::new(schema), 1024, None, None);
let batch = decoder
.next_batch(
// NOTE: total struct element count needs to be greater than
8 changes: 4 additions & 4 deletions rust/datafusion-examples/examples/simple_udf.rs
@@ -15,20 +15,20 @@
// specific language governing permissions and limitations
// under the License.

+use std::sync::Arc;

use arrow::{
array::{ArrayRef, Float32Array, Float64Array},
-datatypes::DataType,
+datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
util::pretty,
};

use datafusion::prelude::*;
use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
-use std::sync::Arc;

// create local execution context with an in-memory table
fn create_context() -> Result<ExecutionContext> {
-use arrow::datatypes::{Field, Schema};
use datafusion::datasource::MemTable;
// define a schema.
let schema = Arc::new(Schema::new(vec![
@@ -60,7 +60,7 @@ async fn main() -> Result<()> {
let mut ctx = create_context()?;

// First, declare the actual implementation of the calculation
-let pow = |args: &[ArrayRef]| {
+let pow = |args: &[ArrayRef], _: &Schema| {
// in DataFusion, all `args` and output are dynamically-typed arrays, which means that we need to:
// 1. cast the values to the type we want
// 2. perform the computation for every element in the array (using a loop or SIMD) and construct the result
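The closure signature change above (`&[ArrayRef], &Schema`) is the DataFusion half of the feature: a scalar function can now see the schema of its input batch, and with it the "filename" metadata the patched readers attach. A hedged sketch of an `input_file_name`-style UDF under that new signature; the function name, the zero-argument registration, and the row-count fallback are illustrative, not the PR's final API:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::{DataType, Schema};
use datafusion::error::Result;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::*;

// Sketch: read the "filename" metadata attached by the patched readers
// and repeat it for every row of the batch.
fn register_input_file_name(ctx: &mut ExecutionContext) -> Result<()> {
    let fun = |args: &[ArrayRef], schema: &Schema| {
        let name = schema
            .metadata()
            .get("filename")
            .cloned()
            .unwrap_or_default();
        // With no argument columns the row count is not recoverable here;
        // defaulting to 1 is a simplification of this sketch.
        let rows = args.get(0).map(|a| a.len()).unwrap_or(1);
        Ok(Arc::new(StringArray::from(vec![name.as_str(); rows])) as ArrayRef)
    };
    let udf = create_udf(
        "input_file_name",
        vec![], // no arguments
        Arc::new(DataType::Utf8),
        make_scalar_function(fun),
    );
    ctx.register_udf(udf);
    Ok(())
}
```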