first arrow-rs parquet export compiling
ratal committed Mar 7, 2024
1 parent ab2415a commit cf4f152
Showing 6 changed files with 136 additions and 923 deletions.
5 changes: 2 additions & 3 deletions src/export/mod.rs
@@ -1,6 +1,5 @@
//! Module to export mdf files to other file formats.
pub mod numpy;
pub mod python_arrow_helpers;
//pub mod parquet;
pub mod parquet;
pub mod polars;
// pub mod tensor;
pub mod python_arrow_helpers;
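
The newly enabled module exposes export_to_parquet to downstream code. A minimal call-site sketch, assuming the crate is consumed as mdfr and an Mdf value has already been loaded elsewhere (only the export_to_parquet signature below is taken from this commit):

use mdfr::export::parquet::export_to_parquet; // crate name assumed
use mdfr::mdfreader::Mdf;

/// Given an already-loaded Mdf, write every channel out with zstd compression.
fn dump_to_parquet(mdf: &Mdf) {
    export_to_parquet(mdf, "measurement.parquet", Some("zstd"))
        .expect("parquet export failed");
}
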
272 changes: 103 additions & 169 deletions src/export/parquet.rs
@@ -1,197 +1,87 @@
//! Exporting mdf to Parquet files.
use arrow::{
array::Array,
datatypes::DataType,
datatypes::{Field, Metadata, Schema},
error::{Error, Result},
io::parquet::write::{
array_to_columns, compress, to_parquet_schema, CompressedPage, CompressionOptions, DynIter,
DynStreamingIterator, Encoding, FallibleStreamingIterator, FileWriter, Version,
WriteOptions,
},
io::parquet::{read::ParquetError, write::transverse},
array::{Array, RecordBatch},
datatypes::{Field, Schema, SchemaBuilder},
error::Result,
};
use codepage::to_encoding;
use encoding_rs::Encoding as EncodingRs;
use rayon::iter::{
IndexedParallelIterator, IntoParallelRefIterator, ParallelExtend, ParallelIterator,
};

use crate::{
mdfinfo::{
mdfinfo4::{Cn4, MdfInfo4},
MdfInfo,
use parquet::{
arrow::arrow_writer::ArrowWriter,
basic::{BrotliLevel, Compression, Encoding, GzipLevel, ZstdLevel},
file::{
metadata::KeyValue,
properties::{WriterProperties, WriterVersion},
},
mdfreader::Mdf,
};
use rayon::iter::ParallelExtend;

use std::collections::{HashSet, VecDeque};
use std::path::Path;

struct Bla {
columns: VecDeque<CompressedPage>,
current: Option<CompressedPage>,
}
use crate::{mdfinfo::MdfInfo, mdfreader::Mdf};

impl Bla {
pub fn new(columns: VecDeque<CompressedPage>) -> Self {
Self {
columns,
current: None,
}
}
}

impl FallibleStreamingIterator for Bla {
type Item = CompressedPage;
type Error = Error;

fn advance(&mut self) -> Result<()> {
self.current = self.columns.pop_front();
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
self.current.as_ref()
}
}
use std::path::Path;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

/// writes mdf into parquet file
pub fn export_to_parquet(mdf: &Mdf, file_name: &str, compression: Option<&str>) -> Result<()> {
//let _ = data_type;
// Create file
let path = Path::new(file_name);

let options = WriteOptions {
write_statistics: true,
version: Version::V2,
compression: parquet_compression_from_string(compression),
data_pagesize_limit: None,
};

// No other encoding yet implemented, to be reviewed later if needed.
let encoding_map = |_data_type: &DataType| Encoding::Plain;
let options = WriterProperties::builder()
.set_compression(parquet_compression_from_string(compression))
.set_writer_version(WriterVersion::PARQUET_1_0)
.set_encoding(Encoding::PLAIN)
.set_key_value_metadata(Some(vec![KeyValue::new(
"file_name".to_string(),
file_name.to_string(),
)]))
.build();

let (arrow_data, mut arrow_schema) = mdf_data_to_arrow(mdf);
arrow_schema
.metadata
.insert("file_name".to_string(), file_name.to_string());

// declare encodings
let encodings = (arrow_schema.fields)
.par_iter()
.map(|f| transverse(&f.data_type, encoding_map))
.collect::<Vec<_>>();

// derive the parquet schema (physical types) from arrow's schema.
let parquet_schema =
to_parquet_schema(&arrow_schema).expect("Failed to create SchemaDescriptor from Schema");

let row_groups = arrow_data.iter().map(|batch| {
// write batch to pages; parallelized by rayon
let columns = batch
.par_iter()
.zip(parquet_schema.fields().to_vec())
.zip(encodings.par_iter())
.flat_map(move |((array, type_), encoding)| {
let encoded_columns = array_to_columns(array, type_, options, encoding)
.expect("Could not convert arrow array to column");
encoded_columns
.into_iter()
.map(|encoded_pages| {
let encoded_pages = DynIter::new(encoded_pages.into_iter().map(|x| {
x.map_err(|e| ParquetError::FeatureNotSupported(e.to_string()))
}));
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
})
.collect::<Result<VecDeque<_>>>()
})
.collect::<Vec<_>>()
})
.collect::<Result<Vec<VecDeque<CompressedPage>>>>()?;

let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
Result::Ok(row_group)
});

let file = std::io::BufWriter::new(std::fs::File::create(path).expect("Failed to create file"));
let mut writer = FileWriter::try_new(file, arrow_schema.clone(), options)
let mut writer = ArrowWriter::try_new(file, Arc::new(arrow_schema.clone()), Some(options))
.expect("Failed to write parquet file");

// write data in file
for group in row_groups {
writer.write(group?)?;
for group in arrow_data {
writer.write(&group)?;
}
writer.end(None).expect("Failed to write footer");
writer.close().expect("Failed to write footer");
Ok(())
}
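
For orientation, a minimal standalone sketch of the same arrow-rs writer pattern used above (ArrowWriter plus WriterProperties). The column name, values, and output path are invented for illustration, and the key/value metadata step is left out:

use std::{fs::File, sync::Arc};

use arrow::array::{ArrayRef, Float64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One float64 column standing in for a decoded MDF channel.
    let speed: ArrayRef = Arc::new(Float64Array::from(vec![0.0, 1.5, 3.0]));
    let schema = Arc::new(Schema::new(vec![Field::new("speed", DataType::Float64, false)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![speed])?;

    // Writer setup mirroring export_to_parquet: compression via WriterProperties.
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let file = File::create("example.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?; // writes the Parquet footer
    Ok(())
}
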

/// converts a clap compression string into a parquet Compression enum
pub fn parquet_compression_from_string(compression_option: Option<&str>) -> CompressionOptions {
pub fn parquet_compression_from_string(compression_option: Option<&str>) -> Compression {
match compression_option {
Some(option) => match option {
"snappy" => CompressionOptions::Snappy,
"gzip" => CompressionOptions::Gzip(None),
"lzo" => CompressionOptions::Lzo,
"brotli" => CompressionOptions::Brotli(None),
"lz4" => CompressionOptions::Lz4,
"lz4raw" => CompressionOptions::Lz4Raw,
_ => CompressionOptions::Uncompressed,
"snappy" => Compression::SNAPPY,
"gzip" => Compression::GZIP(GzipLevel::try_new(6).expect("Wrong Gzip level")),
"lzo" => Compression::LZO,
"brotli" => Compression::BROTLI(BrotliLevel::try_new(1).expect("Wrong Brotli level")),
"lz4" => Compression::LZ4,
"lz4raw" => Compression::LZ4_RAW,
"zstd" => Compression::ZSTD(ZstdLevel::try_new(1).expect("Wrong Zstd level")),
_ => Compression::UNCOMPRESSED,
},
None => CompressionOptions::Uncompressed,
None => Compression::UNCOMPRESSED,
}
}
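
A sketch of a unit test that could sit next to this helper to pin down its fallback behaviour; it assumes parquet's Compression derives PartialEq and Debug (true in current releases):

#[test]
fn compression_option_mapping() {
    use parquet::basic::Compression;
    // None and unrecognized strings both fall back to UNCOMPRESSED.
    assert_eq!(parquet_compression_from_string(None), Compression::UNCOMPRESSED);
    assert_eq!(
        parquet_compression_from_string(Some("unknown")),
        Compression::UNCOMPRESSED
    );
    assert_eq!(
        parquet_compression_from_string(Some("lz4raw")),
        Compression::LZ4_RAW
    );
}
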

/// returns arrow field from cn
#[inline]
fn cn4_field(mdfinfo4: &MdfInfo4, cn: &Cn4, data_type: DataType, is_nullable: bool) -> Field {
let field = Field::new(cn.unique_name.clone(), data_type, is_nullable);
let mut metadata = Metadata::new();
if let Ok(Some(unit)) = mdfinfo4.sharable.get_tx(cn.block.cn_md_unit) {
if !unit.is_empty() {
metadata.insert("unit".to_string(), unit);
}
};
if let Ok(Some(desc)) = mdfinfo4.sharable.get_tx(cn.block.cn_md_comment) {
if !desc.is_empty() {
metadata.insert("description".to_string(), desc);
}
};
if let Some((Some(master_channel_name), _dg_pos, (_cg_pos, _rec_idd), (_cn_pos, _rec_pos))) =
mdfinfo4.channel_names_set.get(&cn.unique_name)
{
if !master_channel_name.is_empty() {
metadata.insert(
"master_channel".to_string(),
master_channel_name.to_string(),
);
}
}
if cn.block.cn_type == 4 {
metadata.insert(
"sync_channel".to_string(),
cn.block.cn_sync_type.to_string(),
);
}
field.with_metadata(metadata)
}

/// takes data of channel set from MdfInfo structure and stores in mdf.arrow_data
fn mdf_data_to_arrow(mdf: &Mdf) -> (Vec<Vec<Box<dyn Array>>>, Schema) {
fn mdf_data_to_arrow(mdf: &Mdf) -> (Vec<RecordBatch>, Schema) {
let mut chunk_index: usize = 0;
let mut array_index: usize = 0;
let mut field_index: usize = 0;
let mut arrow_schema = Schema::default();
match &mdf.mdf_info {
MdfInfo::V4(mdfinfo4) => {
let mut arrow_data: Vec<Vec<Box<dyn Array>>> = Vec::with_capacity(mdfinfo4.dg.len());
arrow_schema.fields = Vec::<Field>::with_capacity(mdfinfo4.channel_names_set.len());
let mut arrow_schema = SchemaBuilder::with_capacity(mdfinfo4.channel_names_set.len());
let mut arrow_data: Vec<RecordBatch> = Vec::with_capacity(mdfinfo4.dg.len());
for (_dg_block_position, dg) in mdfinfo4.dg.iter() {
let mut channel_names_present_in_dg = HashSet::new();
for channel_group in dg.cg.values() {
@@ -201,43 +91,84 @@ fn mdf_data_to_arrow(mdf: &Mdf) -> (Vec<Vec<Box<dyn Array>>>, Schema) {
if !channel_names_present_in_dg.is_empty() {
dg.cg.iter().for_each(|(_rec_id, cg)| {
let mut columns =
Vec::<Box<dyn Array>>::with_capacity(cg.channel_names.len());
Vec::<Arc<dyn Array>>::with_capacity(cg.channel_names.len());
let mut fields = SchemaBuilder::with_capacity(cg.channel_names.len());
cg.cn.iter().for_each(|(_rec_pos, cn)| {
if !cn.data.is_empty() {
arrow_schema.fields.push(cn4_field(
mdfinfo4,
cn,
let field = Field::new(
cn.unique_name.clone(),
cn.data.arrow_data_type().clone(),
cn.data.validity().is_some(),
));
columns.push(cn.data.boxed());
);
let mut metadata = HashMap::<String, String>::new();
if let Ok(Some(unit)) =
mdfinfo4.sharable.get_tx(cn.block.cn_md_unit)
{
if !unit.is_empty() {
metadata.insert("unit".to_string(), unit);
}
};
if let Ok(Some(desc)) =
mdfinfo4.sharable.get_tx(cn.block.cn_md_comment)
{
if !desc.is_empty() {
metadata.insert("description".to_string(), desc);
}
};
if let Some((
Some(master_channel_name),
_dg_pos,
(_cg_pos, _rec_idd),
(_cn_pos, _rec_pos),
)) = mdfinfo4.channel_names_set.get(&cn.unique_name)
{
if !master_channel_name.is_empty() {
metadata.insert(
"master_channel".to_string(),
master_channel_name.to_string(),
);
}
}
if cn.block.cn_type == 4 {
metadata.insert(
"sync_channel".to_string(),
cn.block.cn_sync_type.to_string(),
);
}
arrow_schema.push(field.clone());
fields.push(field.with_metadata(metadata));
columns.push(cn.data.finish_cloned());
array_index += 1;
field_index += 1;
}
});
arrow_data.push(columns);
arrow_data.push(
RecordBatch::try_new(Arc::new(fields.finish()), columns)
.expect("Failed creating recordbatch"),
);
chunk_index += 1;
array_index = 0;
});
}
}
(arrow_data, arrow_schema)
(arrow_data, arrow_schema.finish())
}
MdfInfo::V3(mdfinfo3) => {
let mut arrow_data: Vec<Vec<Box<dyn Array>>> = Vec::with_capacity(mdfinfo3.dg.len());
arrow_schema.fields = Vec::<Field>::with_capacity(mdfinfo3.channel_names_set.len());
let mut arrow_schema = SchemaBuilder::with_capacity(mdfinfo3.channel_names_set.len());
let mut arrow_data: Vec<RecordBatch> = Vec::with_capacity(mdfinfo3.dg.len());
for (_dg_block_position, dg) in mdfinfo3.dg.iter() {
for (_rec_id, cg) in dg.cg.iter() {
let mut columns = Vec::<Box<dyn Array>>::with_capacity(cg.channel_names.len());
let mut columns = Vec::<Arc<dyn Array>>::with_capacity(cg.channel_names.len());
let mut fields = SchemaBuilder::with_capacity(cg.channel_names.len());
for (_rec_pos, cn) in cg.cn.iter() {
if !cn.data.is_empty() {
let field = Field::new(
cn.unique_name.clone(),
cn.data.arrow_data_type().clone(),
false,
);
columns.push(cn.data.boxed());
let mut metadata = Metadata::new();
columns.push(cn.data.finish_cloned());
let mut metadata = HashMap::<String, String>::new();
if let Some(array) =
mdfinfo3.sharable.cc.get(&cn.block1.cn_cc_conversion)
{
@@ -264,18 +195,21 @@ fn mdf_data_to_arrow(mdf: &Mdf) -> (Vec<Vec<Box<dyn Array>>>, Schema) {
master_channel_name.to_string(),
);
}
let field = field.with_metadata(metadata);
arrow_schema.fields.push(field);
arrow_schema.push(field.clone());
fields.push(field.with_metadata(metadata));
array_index += 1;
field_index += 1;
}
}
arrow_data.push(columns);
arrow_data.push(
RecordBatch::try_new(Arc::new(fields.finish()), columns)
.expect("Failed creating recordbatch"),
);
chunk_index += 1;
array_index = 0;
}
}
(arrow_data, arrow_schema)
(arrow_data, arrow_schema.finish())
}
}
}
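
A condensed sketch of the per-channel pattern used in mdf_data_to_arrow: a Field carrying unit/description metadata is pushed into a SchemaBuilder, and the finished schema is combined with the column arrays into a RecordBatch. The field name, metadata values, and data below are placeholders:

use std::{collections::HashMap, sync::Arc};

use arrow::array::{ArrayRef, Float64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, SchemaBuilder};

fn build_batch() -> RecordBatch {
    // Attach the same kind of metadata mdf_data_to_arrow stores per channel.
    let mut metadata = HashMap::new();
    metadata.insert("unit".to_string(), "km/h".to_string());
    metadata.insert("description".to_string(), "vehicle speed".to_string());
    let field = Field::new("speed", DataType::Float64, false).with_metadata(metadata);

    let mut fields = SchemaBuilder::with_capacity(1);
    fields.push(field);

    let column: ArrayRef = Arc::new(Float64Array::from(vec![0.0, 1.5, 3.0]));
    RecordBatch::try_new(Arc::new(fields.finish()), vec![column])
        .expect("schema and columns must match")
}
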
