From 6c257efd35c5472b8d554990907c73464528d25c Mon Sep 17 00:00:00 2001
From: Xavier Lange
Date: Mon, 14 Sep 2020 13:49:33 +0200
Subject: [PATCH] ARROW-5123: [Rust] Parquet derive for simple structs

A rebase and significant rewrite of https://github.com/sunchao/parquet-rs/pull/197

Big improvement: I now use a more natural nested enum style, it helps break out which patterns of data types are supported. The rest of the broad strokes still apply.

Goal
===

Writing many columns to a file is a chore. If you can put your values into a struct which mirrors the schema of your file, this `derive(ParquetRecordWriter)` will write out all the fields, in the order in which they are defined, to a row_group.

How to Use
===

```
extern crate parquet;
#[macro_use] extern crate parquet_derive;

#[derive(ParquetRecordWriter)]
struct ACompleteRecord<'a> {
    pub a_bool: bool,
    pub a_str: &'a str,
}
```

RecordWriter trait
===

This is the new trait which `parquet_derive` will implement for your structs.

```
use super::RowGroupWriter;

pub trait RecordWriter {
    fn write_to_row_group(&self, row_group_writer: &mut Box<RowGroupWriter>);
}
```

How does it work?
===

The `parquet_derive` crate adds code-generating functionality to the rust compiler. The code generation takes rust syntax and emits additional syntax. This macro expansion works on rust 1.15+ stable. This is a dynamic plugin, loaded by the machinery in cargo. Users don't have to do any special `build.rs` steps or anything like that; it works automatically once `parquet_derive` is included in their project. The `parquet_derive/Cargo.toml` has a section saying as much:

```
[lib]
proc-macro = true
```

The rust struct tagged with `#[derive(ParquetRecordWriter)]` is provided to the `parquet_record_writer` function in `parquet_derive/src/lib.rs`. The `syn` crate parses the struct from a string representation to an AST (a recursive enum value). The AST contains all the values I care about when generating a `RecordWriter` impl:

 - the name of the struct
 - the lifetime variables of the struct
 - the fields of the struct

The fields of the struct are translated from AST to a flat `FieldInfo` struct. It has the bits I care about for writing a column: `field_name`, `field_lifetime`, `field_type`, `is_option`, `column_writer_variant`.

The code then does the equivalent of templating to build the `RecordWriter` implementation. The templating functionality is provided by the `quote` crate. At a high level the template for `RecordWriter` looks like:

```
impl RecordWriter for $struct_name {
    fn write_row_group(..) {
        $(
            { $column_writer_snippet }
        )
    }
}
```

This template is then added under the struct definition, ending up something like:

```
struct MyStruct {
}
impl RecordWriter for MyStruct {
    fn write_row_group(..) {
        {
            write_col_1();
        };
        {
            write_col_2();
        }
    }
}
```

and finally _THIS_ is the code passed to rustc. It's just code now, fully expanded and standalone. If a user ever changes their `struct MyValue` definition the `ParquetRecordWriter` will be regenerated. There are no intermediate values to version control or worry about.
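(For readers who haven't written a procedural macro before, here is a minimal sketch of that parse-then-template flow in isolation. It is not the parquet_derive implementation: the derive name `DescribeFields` and the generated `describe()` method are made up for illustration, and the crate it lives in would need `proc-macro = true` plus `syn`, `quote`, and `proc-macro2` as dependencies. The real `parquet_record_writer` follows the same shape but emits a column-writer snippet per field instead of a list of field names, as the expanded `DumbRecord` example below shows.)

```
// Hypothetical, trimmed-down derive illustrating the flow described above.
// Lives in the lib.rs of a crate with `proc-macro = true`.
extern crate proc_macro;

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput, Fields};

#[proc_macro_derive(DescribeFields)]
pub fn describe_fields(input: TokenStream) -> TokenStream {
    // 1. `syn` turns the token stream into an AST (a recursive enum value).
    let input = parse_macro_input!(input as DeriveInput);
    let name = input.ident;
    let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();

    // 2. Pull the named fields out of the struct -- the same information
    //    parquet_derive flattens into its per-field `FieldInfo` values.
    let field_names: Vec<String> = match input.data {
        Data::Struct(ref s) => match s.fields {
            Fields::Named(ref named) => named
                .named
                .iter()
                .map(|f| f.ident.as_ref().unwrap().to_string())
                .collect(),
            _ => panic!("only structs with named fields are supported"),
        },
        _ => panic!("only structs are supported"),
    };

    // 3. `quote!` is the templating step: splice the per-field pieces into
    //    an impl block that gets appended after the struct definition.
    let expanded = quote! {
        impl #impl_generics #name #ty_generics #where_clause {
            pub fn describe() -> Vec<&'static str> {
                vec![ #( #field_names ),* ]
            }
        }
    };
    expanded.into()
}
```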
Viewing the Derived Code
===

To see the generated code before it's compiled, one very useful tool is `cargo expand` [more info on gh](https://github.com/dtolnay/cargo-expand). Install it, then you can do:

```
$WORK_DIR/parquet-rs/parquet_derive_test cargo expand --lib > ../temp.rs
```

then you can dump the contents:

```
struct DumbRecord {
    pub a_bool: bool,
    pub a2_bool: bool,
}
impl RecordWriter for &[DumbRecord] {
    fn write_to_row_group(
        &self,
        row_group_writer: &mut Box<RowGroupWriter>,
    ) {
        let mut row_group_writer = row_group_writer;
        {
            let vals: Vec<bool> = self.iter().map(|x| x.a_bool).collect();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
            if let parquet::column::writer::ColumnWriter::BoolColumnWriter(ref mut typed) =
                column_writer
            {
                typed.write_batch(&vals[..], None, None).unwrap();
            }
            row_group_writer.close_column(column_writer).unwrap();
        };
        {
            let vals: Vec<bool> = self.iter().map(|x| x.a2_bool).collect();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
            if let parquet::column::writer::ColumnWriter::BoolColumnWriter(ref mut typed) =
                column_writer
            {
                typed.write_batch(&vals[..], None, None).unwrap();
            }
            row_group_writer.close_column(column_writer).unwrap();
        }
    }
}
```

Now I need to write out all the combinations of types we support and make sure it writes out data.

Procedural Macros
===

The `parquet_derive` crate can ONLY export the derivation functionality. No traits, nothing else. The derive crate cannot host test cases. It's kind of like a "dummy" crate which is only used by the compiler, never the code. The parent crate cannot use the derivation functionality, which is important because it means test code cannot be in the parent crate. This forces us to have a third crate, `parquet_derive_test`.

I'm open to being wrong on any one of these finer points. I had to bang on this for a while to get it to compile!

Potentials For Better Design
===

- [x] Recursion could be limited by generating the code as "snippets" instead of one big `quote!` AST generator. Or so I think. It might be nicer to push generating each column's writing code to another loop.
- [X] ~~It would be nicer if I didn't have to be so picky about data going in to the `write_batch` function. Is it possible we could make a version of the function which accepts `Into` or similar? This would greatly simplify this derivation code as it would not need to enumerate all the supported types. Something like `write_generic_batch(&[impl Into])` would be neat.~~ (not tackling in this generation of the plugin)
- [X] ~~Another idea for improving column writing: could we have a write function for `Iterator`s? I already have a `Vec`, if I could just write a mapping for accessing the one value, we could skip the whole intermediate vec for `write_batch`. Should have some significant memory advantages.~~ (not tackling in this generation of the plugin, it's a bigger parquet-rs enhancement)
- [X] ~~It might be worthwhile to derive a parquet schema directly from a struct definition. That should stamp out opportunities for type errors.~~ (moved to #203)

Status
===

I have successfully integrated this work with my own data exporter (takes postgres/couchdb and outputs a single parquet file). I think this code is worth including in the project, with the caveat that it only generates simplistic `RecordWriter`s. As people start to use it we can add code generation for more complex, nested structs. We can convert the nested matching style to a fancier looping style.
But for now, this explicit nesting is easier to debug and understand (to me at least!). Closes #4140 from xrl/parquet_derive Lead-authored-by: Xavier Lange Co-authored-by: Neville Dipale Co-authored-by: Bryant Biggs Co-authored-by: Sutou Kouhei Signed-off-by: Neville Dipale --- .dockerignore | 2 + ci/docker/debian-10-rust.dockerfile | 8 +- dev/release/00-prepare-test.rb | 58 ++ dev/release/00-prepare.sh | 26 +- rust/Cargo.toml | 2 + rust/parquet/src/record/mod.rs | 6 +- rust/parquet/src/record/record_writer.rs | 26 + rust/parquet_derive/Cargo.toml | 37 + rust/parquet_derive/README.md | 98 +++ rust/parquet_derive/src/lib.rs | 126 +++ rust/parquet_derive/src/parquet_field.rs | 931 +++++++++++++++++++++++ rust/parquet_derive_test/Cargo.toml | 27 + rust/parquet_derive_test/src/lib.rs | 129 ++++ 13 files changed, 1453 insertions(+), 23 deletions(-) create mode 100644 rust/parquet/src/record/record_writer.rs create mode 100644 rust/parquet_derive/Cargo.toml create mode 100644 rust/parquet_derive/README.md create mode 100644 rust/parquet_derive/src/lib.rs create mode 100644 rust/parquet_derive/src/parquet_field.rs create mode 100644 rust/parquet_derive_test/Cargo.toml create mode 100644 rust/parquet_derive_test/src/lib.rs diff --git a/.dockerignore b/.dockerignore index 083905c74396a..eb71138c679a5 100644 --- a/.dockerignore +++ b/.dockerignore @@ -55,6 +55,8 @@ !rust/arrow-flight/Cargo.toml !rust/parquet/Cargo.toml !rust/parquet/build.rs +!rust/parquet_derive/Cargo.toml +!rust/parquet_derive_test/Cargo.toml !rust/datafusion/Cargo.toml !rust/datafusion/benches !rust/integration-testing/Cargo.toml diff --git a/ci/docker/debian-10-rust.dockerfile b/ci/docker/debian-10-rust.dockerfile index b23b03c9a1c6c..9c9c9b510484e 100644 --- a/ci/docker/debian-10-rust.dockerfile +++ b/ci/docker/debian-10-rust.dockerfile @@ -58,14 +58,18 @@ RUN mkdir \ /arrow/rust/benchmarks/src \ /arrow/rust/datafusion/src \ /arrow/rust/integration-testing/src \ - /arrow/rust/parquet/src && \ + /arrow/rust/parquet/src \ + /arrow/rust/parquet_derive/src \ + /arrow/rust/parquet_derive_test/src && \ touch \ /arrow/rust/arrow-flight/src/lib.rs \ /arrow/rust/arrow/src/lib.rs \ /arrow/rust/benchmarks/src/lib.rs \ /arrow/rust/datafusion/src/lib.rs \ /arrow/rust/integration-testing/src/lib.rs \ - /arrow/rust/parquet/src/lib.rs + /arrow/rust/parquet/src/lib.rs \ + /arrow/rust/parquet_derive/src/lib.rs \ + /arrow/rust/parquet_derive_test/src/lib.rs # Compile dependencies for the whole workspace RUN cd /arrow/rust && cargo build --workspace --lib --all-features diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb index e6841aa85a8be..eb5859c842471 100644 --- a/dev/release/00-prepare-test.rb +++ b/dev/release/00-prepare-test.rb @@ -330,6 +330,35 @@ def test_version_pre_tag "+See [crate documentation](https://docs.rs/crate/parquet/#{@release_version}) on available API."], ], }, + { + path: "rust/parquet_derive/Cargo.toml", + hunks: [ + ["-version = \"#{@snapshot_version}\"", + "+version = \"#{@release_version}\""], + ["-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"], + ], + }, + { + path: "rust/parquet_derive/README.md", + hunks: [ + ["-parquet = \"#{@snapshot_version}\"", + "-parquet_derive = \"#{@snapshot_version}\"", + "+parquet = \"#{@release_version}\"", + "+parquet_derive = \"#{@release_version}\""], + ], + }, + { + path: "rust/parquet_derive_test/Cargo.toml", + hunks: [ + ["-version = 
\"#{@snapshot_version}\"", + "+version = \"#{@release_version}\"", + "-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }", + "-parquet_derive = { path = \"../parquet_derive\", version = \"#{@snapshot_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "+parquet_derive = { path = \"../parquet_derive\", version = \"#{@release_version}\" }"], + ], + }, ], parse_patch(git("log", "-n", "1", "-p"))) end @@ -537,6 +566,35 @@ def test_version_post_tag "+See [crate documentation](https://docs.rs/crate/parquet/#{@next_snapshot_version}) on available API."], ], }, + { + path: "rust/parquet_derive/Cargo.toml", + hunks: [ + ["-version = \"#{@release_version}\"", + "+version = \"#{@next_snapshot_version}\""], + ["-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"], + ], + }, + { + path: "rust/parquet_derive/README.md", + hunks: [ + ["-parquet = \"#{@release_version}\"", + "-parquet_derive = \"#{@release_version}\"", + "+parquet = \"#{@next_snapshot_version}\"", + "+parquet_derive = \"#{@next_snapshot_version}\""], + ], + }, + { + path: "rust/parquet_derive_test/Cargo.toml", + hunks: [ + ["-version = \"#{@release_version}\"", + "+version = \"#{@next_snapshot_version}\"", + "-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "-parquet_derive = { path = \"../parquet_derive\", version = \"#{@release_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }", + "+parquet_derive = { path = \"../parquet_derive\", version = \"#{@next_snapshot_version}\" }"], + ], + }, ], parse_patch(git("log", "-n", "1", "-p"))) end diff --git a/dev/release/00-prepare.sh b/dev/release/00-prepare.sh index 1678c0228da49..df5283e01fa62 100755 --- a/dev/release/00-prepare.sh +++ b/dev/release/00-prepare.sh @@ -151,29 +151,17 @@ update_versions() { -e "s/^(arrow = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \ -e "s/^(arrow-flight = .* version = )\".+\"( .*)/\\1\"${version}\"\\2/g" \ -e "s/^(parquet = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \ + -e "s/^(parquet_derive = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \ */Cargo.toml rm -f */Cargo.toml.bak git add */Cargo.toml - # Update version number for parquet README - sed -i.bak -E -e \ - "s/^parquet = \".+\"/parquet = \"${version}\"/g" \ - parquet/README.md - sed -i.bak -E -e \ - "s/docs.rs\/crate\/parquet\/.+\)/docs.rs\/crate\/parquet\/${version}\)/g" \ - parquet/README.md - rm -f parquet/README.md.bak - git add parquet/README.md - - # Update version number for datafusion README - sed -i.bak -E -e \ - "s/^datafusion = \".+\"/datafusion = \"${version}\"/g" \ - datafusion/README.md - sed -i.bak -E -e \ - "s/docs.rs\/crate\/datafusion\/.+\)/docs.rs\/crate\/datafusion\/${version}\)/g" \ - datafusion/README.md - rm -f datafusion/README.md.bak - git add datafusion/README.md + sed -i.bak -E \ + -e "s/^([^ ]+) = \".+\"/\\1 = \"${version}\"/g" \ + -e "s,docs\.rs/crate/([^/]+)/[^)]+,docs.rs/crate/\\1/${version},g" \ + */README.md + rm -f */README.md.bak + git add */README.md cd - } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0cb529fb690f4..459fe8fd4edd1 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -19,6 +19,8 @@ members = [ "arrow", "parquet", + "parquet_derive", + "parquet_derive_test", "datafusion", "arrow-flight", "integration-testing", diff --git 
a/rust/parquet/src/record/mod.rs b/rust/parquet/src/record/mod.rs index 4427ada72af21..ab61514846006 100644 --- a/rust/parquet/src/record/mod.rs +++ b/rust/parquet/src/record/mod.rs @@ -19,8 +19,10 @@ mod api; pub mod reader; +mod record_writer; mod triplet; -pub use self::api::{ - List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowFormatter, +pub use self::{ + api::{List, ListAccessor, Map, MapAccessor, Row, RowAccessor}, + record_writer::RecordWriter, }; diff --git a/rust/parquet/src/record/record_writer.rs b/rust/parquet/src/record/record_writer.rs new file mode 100644 index 0000000000000..00ce9fd4e47a3 --- /dev/null +++ b/rust/parquet/src/record/record_writer.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::super::errors::ParquetError; +use super::super::file::writer::RowGroupWriter; + +pub trait RecordWriter { + fn write_to_row_group( + &self, + row_group_writer: &mut Box, + ) -> Result<(), ParquetError>; +} diff --git a/rust/parquet_derive/Cargo.toml b/rust/parquet_derive/Cargo.toml new file mode 100644 index 0000000000000..b4debaf410b39 --- /dev/null +++ b/rust/parquet_derive/Cargo.toml @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "parquet_derive" +version = "2.0.0-SNAPSHOT" +authors = ["Apache Arrow "] +keywords = [ "parquet" ] +edition = "2018" + +[lib] +proc-macro = true + +[features] +chrono = [] +bigdecimal = [] +uuid = [] + +[dependencies] +proc-macro2 = "1.0.8" +quote = "1.0.2" +syn = { version = "1.0.14", features = ["full", "extra-traits"] } +parquet = { path = "../parquet", version = "2.0.0-SNAPSHOT" } diff --git a/rust/parquet_derive/README.md b/rust/parquet_derive/README.md new file mode 100644 index 0000000000000..6d3f6696ef79c --- /dev/null +++ b/rust/parquet_derive/README.md @@ -0,0 +1,98 @@ + + +# Parquet Derive + +A crate for deriving `RecordWriter` for arbitrary, _simple_ structs. This does not generate writers for arbitrarily nested +structures. 
It only works for primitives and a few generic structures and +various levels of reference. Please see features checklist for what is currently +supported. + +Derive also has some support for the chrono time library. You must must enable the `chrono` feature to get this support. + +## Usage +Add this to your Cargo.toml: +```toml +[dependencies] +parquet = "2.0.0-SNAPSHOT" +parquet_derive = "2.0.0-SNAPSHOT" +``` + +and this to your crate root: +```rust +extern crate parquet; +#[macro_use] extern crate parquet_derive; +``` + +Example usage of deriving a `RecordWriter` for your struct: + +```rust +use parquet; +use parquet::record::RecordWriter; + +#[derive(ParquetRecordWriter)] +struct ACompleteRecord<'a> { + pub a_bool: bool, + pub a_str: &'a str, + pub a_string: String, + pub a_borrowed_string: &'a String, + pub maybe_a_str: Option<&'a str>, + pub magic_number: i32, + pub low_quality_pi: f32, + pub high_quality_pi: f64, + pub maybe_pi: Option, + pub maybe_best_pi: Option, +} + +// Initialize your parquet file +let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); +let mut row_group = writer.next_row_group().unwrap(); + +// Build up your records +let chunks = vec![ACompleteRecord{...}]; + +// The derived `RecordWriter` takes over here +(&chunks[..]).write_to_row_group(&mut row_group); + +writer.close_row_group(row_group).unwrap(); +writer.close().unwrap(); +``` + +## Features +- [X] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec` +- [ ] Support writing dictionaries +- [X] Support writing logical types like timestamp +- [X] Derive definition_levels for `Option` +- [ ] Derive definition levels for nested structures +- [ ] Derive writing tuple struct +- [ ] Derive writing `tuple` container types + +## Requirements +- Same as `parquet-rs` + +## Test +Testing a `*_derive` crate requires an intermediate crate. Go to `parquet_derive_test` and run `cargo test` for +unit tests. + +## Docs +To build documentation, run `cargo doc --no-deps`. +To compile and view in the browser, run `cargo doc --no-deps --open`. + +## License +Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0. diff --git a/rust/parquet_derive/src/lib.rs b/rust/parquet_derive/src/lib.rs new file mode 100644 index 0000000000000..35a538f932c00 --- /dev/null +++ b/rust/parquet_derive/src/lib.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#![recursion_limit = "128"] + +extern crate proc_macro; +extern crate proc_macro2; +extern crate syn; +#[macro_use] +extern crate quote; + +extern crate parquet; + +use syn::{parse_macro_input, Data, DataStruct, DeriveInput}; + +mod parquet_field; + +/// Derive flat, simple RecordWriter implementations. 
Works by parsing +/// a struct tagged with `#[derive(ParquetRecordWriter)]` and emitting +/// the correct writing code for each field of the struct. Column writers +/// are generated in the order they are defined. +/// +/// It is up to the programmer to keep the order of the struct +/// fields lined up with the schema. +/// +/// Example: +/// +/// ```ignore +/// use parquet; +/// use parquet::record::RecordWriter; +/// use parquet::schema::parser::parse_message_type; +/// +/// use std::rc::Rc; +// +/// #[derive(ParquetRecordWriter)] +/// struct ACompleteRecord<'a> { +/// pub a_bool: bool, +/// pub a_str: &'a str, +/// } +/// +/// let schema_str = "message schema { +/// REQUIRED boolean a_bool; +/// REQUIRED BINARY a_str (UTF8); +/// }"; +/// +/// pub fn write_some_records() { +/// let samples = vec![ +/// ACompleteRecord { +/// a_bool: true, +/// a_str: "I'm true" +/// }, +/// ACompleteRecord { +/// a_bool: false, +/// a_str: "I'm false" +/// } +/// ]; +/// +/// let schema = Rc::new(parse_message_type(schema_str).unwrap()); +/// +/// let props = Rc::new(WriterProperties::builder().build()); +/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); +/// +/// let mut row_group = writer.next_row_group().unwrap(); +/// samples.as_slice().write_to_row_group(&mut row_group).unwrap(); +/// writer.close_row_group(row_group).unwrap(); +/// writer.close().unwrap(); +/// } +/// ``` +/// +#[proc_macro_derive(ParquetRecordWriter)] +pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input: DeriveInput = parse_macro_input!(input as DeriveInput); + let fields = match input.data { + Data::Struct(DataStruct { fields, .. }) => fields, + Data::Enum(_) => unimplemented!("Enum currently is not supported"), + Data::Union(_) => unimplemented!("Union currently is not supported"), + }; + + let field_infos: Vec<_> = fields + .iter() + .map(|f: &syn::Field| parquet_field::Field::from(f)) + .collect(); + + let writer_snippets: Vec = + field_infos.iter().map(|x| x.writer_snippet()).collect(); + + let derived_for = input.ident; + let generics = input.generics; + + (quote! { + impl#generics RecordWriter<#derived_for#generics> for &[#derived_for#generics] { + fn write_to_row_group(&self, row_group_writer: &mut Box) -> Result<(), parquet::errors::ParquetError> { + let mut row_group_writer = row_group_writer; + let records = &self; // Used by all the writer snippets to be more clear + + #( + { + let mut some_column_writer = row_group_writer.next_column().unwrap(); + if let Some(mut column_writer) = some_column_writer { + #writer_snippets + row_group_writer.close_column(column_writer)?; + } else { + return Err(parquet::errors::ParquetError::General("Failed to get next column".into())) + } + } + );* + + Ok(()) + } + } + }).into() +} diff --git a/rust/parquet_derive/src/parquet_field.rs b/rust/parquet_derive/src/parquet_field.rs new file mode 100644 index 0000000000000..6b74743ce3490 --- /dev/null +++ b/rust/parquet_derive/src/parquet_field.rs @@ -0,0 +1,931 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[derive(Debug, PartialEq)] +pub struct Field { + ident: syn::Ident, + ty: Type, + is_a_byte_buf: bool, + third_party_type: Option, +} + +/// Use third party libraries, detected +/// at compile time. These libraries will +/// be written to parquet as their preferred +/// physical type. +/// +/// ChronoNaiveDateTime is written as i64 +/// ChronoNaiveDate is written as i32 +#[derive(Debug, PartialEq)] +enum ThirdPartyType { + ChronoNaiveDateTime, + ChronoNaiveDate, + Uuid, +} + +impl Field { + pub fn from(f: &syn::Field) -> Self { + let ty = Type::from(f); + let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY; + + let third_party_type = match &ty.last_part()[..] { + "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime), + "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate), + "Uuid" => Some(ThirdPartyType::Uuid), + _ => None, + }; + + Field { + ident: f + .ident + .clone() + .expect("Only structs with named fields are currently supported"), + ty, + is_a_byte_buf, + third_party_type, + } + } + + /// Takes the parsed field of the struct and emits a valid + /// column writer snippet. Should match exactly what you + /// would write by hand. + /// + /// Can only generate writers for basic structs, for example: + /// + /// struct Record { + /// a_bool: bool, + /// maybe_a_bool: Option + /// } + /// + /// but not + /// + /// struct UnsupportedNestedRecord { + /// a_property: bool, + /// nested_record: Record + /// } + /// + /// because this parsing logic is not sophisticated enough for definition + /// levels beyond 2. 
+ pub fn writer_snippet(&self) -> proc_macro2::TokenStream { + let ident = &self.ident; + let column_writer = self.ty.column_writer(); + + let vals_builder = match &self.ty { + Type::TypePath(_) => self.copied_direct_vals(), + Type::Option(ref first_type) => match **first_type { + Type::TypePath(_) => self.option_into_vals(), + Type::Reference(_, ref second_type) => match **second_type { + Type::TypePath(_) => self.option_into_vals(), + _ => unimplemented!("Unsupported type encountered"), + }, + ref f @ _ => unimplemented!("Unsupported: {:#?}", f), + }, + Type::Reference(_, ref first_type) => match **first_type { + Type::TypePath(_) => self.copied_direct_vals(), + Type::Option(ref second_type) => match **second_type { + Type::TypePath(_) => self.option_into_vals(), + Type::Reference(_, ref second_type) => match **second_type { + Type::TypePath(_) => self.option_into_vals(), + _ => unimplemented!("Unsupported type encountered"), + }, + ref f @ _ => unimplemented!("Unsupported: {:#?}", f), + }, + ref f @ _ => unimplemented!("Unsupported: {:#?}", f), + }, + f @ _ => unimplemented!("Unsupported: {:#?}", f), + }; + + let definition_levels = match &self.ty { + Type::TypePath(_) => None, + Type::Option(ref first_type) => match **first_type { + Type::TypePath(_) => Some(self.optional_definition_levels()), + Type::Option(_) => unimplemented!("Unsupported nesting encountered"), + Type::Reference(_, ref second_type) + | Type::Vec(ref second_type) + | Type::Array(ref second_type) => match **second_type { + Type::TypePath(_) => Some(self.optional_definition_levels()), + _ => unimplemented!("Unsupported nesting encountered"), + }, + }, + Type::Reference(_, ref first_type) + | Type::Vec(ref first_type) + | Type::Array(ref first_type) => match **first_type { + Type::TypePath(_) => None, + Type::Reference(_, ref second_type) + | Type::Vec(ref second_type) + | Type::Array(ref second_type) + | Type::Option(ref second_type) => match **second_type { + Type::TypePath(_) => Some(self.optional_definition_levels()), + Type::Reference(_, ref third_type) => match **third_type { + Type::TypePath(_) => Some(self.optional_definition_levels()), + _ => unimplemented!("Unsupported definition encountered"), + }, + _ => unimplemented!("Unsupported definition encountered"), + }, + }, + }; + + // "vals" is the run of primitive data being written for the column + // "definition_levels" is a vector of bools which controls whether a value is missing or present + // this TokenStream is only one part of the code for writing a column and + // it relies on values calculated in prior code snippets, namely "definition_levels" and "vals_builder". + // All the context is put together in this functions final quote and + // this expression just switches between non-nullable and nullable write statements + let write_batch_expr = if definition_levels.is_some() { + quote! { + if let #column_writer(ref mut typed) = column_writer { + typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{#ident}) + } + } + } else { + quote! { + if let #column_writer(ref mut typed) = column_writer { + typed.write_batch(&vals[..], None, None)?; + } else { + panic!("Schema and struct disagree on type for {}", stringify!{#ident}) + } + } + }; + + quote! 
{ + { + #definition_levels + + #vals_builder + + #write_batch_expr + } + } + } + + fn option_into_vals(&self) -> proc_macro2::TokenStream { + let field_name = &self.ident; + let is_a_byte_buf = self.is_a_byte_buf; + let is_a_timestamp = + self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime); + let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate); + let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid); + let copy_to_vec = match self.ty.physical_type() { + parquet::basic::Type::BYTE_ARRAY + | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => false, + _ => true, + }; + + let binding = if copy_to_vec { + quote! { let Some(inner) = rec.#field_name } + } else { + quote! { let Some(ref inner) = rec.#field_name } + }; + + let some = if is_a_timestamp { + quote! { Some(inner.timestamp_millis()) } + } else if is_a_date { + quote! { Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) } + } else if is_a_uuid { + quote! { Some((&inner.to_string()[..]).into()) } + } else if is_a_byte_buf { + quote! { Some((&inner[..]).into())} + } else { + quote! { Some(inner) } + }; + + quote! { + let vals: Vec<_> = records.iter().filter_map(|rec| { + if #binding { + #some + } else { + None + } + }).collect(); + } + } + + fn copied_direct_vals(&self) -> proc_macro2::TokenStream { + let field_name = &self.ident; + let is_a_byte_buf = self.is_a_byte_buf; + let is_a_timestamp = + self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime); + let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate); + let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid); + + let access = if is_a_timestamp { + quote! { rec.#field_name.timestamp_millis() } + } else if is_a_date { + quote! { rec.#field_name.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 } + } else if is_a_uuid { + quote! { (&rec.#field_name.to_string()[..]).into() } + } else if is_a_byte_buf { + quote! { (&rec.#field_name[..]).into() } + } else { + quote! { rec.#field_name } + }; + + quote! { + let vals: Vec<_> = records.iter().map(|rec| #access).collect(); + } + } + + fn optional_definition_levels(&self) -> proc_macro2::TokenStream { + let field_name = &self.ident; + + quote! 
{ + let definition_levels: Vec = self + .iter() + .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 }) + .collect(); + } + } +} + +#[derive(Debug, PartialEq)] +enum Type { + Array(Box), + Option(Box), + Vec(Box), + TypePath(syn::Type), + Reference(Option, Box), +} + +impl Type { + /// Takes a rust type and returns the appropriate + /// parquet-rs column writer + fn column_writer(&self) -> syn::TypePath { + use parquet::basic::Type as BasicType; + + match self.physical_type() { + BasicType::BOOLEAN => { + syn::parse_quote!(parquet::column::writer::ColumnWriter::BoolColumnWriter) + } + BasicType::INT32 => syn::parse_quote!( + parquet::column::writer::ColumnWriter::Int32ColumnWriter + ), + BasicType::INT64 => syn::parse_quote!( + parquet::column::writer::ColumnWriter::Int64ColumnWriter + ), + BasicType::INT96 => syn::parse_quote!( + parquet::column::writer::ColumnWriter::Int96ColumnWriter + ), + BasicType::FLOAT => syn::parse_quote!( + parquet::column::writer::ColumnWriter::FloatColumnWriter + ), + BasicType::DOUBLE => syn::parse_quote!( + parquet::column::writer::ColumnWriter::DoubleColumnWriter + ), + BasicType::BYTE_ARRAY => syn::parse_quote!( + parquet::column::writer::ColumnWriter::ByteArrayColumnWriter + ), + BasicType::FIXED_LEN_BYTE_ARRAY => syn::parse_quote!( + parquet::column::writer::ColumnWriter::FixedLenByteArrayColumnWriter + ), + } + } + + /// Helper to simplify a nested field definition to its leaf type + /// + /// Ex: + /// Option<&String> => Type::TypePath(String) + /// &Option => Type::TypePath(i32) + /// Vec> => Type::Vec(u8) + /// + /// Useful in determining the physical type of a field and the + /// definition levels. + fn leaf_type_recursive(&self) -> &Type { + self.leaf_type_recursive_helper(self, None) + } + + fn leaf_type_recursive_helper<'a>( + &'a self, + ty: &'a Type, + parent_ty: Option<&'a Type>, + ) -> &Type { + match ty { + Type::TypePath(_) => parent_ty.unwrap_or(ty), + Type::Option(ref first_type) + | Type::Vec(ref first_type) + | Type::Array(ref first_type) + | Type::Reference(_, ref first_type) => { + self.leaf_type_recursive_helper(first_type, Some(ty)) + } + } + } + + /// Helper method to further unwrap leaf_type() to get inner-most + /// type information, useful for determining the physical type + /// and normalizing the type paths. + fn inner_type(&self) -> &syn::Type { + let leaf_type = self.leaf_type_recursive(); + + match leaf_type { + Type::TypePath(ref type_) => type_, + Type::Option(ref first_type) + | Type::Vec(ref first_type) + | Type::Array(ref first_type) + | Type::Reference(_, ref first_type) => match **first_type { + Type::TypePath(ref type_) => type_, + _ => unimplemented!("leaf_type() should only return shallow types"), + }, + } + } + + /// Helper to normalize a type path by extracting the + /// most identifiable part + /// + /// Ex: + /// std::string::String => String + /// Vec => Vec + /// chrono::NaiveDateTime => NaiveDateTime + /// + /// Does run the risk of mis-identifying a type if import + /// rename is in play. Please note procedural macros always + /// run before type resolution so this is a risk the user + /// takes on when renaming imports. + fn last_part(&self) -> String { + let inner_type = self.inner_type(); + let inner_type_str = (quote! { #inner_type }).to_string(); + + inner_type_str + .split("::") + .last() + .unwrap() + .trim() + .to_string() + } + + /// Converts rust types to parquet physical types. 
+ /// + /// Ex: + /// [u8; 10] => FIXED_LEN_BYTE_ARRAY + /// Vec => BYTE_ARRAY + /// String => BYTE_ARRAY + /// i32 => INT32 + fn physical_type(&self) -> parquet::basic::Type { + use parquet::basic::Type as BasicType; + + let last_part = self.last_part(); + let leaf_type = self.leaf_type_recursive(); + + match leaf_type { + Type::Array(ref first_type) => { + if let Type::TypePath(_) = **first_type { + if last_part == "u8" { + return BasicType::FIXED_LEN_BYTE_ARRAY; + } + } + } + Type::Vec(ref first_type) => { + if let Type::TypePath(_) = **first_type { + if last_part == "u8" { + return BasicType::BYTE_ARRAY; + } + } + } + _ => (), + } + + match last_part.trim() { + "bool" => BasicType::BOOLEAN, + "u8" | "u16" | "u32" => BasicType::INT32, + "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32, + "u64" | "i64" | "usize" | "NaiveDateTime" => BasicType::INT64, + "f32" => BasicType::FLOAT, + "f64" => BasicType::DOUBLE, + "String" | "str" | "Uuid" => BasicType::BYTE_ARRAY, + f @ _ => unimplemented!("{} currently is not supported", f), + } + } + + /// Convert a parsed rust field AST in to a more easy to manipulate + /// parquet_derive::Field + fn from(f: &syn::Field) -> Self { + Type::from_type(f, &f.ty) + } + + fn from_type(f: &syn::Field, ty: &syn::Type) -> Self { + match ty { + syn::Type::Path(ref p) => Type::from_type_path(f, p), + syn::Type::Reference(ref tr) => Type::from_type_reference(f, tr), + syn::Type::Array(ref ta) => Type::from_type_array(f, ta), + other @ _ => unimplemented!( + "Unable to derive {:?} - it is currently an unsupported type\n{:#?}", + f.ident.as_ref().unwrap(), + other + ), + } + } + + fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self { + let last_segment = p.path.segments.last().unwrap(); + + let is_vec = + last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site()); + let is_option = last_segment.ident + == syn::Ident::new("Option", proc_macro2::Span::call_site()); + + if is_vec || is_option { + let generic_type = match &last_segment.arguments { + syn::PathArguments::AngleBracketed(angle_args) => { + assert_eq!(angle_args.args.len(), 1); + let first_arg = &angle_args.args[0]; + + match first_arg { + syn::GenericArgument::Type(ref typath) => typath.clone(), + other @ _ => unimplemented!("Unsupported: {:#?}", other), + } + } + other @ _ => unimplemented!("Unsupported: {:#?}", other), + }; + + if is_vec { + Type::Vec(Box::new(Type::from_type(f, &generic_type))) + } else { + Type::Option(Box::new(Type::from_type(f, &generic_type))) + } + } else { + Type::TypePath(syn::Type::Path(p.clone())) + } + } + + fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self { + let lifetime = tr.lifetime.clone(); + let inner_type = Type::from_type(f, tr.elem.as_ref()); + Type::Reference(lifetime, Box::new(inner_type)) + } + + fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self { + let inner_type = Type::from_type(f, ta.elem.as_ref()); + Type::Array(Box::new(inner_type)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use syn::{self, Data, DataStruct, DeriveInput}; + + fn extract_fields(input: proc_macro2::TokenStream) -> Vec { + let input: DeriveInput = syn::parse2(input).unwrap(); + + let fields = match input.data { + Data::Struct(DataStruct { fields, .. }) => fields, + _ => panic!("Input must be a struct"), + }; + + fields.iter().map(|field| field.to_owned()).collect() + } + + #[test] + fn test_generating_a_simple_writer_snippet() { + let snippet: proc_macro2::TokenStream = quote! 
{ + struct ABoringStruct { + counter: usize, + } + }; + + let fields = extract_fields(snippet); + let counter = Field::from(&fields[0]); + + let snippet = counter.writer_snippet().to_string(); + assert_eq!(snippet, + (quote!{ + { + let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter ) . collect ( ); + + if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer { + typed . write_batch ( & vals [ .. ] , None , None ) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ counter } ) + } + } + }).to_string() + ) + } + + #[test] + fn test_optional_to_writer_snippet() { + let struct_def: proc_macro2::TokenStream = quote! { + struct StringBorrower<'a> { + optional_str: Option<&'a str>, + optional_string: &Option, + optional_dumb_int: &Option<&i32>, + } + }; + + let fields = extract_fields(struct_def); + + let optional = Field::from(&fields[0]); + let snippet = optional.writer_snippet(); + assert_eq!(snippet.to_string(), + (quote! { + { + let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ; + + let vals: Vec <_> = records.iter().filter_map( |rec| { + if let Some ( ref inner ) = rec . optional_str { + Some ( (&inner[..]).into() ) + } else { + None + } + }).collect(); + + if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer { + typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ; + } else { + panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } ) + } + } + } + ).to_string()); + + let optional = Field::from(&fields[1]); + let snippet = optional.writer_snippet(); + assert_eq!(snippet.to_string(), + (quote!{ + { + let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ; + + let vals: Vec <_> = records.iter().filter_map( |rec| { + if let Some ( ref inner ) = rec . optional_string { + Some ( (&inner[..]).into() ) + } else { + None + } + }).collect(); + + if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer { + typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ; + } else { + panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } ) + } + } + }).to_string()); + + let optional = Field::from(&fields[2]); + let snippet = optional.writer_snippet(); + assert_eq!(snippet.to_string(), + (quote!{ + { + let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ; + + let vals: Vec <_> = records.iter().filter_map( |rec| { + if let Some ( inner ) = rec . optional_dumb_int { + Some ( inner ) + } else { + None + } + }).collect(); + + if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer { + typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ; + } else { + panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } ) + } + } + }).to_string()); + } + + #[test] + fn test_converting_to_column_writer_type() { + let snippet: proc_macro2::TokenStream = quote! 
{ + struct ABasicStruct { + yes_no: bool, + name: String, + } + }; + + let fields = extract_fields(snippet); + let processed: Vec<_> = fields.iter().map(|field| Field::from(field)).collect(); + + let column_writers: Vec<_> = processed + .iter() + .map(|field| field.ty.column_writer()) + .collect(); + + assert_eq!( + column_writers, + vec![ + syn::parse_quote!( + parquet::column::writer::ColumnWriter::BoolColumnWriter + ), + syn::parse_quote!( + parquet::column::writer::ColumnWriter::ByteArrayColumnWriter + ) + ] + ); + } + + #[test] + fn convert_basic_struct() { + let snippet: proc_macro2::TokenStream = quote! { + struct ABasicStruct { + yes_no: bool, + name: String, + } + }; + + let fields = extract_fields(snippet); + let processed: Vec<_> = fields.iter().map(|field| Field::from(field)).collect(); + assert_eq!(processed.len(), 2); + + assert_eq!( + processed, + vec![ + Field { + ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()), + ty: Type::TypePath(syn::parse_quote!(bool)), + is_a_byte_buf: false, + third_party_type: None, + }, + Field { + ident: syn::Ident::new("name", proc_macro2::Span::call_site()), + ty: Type::TypePath(syn::parse_quote!(String)), + is_a_byte_buf: true, + third_party_type: None, + } + ] + ) + } + + #[test] + fn test_get_inner_type() { + let snippet: proc_macro2::TokenStream = quote! { + struct LotsOfInnerTypes { + a_vec: Vec, + a_option: std::option::Option, + a_silly_string: std::string::String, + a_complicated_thing: std::option::Option>, + } + }; + + let fields = extract_fields(snippet); + let converted_fields: Vec<_> = + fields.iter().map(|field| Type::from(field)).collect(); + let inner_types: Vec<_> = converted_fields + .iter() + .map(|field| field.inner_type()) + .collect(); + let inner_types_strs: Vec<_> = inner_types + .iter() + .map(|ty| (quote! { #ty }).to_string()) + .collect(); + + assert_eq!( + inner_types_strs, + vec![ + "u8", + "bool", + "std :: string :: String", + "std :: result :: Result < ( ) , ( ) >" + ] + ) + } + + #[test] + fn test_physical_type() { + use parquet::basic::Type as BasicType; + let snippet: proc_macro2::TokenStream = quote! { + struct LotsOfInnerTypes { + a_buf: Vec, + a_number: i32, + a_verbose_option: std::option::Option, + a_silly_string: std::string::String, + a_fix_byte_buf: [u8; 10], + a_complex_option: Option<&Vec>, + a_complex_vec: &Vec<&Option>, + } + }; + + let fields = extract_fields(snippet); + let converted_fields: Vec<_> = + fields.iter().map(|field| Type::from(field)).collect(); + let physical_types: Vec<_> = converted_fields + .iter() + .map(|ty| ty.physical_type()) + .collect(); + + assert_eq!( + physical_types, + vec![ + BasicType::BYTE_ARRAY, + BasicType::INT32, + BasicType::BOOLEAN, + BasicType::BYTE_ARRAY, + BasicType::FIXED_LEN_BYTE_ARRAY, + BasicType::BYTE_ARRAY, + BasicType::INT32 + ] + ) + } + + #[test] + fn test_convert_comprehensive_owned_struct() { + let snippet: proc_macro2::TokenStream = quote! 
{ + struct VecHolder { + a_vec: Vec, + a_option: std::option::Option, + a_silly_string: std::string::String, + a_complicated_thing: std::option::Option>, + } + }; + + let fields = extract_fields(snippet); + let converted_fields: Vec<_> = + fields.iter().map(|field| Type::from(field)).collect(); + + assert_eq!( + converted_fields, + vec![ + Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))), + Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))), + Type::TypePath(syn::parse_quote!(std::string::String)), + Type::Option(Box::new(Type::TypePath( + syn::parse_quote!(std::result::Result<(),()>) + ))), + ] + ); + } + + #[test] + fn test_convert_borrowed_struct() { + let snippet: proc_macro2::TokenStream = quote! { + struct Borrower<'a> { + a_str: &'a str, + a_borrowed_option: &'a Option, + so_many_borrows: &'a Option<&'a str>, + } + }; + + let fields = extract_fields(snippet); + let types: Vec<_> = fields.iter().map(|field| Type::from(field)).collect(); + + assert_eq!( + types, + vec![ + Type::Reference( + Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())), + Box::new(Type::TypePath(syn::parse_quote!(str))) + ), + Type::Reference( + Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())), + Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!( + bool + ))))) + ), + Type::Reference( + Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())), + Box::new(Type::Option(Box::new(Type::Reference( + Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())), + Box::new(Type::TypePath(syn::parse_quote!(str))) + )))) + ), + ] + ); + } + + #[test] + #[cfg(feature = "chrono")] + fn test_chrono_timestamp_millis() { + let snippet: proc_macro2::TokenStream = quote! { + struct ATimestampStruct { + henceforth: chrono::NaiveDateTime, + maybe_happened: Option<&chrono::NaiveDateTime>, + } + }; + + let fields = extract_fields(snippet); + let when = Field::from(&fields[0]); + assert_eq!(when.writer_snippet().to_string(),(quote!{ + { + let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect(); + if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer { + typed.write_batch(&vals[..], None, None) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth }) + } + } + }).to_string()); + + let maybe_happened = Field::from(&fields[1]); + assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{ + { + let definition_levels : Vec = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); + let vals : Vec<_> = records.iter().filter_map(|rec| { + if let Some(inner) = rec.maybe_happened { + Some( inner.timestamp_millis() ) + } else { + None + } + }).collect(); + + if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer { + typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened }) + } + } + }).to_string()); + } + + #[test] + #[cfg(feature = "chrono")] + fn test_chrono_date() { + let snippet: proc_macro2::TokenStream = quote! 
{ + struct ATimestampStruct { + henceforth: chrono::NaiveDate, + maybe_happened: Option<&chrono::NaiveDate>, + } + }; + + let fields = extract_fields(snippet); + let when = Field::from(&fields[0]); + assert_eq!(when.writer_snippet().to_string(),(quote!{ + { + let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect(); + if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer { + typed.write_batch(&vals[..], None, None) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth }) + } + } + }).to_string()); + + let maybe_happened = Field::from(&fields[1]); + assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{ + { + let definition_levels : Vec = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); + let vals : Vec<_> = records.iter().filter_map(|rec| { + if let Some(inner) = rec.maybe_happened { + Some( inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 ) + } else { + None + } + }).collect(); + + if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer { + typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened }) + } + } + }).to_string()); + } + + #[test] + #[cfg(feature = "uuid")] + fn test_uuid() { + let snippet: proc_macro2::TokenStream = quote! { + struct ATimestampStruct { + unique_id: uuid::Uuid, + maybe_unique_id: Option<&uuid::Uuid>, + } + }; + + let fields = extract_fields(snippet); + let when = Field::from(&fields[0]); + assert_eq!(when.writer_snippet().to_string(),(quote!{ + { + let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect(); + if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer { + typed.write_batch(&vals[..], None, None) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id }) + } + } + }).to_string()); + + let maybe_happened = Field::from(&fields[1]); + assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{ + { + let definition_levels : Vec = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect(); + let vals : Vec<_> = records.iter().filter_map(|rec| { + if let Some(ref inner) = rec.maybe_unique_id { + Some( (&inner.to_string()[..]).into() ) + } else { + None + } + }).collect(); + + if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer { + typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?; + } else { + panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id }) + } + } + }).to_string()); + } +} diff --git a/rust/parquet_derive_test/Cargo.toml b/rust/parquet_derive_test/Cargo.toml new file mode 100644 index 0000000000000..fc5af3efd4b11 --- /dev/null +++ b/rust/parquet_derive_test/Cargo.toml @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "parquet_derive_test" +version = "2.0.0-SNAPSHOT" +authors = ["Apache Arrow "] +keywords = [ "parquet" ] +edition = "2018" + +[dependencies] +parquet = { path = "../parquet", version = "2.0.0-SNAPSHOT" } +parquet_derive = { path = "../parquet_derive", version = "2.0.0-SNAPSHOT" } diff --git a/rust/parquet_derive_test/src/lib.rs b/rust/parquet_derive_test/src/lib.rs new file mode 100644 index 0000000000000..aca4dc57049d1 --- /dev/null +++ b/rust/parquet_derive_test/src/lib.rs @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate parquet; + +#[macro_use] +extern crate parquet_derive; + +use parquet::record::RecordWriter; + +#[derive(ParquetRecordWriter)] +struct ACompleteRecord<'a> { + pub a_bool: bool, + pub a_str: &'a str, + pub a_string: String, + pub a_borrowed_string: &'a String, + pub maybe_a_str: Option<&'a str>, + pub maybe_a_string: Option, + pub magic_number: i32, + pub low_quality_pi: f32, + pub high_quality_pi: f64, + pub maybe_pi: Option, + pub maybe_best_pi: Option, + pub borrowed_maybe_a_string: &'a Option, + pub borrowed_maybe_a_str: &'a Option<&'a str>, +} + +#[cfg(test)] +mod tests { + use super::*; + + use parquet::{ + file::{ + properties::WriterProperties, + writer::{FileWriter, SerializedFileWriter}, + }, + schema::parser::parse_message_type, + }; + use std::{env, fs, io::Write, rc::Rc}; + + #[test] + fn test_parquet_derive_hello() { + let file = get_temp_file("test_parquet_derive_hello", &[]); + let schema_str = "message schema { + REQUIRED boolean a_bool; + REQUIRED BINARY a_str (UTF8); + REQUIRED BINARY a_string (UTF8); + REQUIRED BINARY a_borrowed_string (UTF8); + OPTIONAL BINARY a_maybe_str (UTF8); + OPTIONAL BINARY a_maybe_string (UTF8); + REQUIRED INT32 magic_number; + REQUIRED FLOAT low_quality_pi; + REQUIRED DOUBLE high_quality_pi; + OPTIONAL FLOAT maybe_pi; + OPTIONAL DOUBLE maybe_best_pi; + OPTIONAL BINARY borrowed_maybe_a_string (UTF8); + OPTIONAL BINARY borrowed_maybe_a_str (UTF8); + }"; + + let schema = Rc::new(parse_message_type(schema_str).unwrap()); + + let props = Rc::new(WriterProperties::builder().build()); + let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); + + let a_str = "hello mother".to_owned(); + let a_borrowed_string = "cool news".to_owned(); + let maybe_a_string = Some("it's true, I'm a string".to_owned()); + let 
maybe_a_str = Some(&a_str[..]); + + let drs: Vec = vec![ACompleteRecord { + a_bool: true, + a_str: &a_str[..], + a_string: "hello father".into(), + a_borrowed_string: &a_borrowed_string, + maybe_a_str: Some(&a_str[..]), + maybe_a_string: Some(a_str.clone()), + magic_number: 100, + low_quality_pi: 3.14, + high_quality_pi: 3.1415, + maybe_pi: Some(3.14), + maybe_best_pi: Some(3.1415), + borrowed_maybe_a_string: &maybe_a_string, + borrowed_maybe_a_str: &maybe_a_str, + }]; + + let mut row_group = writer.next_row_group().unwrap(); + drs.as_slice().write_to_row_group(&mut row_group).unwrap(); + writer.close_row_group(row_group).unwrap(); + writer.close().unwrap(); + } + + /// Returns file handle for a temp file in 'target' directory with a provided content + pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File { + // build tmp path to a file in "target/debug/testdata" + let mut path_buf = env::current_dir().unwrap(); + path_buf.push("target"); + path_buf.push("debug"); + path_buf.push("testdata"); + fs::create_dir_all(&path_buf).unwrap(); + path_buf.push(file_name); + + // write file content + let mut tmp_file = fs::File::create(path_buf.as_path()).unwrap(); + tmp_file.write_all(content).unwrap(); + tmp_file.sync_all().unwrap(); + + // return file handle for both read and write + let file = fs::OpenOptions::new() + .read(true) + .write(true) + .open(path_buf.as_path()); + assert!(file.is_ok()); + file.unwrap() + } +}