diff --git a/native/arrow_format_nif/src/file.rs b/native/arrow_format_nif/src/file.rs new file mode 100644 index 0000000..d51a64e --- /dev/null +++ b/native/arrow_format_nif/src/file.rs @@ -0,0 +1,104 @@ +use super::message::schema::Schema; +use super::message::Version; +use crate::utils::CustomMetadata; + +use arrow_format::ipc; +use arrow_format::ipc::planus::Builder; +use rustler::NifRecord; + +#[derive(Debug, NifRecord)] +#[tag = "footer"] +pub struct Footer { + pub version: Version, + pub schema: Schema, + pub dictionaries: Vec, + pub record_batches: Vec, + pub custom_metadata: CustomMetadata, +} + +#[derive(Debug, NifRecord)] +#[tag = "block"] +pub struct Block { + pub offset: i64, + pub metadata_length: i32, + pub body_length: i64, +} + +impl Footer { + // A Word on Terminology: + // + // The function `serialize_to_ipc` is what actually serializes a footer to + // its final binary/flatbuffer form. + // + // The function `serialize` just "serializes" (in this case converts) our + // footer struct to `arrow_format`'s file struct. A similar "naming + // convention" was followed in all the other traits which implement + // `serialize` + + pub fn serialize_to_ipc(&self) -> Vec { + let file = self.serialize(); + + let mut builder = Builder::new(); + builder.finish(file, None).to_vec() + } + + pub fn serialize(&self) -> ipc::Footer { + ipc::Footer { + version: self.version.serialize(), + schema: Some(Box::new(self.schema.serialize())), + dictionaries: self + .dictionaries + .iter() + .map(|dictionary| dictionary.serialize()) + .collect::>() + .into(), + record_batches: self + .record_batches + .iter() + .map(|record_batch| record_batch.serialize()) + .collect::>() + .into(), + custom_metadata: None, + } + } +} + +impl Block { + pub fn serialize(&self) -> ipc::Block { + ipc::Block { + offset: self.offset, + meta_data_length: self.metadata_length, + body_length: self.body_length, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::test::fixtures; + // use crate::utils; + use arrow_format::ipc; + + #[test] + fn test_block_serialize() { + let block = Block { + offset: 10, + metadata_length: 10, + body_length: 100, + }; + let arrow_block = ipc::Block { + offset: 10, + meta_data_length: 10, + body_length: 100, + }; + assert_eq!(block.serialize(), arrow_block); + } + + #[test] + fn test_footer_serialize() { + let footer = fixtures::footer(); + let arrow_footer = fixtures::arrow_footer(); + assert_eq!(footer.serialize(), arrow_footer); + } +} diff --git a/native/arrow_format_nif/src/lib.rs b/native/arrow_format_nif/src/lib.rs index 4eedd41..d814ac2 100644 --- a/native/arrow_format_nif/src/lib.rs +++ b/native/arrow_format_nif/src/lib.rs @@ -1,9 +1,11 @@ use rustler::types::Binary; use rustler::{Atom, Env}; +mod file; mod message; mod utils; +use file::Footer; use message::Message; mod atoms { @@ -49,6 +51,9 @@ mod atoms { #[cfg(test)] pub mod test { pub mod fixtures { + use crate::file::{Block, Footer}; + use crate::message::{Header, Version}; + use crate::utils; use arrow_format::ipc; pub fn arrow_schema() -> ipc::Message { @@ -189,6 +194,40 @@ pub mod test { custom_metadata: None, } } + + pub fn footer() -> Footer { + let Header::Schema(schema) = utils::schema().header else { + panic!("Record Batch!") + }; + Footer { + version: Version::V5, + schema: schema, + dictionaries: vec![], + record_batches: vec![Block { + offset: 128, + metadata_length: 32, + body_length: 512, + }], + custom_metadata: vec![], + } + } + + pub fn arrow_footer() -> ipc::Footer { + let ipc::MessageHeader::Schema(schema) = arrow_schema().header.unwrap() else { + panic!("This is not gonna panic!") + }; + ipc::Footer { + version: ipc::MetadataVersion::V5, + schema: Some(schema), + dictionaries: vec![].into(), + record_batches: Some(vec![ipc::Block { + offset: 128, + meta_data_length: 32, + body_length: 512, + }]), + custom_metadata: None, + } + } } } @@ -229,7 +268,25 @@ fn serialize_message(env: Env, message: Message) -> Binary { erl_bin.release(env) } +/// Serializes a footer into its correspondding flatbuffers. +/// +/// This function serializes a footer into its correspondding flatbuffers and +/// returns a binary +#[rustler::nif] +fn serialize_footer(env: Env, footer: Footer) -> Binary { + let flatbuffers = footer.serialize_to_ipc(); + + let mut erl_bin = rustler::types::OwnedBinary::new(flatbuffers.len()).unwrap(); + erl_bin.as_mut_slice().copy_from_slice(&flatbuffers); + erl_bin.release(env) +} + rustler::init!( "arrow_format_nif", - [test_decode, test_encode, serialize_message] + [ + test_decode, + test_encode, + serialize_message, + serialize_footer + ] ); diff --git a/native/arrow_format_nif/src/message.rs b/native/arrow_format_nif/src/message.rs index 60e2075..bab0fd3 100644 --- a/native/arrow_format_nif/src/message.rs +++ b/native/arrow_format_nif/src/message.rs @@ -43,7 +43,7 @@ pub enum Body { impl Message { // A Word on Terminology: // - // The function `serialize_to_ipc` is what actually serializes a function to + // The function `serialize_to_ipc` is what actually serializes a message to // its final binary/flatbuffer form. // // The function `serialize` just "serializes" (in this case converts) our diff --git a/src/arrow_format_nif.erl b/src/arrow_format_nif.erl index 6c819b7..5c3c533 100644 --- a/src/arrow_format_nif.erl +++ b/src/arrow_format_nif.erl @@ -1,5 +1,5 @@ -module(arrow_format_nif). --export([test_decode/1, test_encode/1, serialize_message/1]). +-export([test_decode/1, test_encode/1, serialize_message/1, serialize_footer/1]). -include("cargo.hrl"). -on_load(init/0). @@ -18,6 +18,9 @@ test_encode(_) -> serialize_message(_) -> ?NOT_LOADED. +serialize_footer(_) -> + ?NOT_LOADED. + %%%=================================================================== %%% NIF %%%=================================================================== diff --git a/src/serde_arrow_ipc_file.erl b/src/serde_arrow_ipc_file.erl index 4d1100e..0cde04f 100644 --- a/src/serde_arrow_ipc_file.erl +++ b/src/serde_arrow_ipc_file.erl @@ -54,7 +54,7 @@ blocks(Offset, [H | T], Blocks, EMFs) -> %% @doc Serializes a file into the IPC File Format -spec to_ipc(File :: #file{}) -> SerializedFile :: binary(). to_ipc(File) -> - Footer = <<"Footer!">>, + Footer = arrow_format_nif:serialize_footer(File#file.footer), Sz = byte_size(Footer), <<"ARROW1", "00", (File#file.body)/binary, Footer/bitstring, Sz:4/little-signed-integer-unit:8, "ARROW1">>. diff --git a/src/serde_arrow_ipc_file.hrl b/src/serde_arrow_ipc_file.hrl index 184f055..a8f9f1d 100644 --- a/src/serde_arrow_ipc_file.hrl +++ b/src/serde_arrow_ipc_file.hrl @@ -8,7 +8,8 @@ version = v5 :: serde_arrow_ipc_message:metadata_version(), schema :: #schema{}, dictionaries = [] :: [#block{}], - record_batches :: [#block{}] + record_batches :: [#block{}], + custom_metadata = [] :: [serde_arrow_ipc_message:key_value()] }). -record(file, {footer :: #footer{}, body :: binary()}). diff --git a/test/arrow_format_nif_SUITE.erl b/test/arrow_format_nif_SUITE.erl index 5ed9b24..0280b94 100644 --- a/test/arrow_format_nif_SUITE.erl +++ b/test/arrow_format_nif_SUITE.erl @@ -6,6 +6,7 @@ -include_lib("stdlib/include/assert.hrl"). -include("serde_arrow_ipc_message.hrl"). +-include("serde_arrow_ipc_file.hrl"). -include("serde_arrow_ipc_marks_data.hrl"). @@ -28,7 +29,7 @@ -define(LargeList, schema(large_list)). all() -> - [test_decode, test_encode, serialize_message]. + [test_decode, test_encode, serialize_message, serialize_footer]. %%%%%%%%%%%%%%%%% %% test_decode %% @@ -121,6 +122,13 @@ serialize_message(_Config) -> RecordBatchMsg = (?RecordBatchMsg)#message{body = undefined}, ?assert(is_binary(arrow_format_nif:serialize_message(RecordBatchMsg))). +%%%%%%%%%%%%%%%%%%%%%%% +%% serialize_footer %% +%%%%%%%%%%%%%%%%%%%%%%% + +serialize_footer(_Config) -> + ?assert(is_binary(arrow_format_nif:serialize_footer((?File)#file.footer))). + %%%%%%%%%%% %% Utils %% %%%%%%%%%%% diff --git a/test/serde_arrow_ipc_file_SUITE.erl b/test/serde_arrow_ipc_file_SUITE.erl index 33e4e81..0d14ca8 100644 --- a/test/serde_arrow_ipc_file_SUITE.erl +++ b/test/serde_arrow_ipc_file_SUITE.erl @@ -16,6 +16,7 @@ all() -> valid_schema_on_from_erlang, valid_dictionaries_on_from_erlang, valid_record_batches_on_from_erlang, + valid_custom_metadata_on_from_erlang, valid_body_on_from_erlang, valid_magic_string_on_to_ipc, @@ -44,6 +45,9 @@ valid_record_batches_on_from_erlang(_Config) -> }, ?assertEqual((?File)#file.footer#footer.record_batches, [RecordBatchBlock]). +valid_custom_metadata_on_from_erlang(_Config) -> + ?assertEqual((?File)#file.footer#footer.custom_metadata, []). + valid_body_on_from_erlang(_Config) -> ?assertEqual((?File)#file.body, ?Stream). @@ -65,7 +69,7 @@ valid_footer_on_to_ipc(_Config) -> FooterSz = byte_size(?SerializedFile) * 8 - (64 + StreamSz + 32 + 48), <<_ARROW_MAGIC:64/bitstring, _Stream:StreamSz/bitstring, Footer:FooterSz/bitstring, _FooterSz:32/signed-little-integer, "ARROW1">> = ?SerializedFile, - ?assertEqual(Footer, <<"Footer!">>). + ?assertEqual(Footer, arrow_format_nif:serialize_footer((?File)#file.footer)). valid_stream_on_to_ipc(_Config) -> Sz = byte_size(?Stream) * 8,