Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize Message Metadata into Flatbuffers #32

Merged
merged 15 commits into from
Sep 21, 2023
310 changes: 170 additions & 140 deletions native/arrow_format_nif/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use rustler::Atom;
use rustler::types::Binary;
use rustler::{Atom, Env};

mod message;
mod utils;

use message::record_batch::{Buffer, Compression, FieldNode, RecordBatch};
use message::schema::field::{Field, Name};
use message::schema::types::{FixedSizeList, Int, Type};
use message::schema::{Endianness, Feature, Schema};
use message::{Header, Message, Version};
use message::Message;

mod atoms {
rustler::atoms! {
Expand Down Expand Up @@ -49,10 +46,156 @@ mod atoms {
}
}

#[cfg(test)]
pub mod test {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you could have this extracted to a test.rs file in the src directory. This is a non-blocker, though :)

pub mod fixtures {
use arrow_format::ipc;

/// Builds the `Schema` message fixture.
///
/// The schema is little-endian and holds four nullable fields: `id` (signed
/// 8-bit int), `name` (large binary), `age` (unsigned 8-bit int) and
/// `annual_marks` (fixed-size list of 3 whose single unnamed child is an
/// unsigned 8-bit int).
pub fn arrow_schema() -> ipc::Message {
    /// A nullable field without dictionary, children or custom metadata.
    fn plain_field(name: Option<&str>, type_: ipc::Type) -> ipc::Field {
        ipc::Field {
            name: name.map(String::from),
            nullable: true,
            type_: Some(type_),
            dictionary: None,
            children: None,
            custom_metadata: None,
        }
    }

    /// An 8-bit integer type with the given signedness.
    fn int8(is_signed: bool) -> ipc::Type {
        ipc::Type::Int(Box::new(ipc::Int {
            bit_width: 8,
            is_signed,
        }))
    }

    let fixed_list = ipc::Type::FixedSizeList(Box::new(ipc::FixedSizeList { list_size: 3 }));
    let mut annual_marks = plain_field(Some("annual_marks"), fixed_list);
    annual_marks.children = Some(vec![plain_field(None, int8(false))]);

    let fields = vec![
        plain_field(Some("id"), int8(true)),
        plain_field(
            Some("name"),
            ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})),
        ),
        plain_field(Some("age"), int8(false)),
        annual_marks,
    ];

    let schema = ipc::Schema {
        endianness: ipc::Endianness::Little,
        fields: Some(fields),
        custom_metadata: None,
        features: Some(vec![ipc::Feature::Unused]),
    };

    ipc::Message {
        version: ipc::MetadataVersion::V5,
        header: Some(ipc::MessageHeader::Schema(Box::new(schema))),
        body_length: 0i64,
        custom_metadata: None,
    }
}

/// Builds the `RecordBatch` message fixture.
///
/// The batch has length 4, four identical field nodes (length 4, one null
/// each), ten buffers and no compression.
pub fn arrow_record_batch() -> ipc::Message {
    // (offset, length) of every buffer, in order.
    const BUFFER_LAYOUT: [(i64, i64); 10] = [
        (0, 1),
        (8, 4),
        (16, 1),
        (24, 20),
        (48, 15),
        (64, 1),
        (72, 4),
        (80, 1),
        (88, 2),
        (96, 10),
    ];

    let nodes: Vec<ipc::FieldNode> = (0..4)
        .map(|_| ipc::FieldNode {
            length: 4,
            null_count: 1,
        })
        .collect();

    let buffers: Vec<ipc::Buffer> = BUFFER_LAYOUT
        .iter()
        .map(|&(offset, length)| ipc::Buffer { offset, length })
        .collect();

    let record_batch = ipc::RecordBatch {
        length: 4,
        nodes: Some(nodes),
        buffers: Some(buffers),
        compression: None,
    };

    ipc::Message {
        version: ipc::MetadataVersion::V5,
        header: Some(ipc::MessageHeader::RecordBatch(Box::new(record_batch))),
        body_length: 0i64,
        custom_metadata: None,
    }
}
}
}

/// Returns `:ok` on successful decodes.
///
/// This function tests Message's (and its components')
/// `rustler::types::Decoder` implementation(s), and has only been for testing
/// `rustler::types::Decoder` implementation(s), and is only used for testing
/// purposes.
#[rustler::nif]
fn test_decode(_msg: Message) -> Atom {
Expand All @@ -62,144 +205,31 @@ fn test_decode(_msg: Message) -> Atom {
/// Returns an example `Message` of a `Schema` or a `RecordBatch`.
///
/// This function tests Message's (and its components')
/// `rustler::types::Encoder` implementation(s), and has only been for testing
/// `rustler::types::Encoder` implementation(s), and is only used for testing
/// purposes.
#[rustler::nif]
fn test_encode(msg_type: Atom) -> Message {
if msg_type == atoms::schema() {
Message {
version: Version::V5,
header: Header::Schema(Schema {
endianness: Endianness::Little,
fields: vec![
Field {
name: Name::from("id"),
nullable: true,
r#type: Type::Int(Int {
bit_width: 8,
is_signed: true,
}),
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
},
Field {
name: Name::from("name"),
nullable: true,
r#type: Type::LargeBinary,
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
},
Field {
name: Name::from("age"),
nullable: true,
r#type: Type::Int(Int {
bit_width: 8,
is_signed: false,
}),
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
},
Field {
name: Name::from("annual_marks"),
nullable: true,
r#type: Type::FixedSizeList(FixedSizeList { list_size: 3 }),
dictionary: atoms::undefined(),
children: vec![Field {
name: Name::Atom(atoms::undefined()),
nullable: true,
r#type: Type::Int(Int {
bit_width: 8,
is_signed: false,
}),
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
}],
custom_metadata: vec![],
},
],
custom_metadata: vec![],
features: vec![Feature::Unused],
}),
body_length: 0,
custom_metadata: vec![],
body: atoms::undefined(),
}
utils::schema()
} else {
Message {
version: Version::V5,
header: Header::RecordBatch(RecordBatch {
length: 4,
nodes: vec![
FieldNode {
length: 4,
null_count: 1,
},
FieldNode {
length: 4,
null_count: 1,
},
FieldNode {
length: 4,
null_count: 1,
},
FieldNode {
length: 4,
null_count: 1,
},
],
buffers: vec![
Buffer {
offset: 0,
length: 1,
},
Buffer {
offset: 8,
length: 4,
},
Buffer {
offset: 16,
length: 1,
},
Buffer {
offset: 24,
length: 20,
},
Buffer {
offset: 48,
length: 15,
},
Buffer {
offset: 64,
length: 1,
},
Buffer {
offset: 72,
length: 4,
},
Buffer {
offset: 80,
length: 1,
},
Buffer {
offset: 88,
length: 2,
},
Buffer {
offset: 96,
length: 10,
},
],
compression: Compression::Undefined,
}),
body_length: 640,
custom_metadata: vec![],
body: atoms::undefined(),
}
utils::record_batch()
}
}

rustler::init!("arrow_format_nif", [test_decode, test_encode]);
/// Serializes a message into its corresponding flatbuffers.
///
/// The `message` is encoded with `Message::serialize_to_ipc`, and the
/// resulting bytes are copied into a freshly allocated binary that is then
/// released to `env` and returned.
///
/// # Panics
///
/// Panics if the binary for the serialized bytes cannot be allocated.
#[rustler::nif]
fn serialize_message(env: Env, message: Message) -> Binary {
    let flatbuffers = message.serialize_to_ipc();

    // `OwnedBinary::new` yields `None` when the allocation fails; there is
    // nothing sensible to return in that case, so fail with a clear message.
    let mut erl_bin = rustler::types::OwnedBinary::new(flatbuffers.len())
        .expect("failed to allocate a binary for the serialized message");
    erl_bin.as_mut_slice().copy_from_slice(&flatbuffers);
    erl_bin.release(env)
}

// Registers the NIFs defined in this file (`test_decode`, `test_encode` and
// `serialize_message`) under the name "arrow_format_nif".
rustler::init!(
"arrow_format_nif",
[test_decode, test_encode, serialize_message]
);
Loading