From 1ac0418557d7da066db923545a316f8c7db3c3c0 Mon Sep 17 00:00:00 2001
From: Emil Ernerfeldt
Date: Mon, 15 Jan 2024 15:19:03 +0100
Subject: [PATCH] Remove tests of odbc and parquet

---
 examples/io_odbc.rs             | 83 ---------------------------------
 examples/parquet_read.rs        | 48 -------------------
 examples/parquet_read_async.rs  | 61 ------------------------
 examples/parquet_write.rs       | 59 -----------------------
 examples/parquet_write_async.rs | 57 ----------------------
 5 files changed, 308 deletions(-)
 delete mode 100644 examples/io_odbc.rs
 delete mode 100644 examples/parquet_read.rs
 delete mode 100644 examples/parquet_read_async.rs
 delete mode 100644 examples/parquet_write.rs
 delete mode 100644 examples/parquet_write_async.rs

diff --git a/examples/io_odbc.rs b/examples/io_odbc.rs
deleted file mode 100644
index d103b5ac4c..0000000000
--- a/examples/io_odbc.rs
+++ /dev/null
@@ -1,83 +0,0 @@
-//! Demo of how to write to, and read from, an ODBC connector
-//!
-//! On an Ubuntu, you need to run the following (to install the driver):
-//! ```bash
-//! sudo apt install libsqliteodbc sqlite3 unixodbc-dev
-//! sudo sed --in-place 's/libsqlite3odbc.so/\/usr\/lib\/x86_64-linux-gnu\/odbc\/libsqlite3odbc.so/' /etc/odbcinst.ini
-//! ```
-use re_arrow2::array::{Array, Int32Array, Utf8Array};
-use re_arrow2::chunk::Chunk;
-use re_arrow2::datatypes::{DataType, Field};
-use re_arrow2::error::Result;
-use re_arrow2::io::odbc::api;
-use re_arrow2::io::odbc::api::Cursor;
-use re_arrow2::io::odbc::read;
-use re_arrow2::io::odbc::write;
-
-fn main() -> Result<()> {
-    let connector = "Driver={SQLite3};Database=sqlite-test.db";
-    let env = api::Environment::new()?;
-    let connection = env.connect_with_connection_string(connector)?;
-
-    // let's create an empty table with a schema
-    connection.execute("DROP TABLE IF EXISTS example;", ())?;
-    connection.execute("CREATE TABLE example (c1 INT, c2 TEXT);", ())?;
-
-    // and now let's write some data into it (from arrow arrays!)
-    // first, we prepare the statement
-    let query = "INSERT INTO example (c1, c2) VALUES (?, ?)";
-    let prepared = connection.prepare(query).unwrap();
-
-    // secondly, we initialize buffers from odbc-api
-    let fields = vec![
-        // (for now) the types here must match the tables' schema
-        Field::new("unused", DataType::Int32, true),
-        Field::new("unused", DataType::LargeUtf8, true),
-    ];
-
-    // third, we initialize the writer
-    let mut writer = write::Writer::try_new(prepared, fields)?;
-
-    // say we have (or receive from a channel) a chunk:
-    let chunk = Chunk::new(vec![
-        Box::new(Int32Array::from_slice([1, 2, 3])) as Box<dyn Array>,
-        Box::new(Utf8Array::<i64>::from([Some("Hello"), None, Some("World")])),
-    ]);
-
-    // we write it like this
-    writer.write(&chunk)?;
-
-    // and we can later read from it
-    let chunks = read(&connection, "SELECT c1 FROM example")?;
-
-    // and the result should be the same
-    assert_eq!(chunks[0].columns()[0], chunk.columns()[0]);
-
-    Ok(())
-}
-
-/// Reads chunks from a query done against an ODBC connection
-pub fn read(connection: &api::Connection<'_>, query: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
-    let mut a = connection.prepare(query)?;
-    let fields = read::infer_schema(&a)?;
-
-    let max_batch_size = 100;
-    let buffer = read::buffer_from_metadata(&a, max_batch_size)?;
-
-    let cursor = a.execute(())?.unwrap();
-    let mut cursor = cursor.bind_buffer(buffer)?;
-
-    let mut chunks = vec![];
-    while let Some(batch) = cursor.fetch()? {
-        let arrays = (0..batch.num_cols())
-            .zip(fields.iter())
-            .map(|(index, field)| {
-                let column_view = batch.column(index);
-                read::deserialize(column_view, field.data_type.clone())
-            })
-            .collect::<Vec<_>>();
-        chunks.push(Chunk::new(arrays));
-    }
-
-    Ok(chunks)
-}
diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs
deleted file mode 100644
index 4b839bb20a..0000000000
--- a/examples/parquet_read.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use std::fs::File;
-use std::time::SystemTime;
-
-use re_arrow2::error::Error;
-use re_arrow2::io::parquet::read;
-
-fn main() -> Result<(), Error> {
-    // say we have a file
-    use std::env;
-    let args: Vec<String> = env::args().collect();
-    let file_path = &args[1];
-    let mut reader = File::open(file_path)?;
-
-    // we can read its metadata:
-    let metadata = read::read_metadata(&mut reader)?;
-
-    // and infer a [`Schema`] from the `metadata`.
-    let schema = read::infer_schema(&metadata)?;
-
-    // we can filter the columns we need (here we select all)
-    let schema = schema.filter(|_index, _field| true);
-
-    // we can read the statistics of all parquet's row groups (here for each field)
-    for field in &schema.fields {
-        let statistics = read::statistics::deserialize(field, &metadata.row_groups)?;
-        println!("{statistics:#?}");
-    }
-
-    // say we found that we only need to read the first two row groups, "0" and "1"
-    let row_groups = metadata
-        .row_groups
-        .into_iter()
-        .enumerate()
-        .filter(|(index, _)| *index == 0 || *index == 1)
-        .map(|(_, row_group)| row_group)
-        .collect();
-
-    // we can then read the row groups into chunks
-    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);
-
-    let start = SystemTime::now();
-    for maybe_chunk in chunks {
-        let chunk = maybe_chunk?;
-        assert!(!chunk.is_empty());
-    }
-    println!("took: {} ms", start.elapsed().unwrap().as_millis());
-    Ok(())
-}
diff --git a/examples/parquet_read_async.rs b/examples/parquet_read_async.rs
deleted file mode 100644
index a431f6b596..0000000000
--- a/examples/parquet_read_async.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-use std::time::SystemTime;
-
-use futures::future::BoxFuture;
-use tokio::fs::File;
-use tokio::io::BufReader;
-use tokio_util::compat::*;
-
-use re_arrow2::error::Result;
-use re_arrow2::io::parquet::read::{self, RowGroupDeserializer};
-
-#[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<()> {
-    let start = SystemTime::now();
-
-    use std::env;
-    let args: Vec<String> = env::args().collect();
-    let file_path = Box::new(args[1].clone());
-
-    // # Read metadata
-    let mut reader = BufReader::new(File::open(file_path.as_ref()).await?).compat();
-
-    // this operation is usually done before reading the data, during planning.
-    // This is a mix of IO and CPU-bounded tasks but both of them are O(1)
-    let metadata = read::read_metadata_async(&mut reader).await?;
-    let schema = read::infer_schema(&metadata)?;
-
-    // This factory yields one file descriptor per column and is used to read columns concurrently.
-    // They do not need to be buffered since we execute exactly 1 seek and 1 read on them.
-    let factory = || {
-        Box::pin(async { Ok(File::open(file_path.clone().as_ref()).await?.compat()) })
-            as BoxFuture<_>
-    };
-
-    // This is the row group loop. Groups can be skipped based on the statistics they carry.
-    for row_group in &metadata.row_groups {
-        // A row group is consumed in two steps: the first step is to read the (compressed)
-        // columns into memory, which is IO-bounded.
-        let column_chunks = read::read_columns_many_async(
-            factory,
-            row_group,
-            schema.fields.clone(),
-            None,
-            None,
-            None,
-        )
-        .await?;
-
-        // the second step is to iterate over the columns in chunks.
-        // this operation is CPU-bounded and should be sent to a separate thread pool (e.g. `tokio_rayon`) to not block
-        // the runtime.
-        // Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator
-        // can be advanced in parallel (parallel decompression and deserialization).
-        let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows(), None);
-        for maybe_chunk in chunks {
-            let chunk = maybe_chunk?;
-            println!("{}", chunk.len());
-        }
-    }
-    println!("took: {} ms", start.elapsed().unwrap().as_millis());
-    Ok(())
-}
diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs
deleted file mode 100644
index 6b816a51b4..0000000000
--- a/examples/parquet_write.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use std::fs::File;
-
-use re_arrow2::{
-    array::{Array, Int32Array},
-    chunk::Chunk,
-    datatypes::{Field, Schema},
-    error::Result,
-    io::parquet::write::{
-        transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
-        WriteOptions,
-    },
-};
-
-fn write_chunk(path: &str, schema: Schema, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
-    let options = WriteOptions {
-        write_statistics: true,
-        compression: CompressionOptions::Uncompressed,
-        version: Version::V2,
-        data_pagesize_limit: None,
-    };
-
-    let iter = vec![Ok(chunk)];
-
-    let encodings = schema
-        .fields
-        .iter()
-        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
-        .collect();
-
-    let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
-
-    // Create a new empty file
-    let file = File::create(path)?;
-
-    let mut writer = FileWriter::try_new(file, schema, options)?;
-
-    for group in row_groups {
-        writer.write(group?)?;
-    }
-    let _size = writer.end(None)?;
-    Ok(())
-}
-
-fn main() -> Result<()> {
-    let array = Int32Array::from(&[
-        Some(0),
-        Some(1),
-        Some(2),
-        Some(3),
-        Some(4),
-        Some(5),
-        Some(6),
-    ]);
-    let field = Field::new("c1", array.data_type().clone(), true);
-    let schema = Schema::from(vec![field]);
-    let chunk = Chunk::new(vec![array.boxed()]);
-
-    write_chunk("test.parquet", schema, chunk)
-}
diff --git a/examples/parquet_write_async.rs b/examples/parquet_write_async.rs
deleted file mode 100644
index 772d486b88..0000000000
--- a/examples/parquet_write_async.rs
+++ /dev/null
@@ -1,57 +0,0 @@
-use futures::SinkExt;
-use tokio::fs::File;
-
-use re_arrow2::{
-    array::{Array, Int32Array},
-    chunk::Chunk,
-    datatypes::{Field, Schema},
-    error::Result,
-    io::parquet::write::{
-        transverse, CompressionOptions, Encoding, FileSink, Version, WriteOptions,
-    },
-};
-use tokio_util::compat::TokioAsyncReadCompatExt;
-
-async fn write_batch(path: &str, schema: Schema, columns: Chunk<Box<dyn Array>>) -> Result<()> {
-    let options = WriteOptions {
-        write_statistics: true,
-        compression: CompressionOptions::Uncompressed,
-        version: Version::V2,
-        data_pagesize_limit: None,
-    };
-
-    let mut stream = futures::stream::iter(vec![Ok(columns)].into_iter());
-
-    // Create a new empty file
-    let file = File::create(path).await?.compat();
-
-    let encodings = schema
-        .fields
-        .iter()
-        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
-        .collect();
-
-    let mut writer = FileSink::try_new(file, schema, encodings, options)?;
-
-    writer.send_all(&mut stream).await?;
-    writer.close().await?;
-    Ok(())
-}
-
-#[tokio::main(flavor = "current_thread")]
-async fn main() -> Result<()> {
-    let array = Int32Array::from(&[
-        Some(0),
-        Some(1),
-        Some(2),
-        Some(3),
-        Some(4),
-        Some(5),
-        Some(6),
-    ]);
-    let field = Field::new("c1", array.data_type().clone(), true);
-    let schema = Schema::from(vec![field]);
-    let columns = Chunk::new(vec![array.boxed()]);
-
-    write_batch("test.parquet", schema, columns).await
-}