Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize Message Metadata into Flatbuffers #32

Merged
merged 15 commits into from
Sep 21, 2023
164 changes: 24 additions & 140 deletions native/arrow_format_nif/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use rustler::Atom;
use rustler::types::Binary;
use rustler::{Atom, Env};

mod message;
mod utils;

use message::record_batch::{Buffer, Compression, FieldNode, RecordBatch};
use message::schema::field::{Field, Name};
use message::schema::types::{FixedSizeList, Int, Type};
use message::schema::{Endianness, Feature, Schema};
use message::{Header, Message, Version};
use message::Message;

mod atoms {
rustler::atoms! {
Expand Down Expand Up @@ -52,7 +49,7 @@ mod atoms {
/// Returns `:ok` on successful decodes.
///
/// This function tests Message's (and its components')
/// `rustler::types::Decoder` implementation(s), and has only been for testing
/// `rustler::types::Decoder` implementation(s), and is only used for testing
/// purposes.
#[rustler::nif]
fn test_decode(_msg: Message) -> Atom {
Expand All @@ -62,144 +59,31 @@ fn test_decode(_msg: Message) -> Atom {
/// Returns an example `Message` of a `Schema` or a `RecordBatch`.
///
/// This function tests Message's (and its components')
/// `rustler::types::Encoder` implementation(s), and has only been for testing
/// `rustler::types::Encoder` implementation(s), and is only used for testing
/// purposes.
#[rustler::nif]
fn test_encode(msg_type: Atom) -> Message {
if msg_type == atoms::schema() {
Message {
version: Version::V5,
header: Header::Schema(Schema {
endianness: Endianness::Little,
fields: vec![
Field {
name: Name::from("id"),
nullable: true,
r#type: Type::Int(Int {
bit_width: 8,
is_signed: true,
}),
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
},
Field {
name: Name::from("name"),
nullable: true,
r#type: Type::LargeBinary,
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
},
Field {
name: Name::from("age"),
nullable: true,
r#type: Type::Int(Int {
bit_width: 8,
is_signed: false,
}),
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
},
Field {
name: Name::from("annual_marks"),
nullable: true,
r#type: Type::FixedSizeList(FixedSizeList { list_size: 3 }),
dictionary: atoms::undefined(),
children: vec![Field {
name: Name::Atom(atoms::undefined()),
nullable: true,
r#type: Type::Int(Int {
bit_width: 8,
is_signed: false,
}),
dictionary: atoms::undefined(),
children: vec![],
custom_metadata: vec![],
}],
custom_metadata: vec![],
},
],
custom_metadata: vec![],
features: vec![Feature::Unused],
}),
body_length: 0,
custom_metadata: vec![],
body: atoms::undefined(),
}
utils::schema()
} else {
Message {
version: Version::V5,
header: Header::RecordBatch(RecordBatch {
length: 4,
nodes: vec![
FieldNode {
length: 4,
null_count: 1,
},
FieldNode {
length: 4,
null_count: 1,
},
FieldNode {
length: 4,
null_count: 1,
},
FieldNode {
length: 4,
null_count: 1,
},
],
buffers: vec![
Buffer {
offset: 0,
length: 1,
},
Buffer {
offset: 8,
length: 4,
},
Buffer {
offset: 16,
length: 1,
},
Buffer {
offset: 24,
length: 20,
},
Buffer {
offset: 48,
length: 15,
},
Buffer {
offset: 64,
length: 1,
},
Buffer {
offset: 72,
length: 4,
},
Buffer {
offset: 80,
length: 1,
},
Buffer {
offset: 88,
length: 2,
},
Buffer {
offset: 96,
length: 10,
},
],
compression: Compression::Undefined,
}),
body_length: 640,
custom_metadata: vec![],
body: atoms::undefined(),
}
utils::record_batch()
}
}

rustler::init!("arrow_format_nif", [test_decode, test_encode]);
/// Serializes a message into its correspondding flatbuffers.
///
/// This function serializes a message into its correspondding flatbuffers and
/// returns a binary
#[rustler::nif]
fn serialize_message(env: Env, message: Message) -> Binary {
let flatbuffers = message.serialize_to_ipc();

let mut erl_bin = rustler::types::OwnedBinary::new(flatbuffers.len()).unwrap();
erl_bin.as_mut_slice().copy_from_slice(&flatbuffers);
erl_bin.release(env)
}

rustler::init!(
"arrow_format_nif",
[test_decode, test_encode, serialize_message]
);
104 changes: 102 additions & 2 deletions native/arrow_format_nif/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::utils::CustomMetadata;
use rustler::{Atom, NifRecord, NifUnitEnum, NifUntaggedEnum};

use arrow_format::ipc;
use arrow_format::ipc::planus::Builder;
use rustler::{NifRecord, NifUnitEnum, NifUntaggedEnum};

pub mod record_batch;
pub mod schema;
Expand All @@ -14,7 +17,7 @@ pub struct Message {
pub header: Header,
pub body_length: i32,
pub custom_metadata: CustomMetadata,
pub body: Atom,
pub body: Body,
}

#[derive(Debug, NifUnitEnum)]
Expand All @@ -31,3 +34,100 @@ pub enum Header {
Schema(Schema),
RecordBatch(RecordBatch),
}

#[derive(Debug, NifUnitEnum)]
pub enum Body {
Undefined,
}

impl Message {
// A Word on Terminology:
//
// The function `serialize_to_ipc` is what actually serializes a function to
// its final binary/flatbuffer form.
//
// The function `serialize` just "serializes" (in this case converts) our
// message struct to `arrow_format`'s message struct. A similar "naming
// convention" was followed in all the other traits which implement
// `serialize`

pub fn serialize_to_ipc(&self) -> Vec<u8> {
let message = self.serialize();

let mut builder = Builder::new();
builder.finish(message, None).to_vec()
}

pub fn serialize(&self) -> ipc::Message {
ipc::Message {
version: self.version.serialize(),
header: Some(self.header.serialize()),
body_length: 0i64,
custom_metadata: None,
}
}
}

impl Version {
pub fn serialize(&self) -> ipc::MetadataVersion {
match self {
Version::V1 => ipc::MetadataVersion::V1,
Version::V2 => ipc::MetadataVersion::V2,
Version::V3 => ipc::MetadataVersion::V3,
Version::V4 => ipc::MetadataVersion::V4,
Version::V5 => ipc::MetadataVersion::V5,
}
}
}

impl Header {
pub fn serialize(&self) -> ipc::MessageHeader {
match self {
Header::Schema(schema) => ipc::MessageHeader::Schema(Box::new(schema.serialize())),
Header::RecordBatch(record_batch) => {
ipc::MessageHeader::RecordBatch(Box::new(record_batch.serialize()))
}
}
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::utils;

#[test]
fn test_message_serialize() {
assert_eq!(utils::schema().serialize(), utils::arrow_schema());
assert_eq!(
utils::record_batch().serialize(),
utils::arrow_record_batch()
);
}

#[test]
fn test_version_serialize() {
assert_eq!(Version::V1.serialize(), ipc::MetadataVersion::V1);
assert_eq!(Version::V2.serialize(), ipc::MetadataVersion::V2);
assert_eq!(Version::V3.serialize(), ipc::MetadataVersion::V3);
assert_eq!(Version::V4.serialize(), ipc::MetadataVersion::V4);
assert_eq!(Version::V5.serialize(), ipc::MetadataVersion::V5);
}

#[test]
fn test_header_serialize() {
// Schema

let schema = utils::schema().header;
let arrow_schema = utils::arrow_schema().header.unwrap();

assert_eq!(schema.serialize(), arrow_schema);

// RecordBatch

let record_batch = utils::record_batch().header;
let arrow_record_batch = utils::arrow_record_batch().header.unwrap();

assert_eq!(record_batch.serialize(), arrow_record_batch);
}
}
Loading