ARROW-5123: [Rust] Parquet derive for simple structs

A rebase and significant rewrite of sunchao/parquet-rs#197

Big improvement: I now use a more natural nested enum style, which helps break out which patterns of data types are supported. The rest of the broad strokes still apply.

Goal
===

Writing many columns to a file is a chore. If you can put your values into a struct which mirrors the schema of your file, this `derive(ParquetRecordWriter)` will write out all the fields, in the order in which they are defined, to a row_group.

How to Use
===

```
extern crate parquet;
#[macro_use] extern crate parquet_derive;

#[derive(ParquetRecordWriter)]
struct ACompleteRecord<'a> {
  pub a_bool: bool,
  pub a_str: &'a str,
}
```
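
To actually produce a file, hand a slice of your structs to the derived writer. A condensed sketch of the flow (setup of `file`, `schema`, and `props` is elided; see the `parquet_derive` README for the full example):

```
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter;

// Initialize the parquet file as usual
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group = writer.next_row_group().unwrap();

// Build up your records
let records = vec![ACompleteRecord { a_bool: true, a_str: "hello" }];

// The derived `RecordWriter` writes one column chunk per field, in declaration order
(&records[..]).write_to_row_group(&mut row_group);

writer.close_row_group(row_group).unwrap();
writer.close().unwrap();
```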

RecordWriter trait
===

This is the new trait which `parquet_derive` will implement for your structs.

```
use super::super::errors::ParquetError;
use super::super::file::writer::RowGroupWriter;

pub trait RecordWriter<T> {
  fn write_to_row_group(
    &self,
    row_group_writer: &mut Box<RowGroupWriter>,
  ) -> Result<(), ParquetError>;
}
```

How does it work?
===

The `parquet_derive` crate adds code-generation functionality to the Rust compiler. The code generation takes Rust syntax and emits additional syntax; this kind of derive macro has been stable since Rust 1.15. It is a dynamic plugin, loaded by the machinery in cargo. Users don't have to do any special `build.rs` steps or anything like that; it works automatically just by including `parquet_derive` in their project. The `parquet_derive/Cargo.toml` has a section saying as much:

```
[lib]
proc-macro = true
```

The Rust struct tagged with `#[derive(ParquetRecordWriter)]` is provided to the `parquet_record_writer` function in `parquet_derive/src/lib.rs`. The `syn` crate parses the struct from its token stream into an AST (a recursive enum value). The AST contains all the values I care about when generating a `RecordWriter` impl (a minimal sketch of the entry point follows the list below):

 - the name of the struct
 - the lifetime variables of the struct
 - the fields of the struct
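
A minimal sketch of that entry point, assuming the usual `syn`/`quote` shape of a proc-macro crate (names follow the prose above; the real `parquet_derive` code differs in detail):

```
extern crate proc_macro;

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, Data, DeriveInput};

#[proc_macro_derive(ParquetRecordWriter)]
pub fn parquet_record_writer(input: TokenStream) -> TokenStream {
  let input = parse_macro_input!(input as DeriveInput);
  let struct_name = &input.ident;   // the name of the struct
  let generics = &input.generics;   // carries the lifetime variables
  let fields = match &input.data {  // the fields of the struct
    Data::Struct(s) => &s.fields,
    _ => unimplemented!("ParquetRecordWriter only supports structs"),
  };
  // ... translate each field into a FieldInfo, then emit the impl with quote! ...
  let expanded = quote! { /* generated RecordWriter impl goes here */ };
  expanded.into()
}
```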

The fields of the struct are translated from the AST into a flat `FieldInfo` struct. It has the bits I care about for writing a column: `field_name`, `field_lifetime`, `field_type`, `is_option`, `column_writer_variant`.
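
A rough sketch of that descriptor (the field names come from the prose; the concrete types here are illustrative, not necessarily the ones `parquet_derive` uses):

```
struct FieldInfo {
  field_name: syn::Ident,                          // e.g. `a_bool`
  field_lifetime: Option<syn::Lifetime>,           // e.g. `'a` for `&'a str`
  field_type: syn::Type,                           // e.g. `bool`, `&'a str`, `Option<f64>`
  is_option: bool,                                 // drives definition levels for `Option` fields
  column_writer_variant: proc_macro2::TokenStream, // e.g. `ColumnWriter::BoolColumnWriter`
}
```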

The code then does the equivalent of templating to build the `RecordWriter` implementation. The templating functionality is provided by the `quote` crate. At a high level the template for `RecordWriter` looks like:

```
impl RecordWriter for $struct_name {
  fn write_to_row_group(..) {
    $({
      $column_writer_snippet
    })
  }
}
```
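
In real `quote!` syntax the repetition marker is `#(...)*` rather than `$(...)`. A rough sketch, where `struct_name` and `column_writer_snippets` are hypothetical locals built from the parsed fields (struct generics and lifetimes need extra handling, elided here):

```
let writer_impl = quote! {
  impl<'a> RecordWriter<#struct_name> for &'a [#struct_name] {
    fn write_to_row_group(
      &self,
      row_group_writer: &mut Box<parquet::file::writer::RowGroupWriter>,
    ) -> Result<(), parquet::errors::ParquetError> {
      #(
        { #column_writer_snippets }
      )*
      Ok(())
    }
  }
};
```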

This template is then placed under the struct definition, ending up something like:

```
struct MyStruct {
}
impl RecordWriter for MyStruct {
  fn write_to_row_group(..) {
    {
      write_col_1();
    };
    {
      write_col_2();
    }
  }
}
```

and finally _THIS_ is the code passed to rustc. It's just code now, fully expanded and standalone. If a user ever changes their struct definition, the derived `RecordWriter` impl will be regenerated; there are no intermediate values to version control or worry about.

Viewing the Derived Code
===

To see the generated code before it's compiled, it is very useful to install `cargo expand` ([more info on GitHub](https://github.com/dtolnay/cargo-expand)); then you can do:

```
cd $WORK_DIR/parquet-rs/parquet_derive_test
cargo expand --lib > ../temp.rs
```

and then view the dumped contents:

```
struct DumbRecord {
    pub a_bool: bool,
    pub a2_bool: bool,
}
impl RecordWriter<DumbRecord> for &[DumbRecord] {
    fn write_to_row_group(
        &self,
        row_group_writer: &mut Box<parquet::file::writer::RowGroupWriter>,
    ) {
        let mut row_group_writer = row_group_writer;
        {
            let vals: Vec<bool> = self.iter().map(|x| x.a_bool).collect();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
            if let parquet::column::writer::ColumnWriter::BoolColumnWriter(ref mut typed) =
                column_writer
            {
                typed.write_batch(&vals[..], None, None).unwrap();
            }
            row_group_writer.close_column(column_writer).unwrap();
        };
        {
            let vals: Vec<bool> = self.iter().map(|x| x.a2_bool).collect();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
            if let parquet::column::writer::ColumnWriter::BoolColumnWriter(ref mut typed) =
                column_writer
            {
                typed.write_batch(&vals[..], None, None).unwrap();
            }
            row_group_writer.close_column(column_writer).unwrap();
        }
    }
}
```

Now I need to write out all the combinations of types we support and make sure they write out data correctly.

Procedural Macros
===

The `parquet_derive` crate can ONLY export the derivation functionality. No traits, nothing else. The derive crate cannot host test cases either. It's kind of like a "dummy" crate which is only used by the compiler, never by running code.

The parent crate (`parquet`) cannot use the derivation functionality either, since `parquet_derive` depends on it; this means test code cannot live in the parent crate. This forces us to have a third crate, `parquet_derive_test`.
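
Concretely, the layering in `parquet_derive_test` looks roughly like this (a sketch; the actual test crate has a fuller record and assertions):

```
// parquet_derive_test/src/lib.rs (sketch)
extern crate parquet;
#[macro_use]
extern crate parquet_derive; // macros only -- no traits, no runtime code

use parquet::record::RecordWriter; // the trait lives in `parquet`, not in the derive crate

#[derive(ParquetRecordWriter)]
struct DumbRecord {
  pub a_bool: bool,
  pub a2_bool: bool,
}
```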

I'm open to being wrong on any one of these finer points. I had to bang on this for a while to get it to compile!

Potentials For Better Design
===

 - [x] Recursion could be limited by generating the code as "snippets" instead of one big `quote!` AST generator. Or so I think. It might be nicer to push generating each column's writing code into another loop.
 - [X] ~~It would be nicer if I didn't have to be so picky about the data going into the `write_batch` function. Is it possible we could make a version of the function which accepts `Into<DataType>` or similar? This would greatly simplify the derivation code, as it would not need to enumerate all the supported types. Something like `write_generic_batch(&[impl Into<DataType>])` would be neat.~~ (not tackling in this generation of the plugin)
 - [X] ~~Another idea for improving column writing: could we have a write function for `Iterator`s? I already have a `Vec<DumbRecord>`; if I could just write a mapping for accessing the one value, we could skip the whole intermediate vec for `write_batch`. That should have some significant memory advantages.~~ (not tackling in this generation of the plugin, it's a bigger parquet-rs enhancement)
 - [X] ~~It might be worthwhile to derive a parquet schema directly from a struct definition. That should stamp out opportunities for type errors.~~ (moved to apache#203)

Status
===

I have successfully integrated this work with my own data exporter (takes postgres/couchdb and outputs a single parquet file).

I think this code is worth including in the project, with the caveat that it only generates simplistic `RecordWriter`s. As people start to use it, we can add code generation for more complex, nested structs. We can convert the nested matching style to a fancier looping style. But for now, this explicit nesting is easier to debug and understand (to me at least!).

Closes apache#4140 from xrl/parquet_derive

Lead-authored-by: Xavier Lange <xrlange@gmail.com>
Co-authored-by: Neville Dipale <nevilledips@gmail.com>
Co-authored-by: Bryant Biggs <bryantbiggs@gmail.com>
Co-authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Neville Dipale <nevilledips@gmail.com>
4 people authored and GeorgeAp committed Jun 7, 2021
1 parent 64d85f7 commit 6c257ef
Showing 13 changed files with 1,453 additions and 23 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
@@ -55,6 +55,8 @@
!rust/arrow-flight/Cargo.toml
!rust/parquet/Cargo.toml
!rust/parquet/build.rs
!rust/parquet_derive/Cargo.toml
!rust/parquet_derive_test/Cargo.toml
!rust/datafusion/Cargo.toml
!rust/datafusion/benches
!rust/integration-testing/Cargo.toml
8 changes: 6 additions & 2 deletions ci/docker/debian-10-rust.dockerfile
@@ -58,14 +58,18 @@ RUN mkdir \
/arrow/rust/benchmarks/src \
/arrow/rust/datafusion/src \
/arrow/rust/integration-testing/src \
/arrow/rust/parquet/src && \
/arrow/rust/parquet/src \
/arrow/rust/parquet_derive/src \
/arrow/rust/parquet_derive_test/src && \
touch \
/arrow/rust/arrow-flight/src/lib.rs \
/arrow/rust/arrow/src/lib.rs \
/arrow/rust/benchmarks/src/lib.rs \
/arrow/rust/datafusion/src/lib.rs \
/arrow/rust/integration-testing/src/lib.rs \
/arrow/rust/parquet/src/lib.rs
/arrow/rust/parquet/src/lib.rs \
/arrow/rust/parquet_derive/src/lib.rs \
/arrow/rust/parquet_derive_test/src/lib.rs

# Compile dependencies for the whole workspace
RUN cd /arrow/rust && cargo build --workspace --lib --all-features
58 changes: 58 additions & 0 deletions dev/release/00-prepare-test.rb
@@ -330,6 +330,35 @@ def test_version_pre_tag
"+See [crate documentation](https://docs.rs/crate/parquet/#{@release_version}) on available API."],
],
},
{
path: "rust/parquet_derive/Cargo.toml",
hunks: [
["-version = \"#{@snapshot_version}\"",
"+version = \"#{@release_version}\""],
["-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"],
],
},
{
path: "rust/parquet_derive/README.md",
hunks: [
["-parquet = \"#{@snapshot_version}\"",
"-parquet_derive = \"#{@snapshot_version}\"",
"+parquet = \"#{@release_version}\"",
"+parquet_derive = \"#{@release_version}\""],
],
},
{
path: "rust/parquet_derive_test/Cargo.toml",
hunks: [
["-version = \"#{@snapshot_version}\"",
"+version = \"#{@release_version}\"",
"-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }",
"-parquet_derive = { path = \"../parquet_derive\", version = \"#{@snapshot_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }",
"+parquet_derive = { path = \"../parquet_derive\", version = \"#{@release_version}\" }"],
],
},
],
parse_patch(git("log", "-n", "1", "-p")))
end
@@ -537,6 +566,35 @@ def test_version_post_tag
"+See [crate documentation](https://docs.rs/crate/parquet/#{@next_snapshot_version}) on available API."],
],
},
{
path: "rust/parquet_derive/Cargo.toml",
hunks: [
["-version = \"#{@release_version}\"",
"+version = \"#{@next_snapshot_version}\""],
["-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"],
],
},
{
path: "rust/parquet_derive/README.md",
hunks: [
["-parquet = \"#{@release_version}\"",
"-parquet_derive = \"#{@release_version}\"",
"+parquet = \"#{@next_snapshot_version}\"",
"+parquet_derive = \"#{@next_snapshot_version}\""],
],
},
{
path: "rust/parquet_derive_test/Cargo.toml",
hunks: [
["-version = \"#{@release_version}\"",
"+version = \"#{@next_snapshot_version}\"",
"-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }",
"-parquet_derive = { path = \"../parquet_derive\", version = \"#{@release_version}\" }",
"+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }",
"+parquet_derive = { path = \"../parquet_derive\", version = \"#{@next_snapshot_version}\" }"],
],
},
],
parse_patch(git("log", "-n", "1", "-p")))
end
26 changes: 7 additions & 19 deletions dev/release/00-prepare.sh
@@ -151,29 +151,17 @@ update_versions() {
-e "s/^(arrow = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \
-e "s/^(arrow-flight = .* version = )\".+\"( .*)/\\1\"${version}\"\\2/g" \
-e "s/^(parquet = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \
-e "s/^(parquet_derive = .* version = )\".*\"(( .*)|(, features = .*))$/\\1\"${version}\"\\2/g" \
*/Cargo.toml
rm -f */Cargo.toml.bak
git add */Cargo.toml

# Update version number for parquet README
sed -i.bak -E -e \
"s/^parquet = \".+\"/parquet = \"${version}\"/g" \
parquet/README.md
sed -i.bak -E -e \
"s/docs.rs\/crate\/parquet\/.+\)/docs.rs\/crate\/parquet\/${version}\)/g" \
parquet/README.md
rm -f parquet/README.md.bak
git add parquet/README.md

# Update version number for datafusion README
sed -i.bak -E -e \
"s/^datafusion = \".+\"/datafusion = \"${version}\"/g" \
datafusion/README.md
sed -i.bak -E -e \
"s/docs.rs\/crate\/datafusion\/.+\)/docs.rs\/crate\/datafusion\/${version}\)/g" \
datafusion/README.md
rm -f datafusion/README.md.bak
git add datafusion/README.md
sed -i.bak -E \
-e "s/^([^ ]+) = \".+\"/\\1 = \"${version}\"/g" \
-e "s,docs\.rs/crate/([^/]+)/[^)]+,docs.rs/crate/\\1/${version},g" \
*/README.md
rm -f */README.md.bak
git add */README.md
cd -
}

2 changes: 2 additions & 0 deletions rust/Cargo.toml
@@ -19,6 +19,8 @@
members = [
"arrow",
"parquet",
"parquet_derive",
"parquet_derive_test",
"datafusion",
"arrow-flight",
"integration-testing",
6 changes: 4 additions & 2 deletions rust/parquet/src/record/mod.rs
@@ -19,8 +19,10 @@
mod api;
pub mod reader;
mod record_writer;
mod triplet;

pub use self::api::{
List, ListAccessor, Map, MapAccessor, Row, RowAccessor, RowFormatter,
pub use self::{
api::{List, ListAccessor, Map, MapAccessor, Row, RowAccessor},
record_writer::RecordWriter,
};
26 changes: 26 additions & 0 deletions rust/parquet/src/record/record_writer.rs
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::super::errors::ParquetError;
use super::super::file::writer::RowGroupWriter;

pub trait RecordWriter<T> {
fn write_to_row_group(
&self,
row_group_writer: &mut Box<RowGroupWriter>,
) -> Result<(), ParquetError>;
}
37 changes: 37 additions & 0 deletions rust/parquet_derive/Cargo.toml
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "parquet_derive"
version = "2.0.0-SNAPSHOT"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
keywords = [ "parquet" ]
edition = "2018"

[lib]
proc-macro = true

[features]
chrono = []
bigdecimal = []
uuid = []

[dependencies]
proc-macro2 = "1.0.8"
quote = "1.0.2"
syn = { version = "1.0.14", features = ["full", "extra-traits"] }
parquet = { path = "../parquet", version = "2.0.0-SNAPSHOT" }
98 changes: 98 additions & 0 deletions rust/parquet_derive/README.md
@@ -0,0 +1,98 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Parquet Derive

A crate for deriving `RecordWriter` for arbitrary, _simple_ structs. This does not generate writers for arbitrarily nested
structures. It only works for primitives and a few generic structures and
various levels of reference. Please see the features checklist for what is currently
supported.

Derive also has some support for the chrono time library. You must enable the `chrono` feature to get this support.

## Usage
Add this to your Cargo.toml:
```toml
[dependencies]
parquet = "2.0.0-SNAPSHOT"
parquet_derive = "2.0.0-SNAPSHOT"
```

and this to your crate root:
```rust
extern crate parquet;
#[macro_use] extern crate parquet_derive;
```

Example usage of deriving a `RecordWriter` for your struct:

```rust
use parquet;
use parquet::record::RecordWriter;

#[derive(ParquetRecordWriter)]
struct ACompleteRecord<'a> {
pub a_bool: bool,
pub a_str: &'a str,
pub a_string: String,
pub a_borrowed_string: &'a String,
pub maybe_a_str: Option<&'a str>,
pub magic_number: i32,
pub low_quality_pi: f32,
pub high_quality_pi: f64,
pub maybe_pi: Option<f32>,
pub maybe_best_pi: Option<f64>,
}

// Initialize your parquet file
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group = writer.next_row_group().unwrap();

// Build up your records
let chunks = vec![ACompleteRecord{...}];

// The derived `RecordWriter` takes over here
(&chunks[..]).write_to_row_group(&mut row_group);

writer.close_row_group(row_group).unwrap();
writer.close().unwrap();
```

## Features
- [X] Support writing `String`, `&str`, `bool`, `i32`, `f32`, `f64`, `Vec<u8>`
- [ ] Support writing dictionaries
- [X] Support writing logical types like timestamp
- [X] Derive definition_levels for `Option`
- [ ] Derive definition levels for nested structures
- [ ] Derive writing tuple struct
- [ ] Derive writing `tuple` container types

## Requirements
- Same as `parquet-rs`

## Test
Testing a `*_derive` crate requires an intermediate crate. Go to `parquet_derive_test` and run `cargo test` for
unit tests.

## Docs
To build documentation, run `cargo doc --no-deps`.
To compile and view in the browser, run `cargo doc --no-deps --open`.

## License
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.