replace arrowrs with arrow2 (#224)
Co-authored-by: Lei Xu <lei@eto.ai>
Renkai and eddyxu authored Oct 19, 2022
1 parent 41e96ac commit 3e457ba
Showing 6 changed files with 127 additions and 49 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -57,6 +57,10 @@ python/wheels
logs
*.ckpt

# Rust
Cargo.lock
target

docs/_build
docs/api/python

3 changes: 2 additions & 1 deletion rust/Cargo.toml
@@ -16,10 +16,11 @@ name = "lance"
crate-type=["staticlib", "cdylib", "lib"]

[dependencies]
arrow = { version = "20.0", default-features = false }
arrow2 = {version = "0.14.2", features = ["compute_arithmetics", "compute_take"]}
clap = { version = "3", features = ["derive"] }
prost = "0.11.0"
byteorder = "1.4"
bytemuck = "1.12.1"

[build-dependencies]
prost-build = { version = "0.11" }
5 changes: 4 additions & 1 deletion rust/src/bin/lance.rs
@@ -43,7 +43,10 @@ fn main() {
let f = File::open(path).unwrap();
let reader = FileReader::new(f).unwrap();
println!("Number of RecordBatch: {}", reader.num_chunks());
println!("Schema: {}\n", reader.schema())
println!("Schema: {}\n", reader.schema());
use std::any::TypeId;
let is_little_endian = TypeId::of::<byteorder::NativeEndian>() == TypeId::of::<byteorder::LittleEndian>();
println!("Is little endian {:?}", is_little_endian)
}
}
}
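
A note on the check added above: the byteorder crate defines NativeEndian as a type alias for LittleEndian on little-endian targets (and for BigEndian otherwise), so comparing the two TypeIds reports the host byte order. A minimal standalone sketch of the same idea, assuming only the byteorder dependency already declared in Cargo.toml:

use std::any::TypeId;

// NativeEndian aliases LittleEndian on little-endian hosts, so the TypeIds
// compare equal there and differ on big-endian hosts.
fn is_little_endian() -> bool {
    TypeId::of::<byteorder::NativeEndian>() == TypeId::of::<byteorder::LittleEndian>()
}

fn main() {
    println!("Is little endian: {}", is_little_endian());
}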
13 changes: 7 additions & 6 deletions rust/src/encodings.rs
@@ -16,9 +16,10 @@

use std::fmt;
use std::io::Result;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array};
use arrow::datatypes::ArrowPrimitiveType;
use arrow2::array::{Array, Int32Array};
use arrow2::types::NativeType;

pub mod plain;

@@ -46,12 +47,12 @@ pub trait Encoder {
}

/// Decoder.
pub trait Decoder<T: ArrowPrimitiveType> {
pub trait Decoder<T: NativeType> {
type ArrowType;

fn decode(&mut self, offset: i32, length: &Option<i32>) -> Result<ArrayRef>;
fn decode(&mut self, offset: i32, length: &Option<i32>) -> Result<Box<dyn Array>> ;

fn take(&mut self, indices: &Int32Array) -> Result<ArrayRef>;
fn take(&mut self, indices: &Int32Array) -> Result<Box<dyn Array>>;

fn value(&self, i: usize) -> Result<T::Native>;
fn value(&self, i: usize) -> Result<T>;
}
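
The reworked trait is generic over arrow2::types::NativeType and returns boxed arrow2 arrays (Box<dyn Array>) instead of arrow-rs's Arc-based ArrayRef. A hypothetical in-memory implementation, sketched only to show the shape of the new trait (VecDecoder and its field are illustrative and not part of this commit; the extra Copy bound is spelled out for safety):

use std::io::Result;

use arrow2::array::{Array, Int32Array, PrimitiveArray};
use arrow2::types::NativeType;

use crate::encodings::Decoder;

// Illustrative decoder backed by an in-memory Vec<T>.
struct VecDecoder<T: NativeType + Copy> {
    values: Vec<T>,
}

impl<T: NativeType + Copy> Decoder<T> for VecDecoder<T> {
    type ArrowType = T;

    fn decode(&mut self, offset: i32, length: &Option<i32>) -> Result<Box<dyn Array>> {
        // Default to "everything after offset" when no length is given.
        let end = offset + length.unwrap_or(self.values.len() as i32 - offset);
        let slice = self.values[offset as usize..end as usize].to_vec();
        Ok(Box::new(PrimitiveArray::from_vec(slice)))
    }

    fn take(&mut self, indices: &Int32Array) -> Result<Box<dyn Array>> {
        let picked: Vec<T> = indices.values_iter().map(|&i| self.values[i as usize]).collect();
        Ok(Box::new(PrimitiveArray::from_vec(picked)))
    }

    fn value(&self, i: usize) -> Result<T> {
        Ok(self.values[i])
    }
}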
78 changes: 45 additions & 33 deletions rust/src/encodings/plain.rs
@@ -14,12 +14,16 @@

//! Plain encoding

use arrow::array::{make_array, new_empty_array, ArrayDataBuilder, ArrayRef, Int32Array};
use arrow::buffer::MutableBuffer;
use arrow::compute::{subtract_scalar_dyn, take};
use arrow::datatypes::{ArrowPrimitiveType, Int32Type};
use std::any::TypeId;
use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom};

use arrow2::array::{Array, MutableArray, MutablePrimitiveArray, PrimitiveArray};
use arrow2::array::new_empty_array;
use arrow2::array::Int32Array;
use arrow2::compute::arithmetics::basic::sub_scalar;
use arrow2::compute::take::take;
use arrow2::types::NativeType;

use crate::encodings::Decoder;

/// Plain Decoder
@@ -39,50 +43,58 @@ impl<'a, R: Read + Seek> PlainDecoder<'a, R> {
}
}

impl<'a, R: Read + Seek, T: ArrowPrimitiveType> Decoder<T> for PlainDecoder<'a, R> {
impl<'a, R: Read + Seek, T: NativeType> Decoder<T> for PlainDecoder<'a, R> {
type ArrowType = T;

fn decode(&mut self, offset: i32, length: &Option<i32>) -> Result<ArrayRef> {
fn decode(&mut self, offset: i32, length: &Option<i32>) -> Result<Box<dyn Array>> {
let read_len = length.unwrap_or((self.page_length - (offset as i64)) as i32) as usize;
(*self.file).seek(SeekFrom::Start(self.position + offset as u64))?;
let mut mutable_buf = MutableBuffer::new(read_len * T::get_byte_width());
(*self.file).read_exact(mutable_buf.as_slice_mut())?;
let builder = ArrayDataBuilder::new(T::DATA_TYPE).buffers(vec![mutable_buf.into()]);
let data = match builder.build() {
Ok(d) => d,
Err(e) => {
return Err(Error::new(
ErrorKind::InvalidData,
format!("Invalid builder: {}", e),
))
}
};
Ok(make_array(data))
let mut buffer = vec![T::default(); read_len];

if TypeId::of::<byteorder::NativeEndian>() == TypeId::of::<byteorder::LittleEndian>() {
let slice = bytemuck::cast_slice_mut(&mut buffer);
(*self.file).read_exact(slice)?;
let arr = PrimitiveArray::from_vec(buffer);
Ok(Box::new(arr))
} else {
let mut slice = vec![0u8; read_len * std::mem::size_of::<T>()];
(*self.file).read_exact(&mut slice)?;
let chunks = slice.chunks_exact(std::mem::size_of::<T>());
buffer
.as_mut_slice()
.iter_mut()
.zip(chunks)
.try_for_each(|(slot, chunk)| {
let a: T::Bytes = match chunk.try_into() {
Ok(a) => a,
Err(_) => unreachable!(),
};
*slot = T::from_le_bytes(a);
Ok(())
})?;
let arr = PrimitiveArray::from_vec(buffer);
Ok(Box::new(arr))
}
}

fn take(&mut self, indices: &Int32Array) -> Result<ArrayRef> {
if indices.is_empty() {
return Ok(new_empty_array(&T::DATA_TYPE));
fn take(&mut self, indices: &Int32Array) -> Result<Box<dyn Array>> {
if indices.len() == 0 {
return Ok(new_empty_array(T::PRIMITIVE.into()));
}

let start = indices.value(0);
let length = indices.values().last().map(|i| i - start);
// Not sure why it needs cast.
let values = <PlainDecoder<'a, R> as Decoder<T>>::decode(self, start, &length)?;
let reset_indices = match subtract_scalar_dyn::<Int32Type>(&values, start) {
Ok(arr) => arr,
Err(e) => return Err(Error::new(ErrorKind::InvalidData, e.to_string())),
};
match take(
&values,
reset_indices.as_any().downcast_ref::<Int32Array>().unwrap(),
None,
) {
let reset_indices = sub_scalar(&indices, &start);

let res = take(values.as_ref(), &reset_indices);
match res {
Ok(arr) => Ok(arr),
Err(e) => Err(Error::new(ErrorKind::InvalidData, format!("Error take indices: {}", e)))
}
}

fn value(&self, i: usize) -> Result<T::Native> {
fn value(&self, i: usize) -> Result<T> {
todo!()
}
}
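
The little-endian branch of the new decode above avoids per-value conversion: it allocates a Vec<T>, views it as raw bytes with bytemuck, fills it with a single read, and hands the vector to arrow2's PrimitiveArray. A condensed sketch of just that fast path over a generic reader; the function name is illustrative and the bytemuck::Pod bound is stated explicitly:

use std::io::{Read, Result};

use arrow2::array::PrimitiveArray;
use arrow2::types::NativeType;

// Read `len` values of T on a little-endian host: the typed buffer is viewed
// as raw bytes, filled in one read_exact call, and wrapped without copying.
fn read_primitives<T: NativeType + bytemuck::Pod, R: Read>(
    reader: &mut R,
    len: usize,
) -> Result<PrimitiveArray<T>> {
    let mut values = vec![T::default(); len];
    reader.read_exact(bytemuck::cast_slice_mut(&mut values))?;
    Ok(PrimitiveArray::from_vec(values))
}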
73 changes: 65 additions & 8 deletions rust/src/schema.rs
@@ -15,8 +15,9 @@
//! Lance Dataset Schema

use std::fmt;
use std::fmt::Debug;

use arrow::datatypes::DataType;
use arrow2::datatypes::{DataType, TimeUnit};

use crate::encodings::Encoding;
use crate::format::pb;
@@ -38,19 +39,19 @@ pub struct Field {
}

impl Field {
pub fn new(field: &arrow::datatypes::Field) -> Field {
pub fn new(field: &arrow2::datatypes::Field) -> Field {
Field {
id: -1,
parent_id: -1,
name: field.name().clone(),
logical_type: field.data_type().to_string(),
name: field.name.clone(),
logical_type: Self::type_str(field.data_type().to_logical_type()),
extension_name: String::new(),
encoding: match field.data_type() {
t if DataType::is_numeric(t) => Some(Encoding::Plain),
t if Self::is_numeric(t) => Some(Encoding::Plain),
DataType::Binary | DataType::Utf8 | DataType::LargeBinary | DataType::LargeUtf8 => {
Some(Encoding::VarBinary)
}
DataType::Dictionary(_, _) => Some(Encoding::Dictionary),
DataType::Dictionary(_, _, _) => Some(Encoding::Dictionary),
_ => None,
},
node_type: 0,
@@ -100,6 +101,53 @@
}
}

/// Return Arrow Data Type name.
pub fn type_str(t: &DataType) -> String {
match t {
DataType::Boolean => "bool",
DataType::UInt8 => "uint8",
DataType::Int8 => "int8",
DataType::UInt16 => "uint16",
DataType::Int16 => "int16",
DataType::UInt32 => "uint32",
DataType::Int32 => "int32",
DataType::UInt64 => "uint64",
DataType::Int64 => "int64",
DataType::Float16 => "halffloat",
DataType::Float32 => "float",
DataType::Float64 => "double",
DataType::Date32 => "date32:day",
DataType::Date64 => "date64:ms",
DataType::Time32(unit) => format!("time32:{}", to_str(unit)),
DataType::Time64(unit) => format!("time64:{}", to_str(unit)),
DataType::Timestamp(unit, _) => format!("timestamp:{}", to_str(unit)),
DataType::Binary => "binary",
DataType::Utf8 => "string",
DataType::LargeBinary => "largebinary",
DataType::LargeUtf8 => "largestring",
DataType::FixedSizeBinary(len) => format!("fixed_size_binary:{}", len),
DataType::FixedSizeList(v, len) => format!("fixed_size_list:{}:{}", Self::type_str(v.data_type()), len),
_ => panic!(),
}.to_string()
}

fn is_numeric(t: &DataType) -> bool {
use DataType::*;
matches!(
t,
UInt8
| UInt16
| UInt32
| UInt64
| Int8
| Int16
| Int32
| Int64
| Float32
| Float64
)
}

fn insert(&mut self, child: Field) {
self.children.push(child)
}
@@ -118,6 +166,15 @@ impl Field {
}
}

fn to_str(unit: &TimeUnit) -> &'static str {
match unit {
TimeUnit::Second => { "s" }
TimeUnit::Millisecond => { "ms" }
TimeUnit::Microsecond => { "us" }
TimeUnit::Nanosecond => { "ns" }
}
}

impl fmt::Display for Field {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(
@@ -147,10 +204,10 @@ pub struct Schema {

impl Schema {
/// Create a Schema from arrow schema.
pub fn new(schema: &arrow::datatypes::Schema) -> Schema {
pub fn new(schema: &arrow2::datatypes::Schema) -> Schema {
Schema {
fields: schema
.fields()
.fields
.iter()
.map(Field::new)
.collect(),
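
Schema in arrow2 exposes its fields as a public Vec rather than through a fields() accessor, which is why Schema::new above iterates schema.fields directly. A small usage sketch, assuming the arrow2 0.14 API pinned in Cargo.toml; the commented lance::schema path is hypothetical:

use arrow2::datatypes::{DataType, Field, Schema};

fn main() {
    // arrow2 builds a Schema straight from a Vec<Field>.
    let arrow_schema = Schema::from(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    // Hypothetical conversion using the constructor added in this commit:
    // let lance_schema = lance::schema::Schema::new(&arrow_schema);
    println!("{} fields", arrow_schema.fields.len());
}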
