Skip to content

Commit

Permalink
Support Serializing File Footers into Flatbuffers (#37)
Browse files Browse the repository at this point in the history
This commit:

- Adds all the infrastructure needed to serialize file footers
- Makes the changes in serde_arrow_ipc_file:to_ipc to include the
  serialized footers
  • Loading branch information
Benjamin-Philip authored Oct 20, 2023
1 parent fec8cf5 commit 53db7e2
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 7 deletions.
104 changes: 104 additions & 0 deletions native/arrow_format_nif/src/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use super::message::schema::Schema;
use super::message::Version;
use crate::utils::CustomMetadata;

use arrow_format::ipc;
use arrow_format::ipc::planus::Builder;
use rustler::NifRecord;

#[derive(Debug, NifRecord)]
#[tag = "footer"]
pub struct Footer {
pub version: Version,
pub schema: Schema,
pub dictionaries: Vec<Block>,
pub record_batches: Vec<Block>,
pub custom_metadata: CustomMetadata,
}

#[derive(Debug, NifRecord)]
#[tag = "block"]
pub struct Block {
pub offset: i64,
pub metadata_length: i32,
pub body_length: i64,
}

impl Footer {
// A Word on Terminology:
//
// The function `serialize_to_ipc` is what actually serializes a footer to
// its final binary/flatbuffer form.
//
// The function `serialize` just "serializes" (in this case converts) our
// footer struct to `arrow_format`'s file struct. A similar "naming
// convention" was followed in all the other traits which implement
// `serialize`

pub fn serialize_to_ipc(&self) -> Vec<u8> {
let file = self.serialize();

let mut builder = Builder::new();
builder.finish(file, None).to_vec()
}

pub fn serialize(&self) -> ipc::Footer {
ipc::Footer {
version: self.version.serialize(),
schema: Some(Box::new(self.schema.serialize())),
dictionaries: self
.dictionaries
.iter()
.map(|dictionary| dictionary.serialize())
.collect::<Vec<_>>()
.into(),
record_batches: self
.record_batches
.iter()
.map(|record_batch| record_batch.serialize())
.collect::<Vec<_>>()
.into(),
custom_metadata: None,
}
}
}

impl Block {
pub fn serialize(&self) -> ipc::Block {
ipc::Block {
offset: self.offset,
meta_data_length: self.metadata_length,
body_length: self.body_length,
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::test::fixtures;
// use crate::utils;
use arrow_format::ipc;

#[test]
fn test_block_serialize() {
let block = Block {
offset: 10,
metadata_length: 10,
body_length: 100,
};
let arrow_block = ipc::Block {
offset: 10,
meta_data_length: 10,
body_length: 100,
};
assert_eq!(block.serialize(), arrow_block);
}

#[test]
fn test_footer_serialize() {
let footer = fixtures::footer();
let arrow_footer = fixtures::arrow_footer();
assert_eq!(footer.serialize(), arrow_footer);
}
}
59 changes: 58 additions & 1 deletion native/arrow_format_nif/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use rustler::types::Binary;
use rustler::{Atom, Env};

mod file;
mod message;
mod utils;

use file::Footer;
use message::Message;

mod atoms {
Expand Down Expand Up @@ -49,6 +51,9 @@ mod atoms {
#[cfg(test)]
pub mod test {
pub mod fixtures {
use crate::file::{Block, Footer};
use crate::message::{Header, Version};
use crate::utils;
use arrow_format::ipc;

pub fn arrow_schema() -> ipc::Message {
Expand Down Expand Up @@ -189,6 +194,40 @@ pub mod test {
custom_metadata: None,
}
}

pub fn footer() -> Footer {
let Header::Schema(schema) = utils::schema().header else {
panic!("Record Batch!")
};
Footer {
version: Version::V5,
schema: schema,
dictionaries: vec![],
record_batches: vec![Block {
offset: 128,
metadata_length: 32,
body_length: 512,
}],
custom_metadata: vec![],
}
}

pub fn arrow_footer() -> ipc::Footer {
let ipc::MessageHeader::Schema(schema) = arrow_schema().header.unwrap() else {
panic!("This is not gonna panic!")
};
ipc::Footer {
version: ipc::MetadataVersion::V5,
schema: Some(schema),
dictionaries: vec![].into(),
record_batches: Some(vec![ipc::Block {
offset: 128,
meta_data_length: 32,
body_length: 512,
}]),
custom_metadata: None,
}
}
}
}

Expand Down Expand Up @@ -229,7 +268,25 @@ fn serialize_message(env: Env, message: Message) -> Binary {
erl_bin.release(env)
}

/// Serializes a footer into its correspondding flatbuffers.
///
/// This function serializes a footer into its correspondding flatbuffers and
/// returns a binary
#[rustler::nif]
fn serialize_footer(env: Env, footer: Footer) -> Binary {
let flatbuffers = footer.serialize_to_ipc();

let mut erl_bin = rustler::types::OwnedBinary::new(flatbuffers.len()).unwrap();
erl_bin.as_mut_slice().copy_from_slice(&flatbuffers);
erl_bin.release(env)
}

rustler::init!(
"arrow_format_nif",
[test_decode, test_encode, serialize_message]
[
test_decode,
test_encode,
serialize_message,
serialize_footer
]
);
2 changes: 1 addition & 1 deletion native/arrow_format_nif/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum Body {
impl Message {
// A Word on Terminology:
//
// The function `serialize_to_ipc` is what actually serializes a function to
// The function `serialize_to_ipc` is what actually serializes a message to
// its final binary/flatbuffer form.
//
// The function `serialize` just "serializes" (in this case converts) our
Expand Down
5 changes: 4 additions & 1 deletion src/arrow_format_nif.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-module(arrow_format_nif).
-export([test_decode/1, test_encode/1, serialize_message/1]).
-export([test_decode/1, test_encode/1, serialize_message/1, serialize_footer/1]).

-include("cargo.hrl").
-on_load(init/0).
Expand All @@ -18,6 +18,9 @@ test_encode(_) ->
serialize_message(_) ->
?NOT_LOADED.

serialize_footer(_) ->
?NOT_LOADED.

%%%===================================================================
%%% NIF
%%%===================================================================
Expand Down
2 changes: 1 addition & 1 deletion src/serde_arrow_ipc_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ blocks(Offset, [H | T], Blocks, EMFs) ->
%% @doc Serializes a file into the IPC File Format
-spec to_ipc(File :: #file{}) -> SerializedFile :: binary().
to_ipc(File) ->
Footer = <<"Footer!">>,
Footer = arrow_format_nif:serialize_footer(File#file.footer),
Sz = byte_size(Footer),
<<"ARROW1", "00", (File#file.body)/binary, Footer/bitstring, Sz:4/little-signed-integer-unit:8,
"ARROW1">>.
3 changes: 2 additions & 1 deletion src/serde_arrow_ipc_file.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
version = v5 :: serde_arrow_ipc_message:metadata_version(),
schema :: #schema{},
dictionaries = [] :: [#block{}],
record_batches :: [#block{}]
record_batches :: [#block{}],
custom_metadata = [] :: [serde_arrow_ipc_message:key_value()]
}).

-record(file, {footer :: #footer{}, body :: binary()}).
10 changes: 9 additions & 1 deletion test/arrow_format_nif_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
-include_lib("stdlib/include/assert.hrl").

-include("serde_arrow_ipc_message.hrl").
-include("serde_arrow_ipc_file.hrl").

-include("serde_arrow_ipc_marks_data.hrl").

Expand All @@ -28,7 +29,7 @@
-define(LargeList, schema(large_list)).

all() ->
[test_decode, test_encode, serialize_message].
[test_decode, test_encode, serialize_message, serialize_footer].

%%%%%%%%%%%%%%%%%
%% test_decode %%
Expand Down Expand Up @@ -121,6 +122,13 @@ serialize_message(_Config) ->
RecordBatchMsg = (?RecordBatchMsg)#message{body = undefined},
?assert(is_binary(arrow_format_nif:serialize_message(RecordBatchMsg))).

%%%%%%%%%%%%%%%%%%%%%%%
%% serialize_footer %%
%%%%%%%%%%%%%%%%%%%%%%%

serialize_footer(_Config) ->
?assert(is_binary(arrow_format_nif:serialize_footer((?File)#file.footer))).

%%%%%%%%%%%
%% Utils %%
%%%%%%%%%%%
Expand Down
6 changes: 5 additions & 1 deletion test/serde_arrow_ipc_file_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ all() ->
valid_schema_on_from_erlang,
valid_dictionaries_on_from_erlang,
valid_record_batches_on_from_erlang,
valid_custom_metadata_on_from_erlang,
valid_body_on_from_erlang,

valid_magic_string_on_to_ipc,
Expand Down Expand Up @@ -44,6 +45,9 @@ valid_record_batches_on_from_erlang(_Config) ->
},
?assertEqual((?File)#file.footer#footer.record_batches, [RecordBatchBlock]).

valid_custom_metadata_on_from_erlang(_Config) ->
?assertEqual((?File)#file.footer#footer.custom_metadata, []).

valid_body_on_from_erlang(_Config) ->
?assertEqual((?File)#file.body, ?Stream).

Expand All @@ -65,7 +69,7 @@ valid_footer_on_to_ipc(_Config) ->
FooterSz = byte_size(?SerializedFile) * 8 - (64 + StreamSz + 32 + 48),
<<_ARROW_MAGIC:64/bitstring, _Stream:StreamSz/bitstring, Footer:FooterSz/bitstring,
_FooterSz:32/signed-little-integer, "ARROW1">> = ?SerializedFile,
?assertEqual(Footer, <<"Footer!">>).
?assertEqual(Footer, arrow_format_nif:serialize_footer((?File)#file.footer)).

valid_stream_on_to_ipc(_Config) ->
Sz = byte_size(?Stream) * 8,
Expand Down

0 comments on commit 53db7e2

Please sign in to comment.