Skip to content

Commit

Permalink
Implement dictionary encoding (#136)
Browse files Browse the repository at this point in the history
* Implement dictionary encoding

* Chunked array dictionary encoding
  • Loading branch information
kylebarron authored Aug 15, 2024
1 parent fcdf5b8 commit 6ee73b7
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 22 deletions.
2 changes: 1 addition & 1 deletion arro3-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow = { workspace = true, features = ["ffi"] }
pyo3 = { workspace = true, features = ["abi3-py38"] }
thiserror = { workspace = true }
pyo3-arrow = { path = "../pyo3-arrow" }
thiserror = { workspace = true }
19 changes: 19 additions & 0 deletions arro3-compute/python/arro3/compute/_compute.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ def cast(
The casted Arrow data.
"""

@overload
def dictionary_encode(array: types.ArrowArrayExportable) -> core.Array: ...
@overload
def dictionary_encode(array: types.ArrowStreamExportable) -> core.ArrayReader: ...
def dictionary_encode(
    array: types.ArrowArrayExportable | types.ArrowStreamExportable,
) -> core.Array | core.ArrayReader:
    """
    Dictionary-encode array.

    Return a dictionary-encoded version of the input array. This function does
    nothing if the input is already a dictionary array.

    Note:
        For stream (chunked) input, each output chunk will not necessarily share
        the same dictionary.

    Args:
        array: Argument to compute function.

    Returns:
        The dictionary-encoded array.
    """

@overload
def list_flatten(input: types.ArrowArrayExportable) -> core.Array: ...
@overload
Expand Down
79 changes: 79 additions & 0 deletions arro3-compute/src/dictionary_encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::sync::Arc;

use arrow::array::{AsArray, GenericByteDictionaryBuilder, PrimitiveDictionaryBuilder};
use arrow::datatypes::{
BinaryType, ByteArrayType, Int32Type, LargeBinaryType, LargeUtf8Type, Utf8Type,
};
use arrow::downcast_primitive_array;
use arrow_array::{ArrayRef, ArrowPrimitiveType, GenericByteArray, PrimitiveArray};
use arrow_schema::{ArrowError, DataType, Field};
use pyo3::prelude::*;
use pyo3_arrow::error::PyArrowResult;
use pyo3_arrow::ffi::ArrayIterator;
use pyo3_arrow::input::AnyArray;
use pyo3_arrow::{PyArray, PyArrayReader};

// Note: for chunked array input, each output chunk will not necessarily have the same dictionary
#[pyfunction]
pub(crate) fn dictionary_encode(py: Python, array: AnyArray) -> PyArrowResult<PyObject> {
    match array {
        AnyArray::Array(array) => {
            let (array, _field) = array.into_inner();
            let output_array = dictionary_encode_array(array)?;
            Ok(PyArray::from_array_ref(output_array).to_arro3(py)?)
        }
        AnyArray::Stream(stream) => {
            let reader = stream.into_reader()?;

            let existing_field = reader.field();
            // `dictionary_encode_array` passes already-dictionary-encoded chunks
            // through unchanged, so in that case the declared output type must be
            // the input type itself — not a second `Dictionary` wrapping — or the
            // emitted chunks would not match the advertised schema.
            let output_data_type = match existing_field.data_type() {
                dt @ DataType::Dictionary(_, _) => dt.clone(),
                dt => DataType::Dictionary(Box::new(DataType::Int32), Box::new(dt.clone())),
            };
            let output_field = Field::new("", output_data_type, true);

            // Lazily encode each chunk as it is pulled from the input stream.
            let iter = reader.into_iter().map(move |array| {
                let output_array = dictionary_encode_array(array?)?;
                Ok(output_array)
            });
            Ok(
                PyArrayReader::new(Box::new(ArrayIterator::new(iter, output_field.into())))
                    .to_arro3(py)?,
            )
        }
    }
}

/// Dictionary-encode a single array using `Int32` keys.
///
/// Primitive, string, and binary arrays are encoded; arrays that are already
/// dictionary-encoded are returned unchanged. Any other data type yields an
/// [`ArrowError::ComputeError`].
fn dictionary_encode_array(array: ArrayRef) -> Result<ArrayRef, ArrowError> {
    let array_ref = array.as_ref();
    let array = downcast_primitive_array!(
        array_ref => {
            primitive_dictionary_encode(array_ref)
        }
        DataType::Utf8 => bytes_dictionary_encode(array.as_bytes::<Utf8Type>()),
        DataType::LargeUtf8 => bytes_dictionary_encode(array.as_bytes::<LargeUtf8Type>()),
        DataType::Binary => bytes_dictionary_encode(array.as_bytes::<BinaryType>()),
        DataType::LargeBinary => bytes_dictionary_encode(array.as_bytes::<LargeBinaryType>()),
        // Already dictionary-encoded: pass through unchanged.
        DataType::Dictionary(_, _) => array,
        // Fix: the message previously said "rank" — copy-pasted from another kernel.
        d => return Err(ArrowError::ComputeError(format!("{d:?} not supported in dictionary_encode")))
    );
    Ok(array)
}

/// Build an `Int32`-keyed dictionary array from a primitive array.
///
/// Null slots are preserved via `append_option`.
#[inline(never)]
fn primitive_dictionary_encode<T: ArrowPrimitiveType>(array: &PrimitiveArray<T>) -> ArrayRef {
    let mut builder = PrimitiveDictionaryBuilder::<Int32Type, T>::new();
    array.iter().for_each(|slot| builder.append_option(slot));
    Arc::new(builder.finish())
}

/// Build an `Int32`-keyed dictionary array from a string or binary array.
///
/// Null slots are preserved via `append_option`.
#[inline(never)]
fn bytes_dictionary_encode<T: ByteArrayType>(array: &GenericByteArray<T>) -> ArrayRef {
    let mut builder = GenericByteDictionaryBuilder::<Int32Type, T>::new();
    array.iter().for_each(|slot| builder.append_option(slot));
    Arc::new(builder.finish())
}
2 changes: 2 additions & 0 deletions arro3-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pyo3::prelude::*;

mod cast;
mod concat;
mod dictionary_encode;
mod list_flatten;
mod list_offsets;
mod struct_field;
Expand All @@ -20,6 +21,7 @@ fn _compute(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {

m.add_wrapped(wrap_pyfunction!(cast::cast))?;
m.add_wrapped(wrap_pyfunction!(concat::concat))?;
m.add_wrapped(wrap_pyfunction!(dictionary_encode::dictionary_encode))?;
m.add_wrapped(wrap_pyfunction!(list_flatten::list_flatten))?;
m.add_wrapped(wrap_pyfunction!(list_offsets::list_offsets))?;
m.add_wrapped(wrap_pyfunction!(struct_field::struct_field))?;
Expand Down
12 changes: 12 additions & 0 deletions arro3-core/python/arro3/core/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,23 @@ class ArrayReader:

class ChunkedArray:
"""An Arrow ChunkedArray."""
@overload
def __init__(
    self, arrays: ArrowArrayExportable | ArrowStreamExportable, type: None = None
) -> None: ...
@overload
def __init__(
    self,
    arrays: Sequence[ArrowArrayExportable],
    type: ArrowSchemaExportable | None = None,
) -> None: ...
def __init__(
    self,
    arrays: ArrowArrayExportable
    | ArrowStreamExportable
    | Sequence[ArrowArrayExportable],
    type: ArrowSchemaExportable | None = None,
) -> None:
    """
    Construct a ChunkedArray.

    Args:
        arrays: Either a single Arrow array/stream exportable object, or a
            sequence of arrays that all share one data type.
        type: The field/data type of the resulting ChunkedArray. Only used when
            `arrays` is a sequence; defaults to the first array's field.
            NOTE(review): ignored for single array/stream input — confirm
            against the Rust constructor.

    Raises:
        TypeError: If the input is neither array-like nor a sequence of arrays,
            or if the arrays in a sequence have differing data types.
    """
    ...
def __array__(self, dtype=None, copy=None) -> NDArray:
"""
An implementation of the Array interface, for interoperability with numpy and
Expand Down
52 changes: 31 additions & 21 deletions pyo3-arrow/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,29 +221,39 @@ impl Display for PyChunkedArray {
#[pymethods]
impl PyChunkedArray {
#[new]
fn init(arrays: Vec<PyArray>, r#type: Option<PyField>) -> PyArrowResult<Self> {
let (chunks, fields): (Vec<_>, Vec<_>) =
arrays.into_iter().map(|arr| arr.into_inner()).unzip();
if !fields
.windows(2)
.all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
{
return Err(PyTypeError::new_err(
"Cannot create a ChunkedArray with differing data types.",
fn init(arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
if let Ok(data) = arrays.extract::<AnyArray>() {
Ok(data.into_chunked_array()?)
} else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
// TODO: move this into from_arrays?
let (chunks, fields): (Vec<_>, Vec<_>) =
arrays.into_iter().map(|arr| arr.into_inner()).unzip();
if !fields
.windows(2)
.all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
{
return Err(PyTypeError::new_err(
"Cannot create a ChunkedArray with differing data types.",
)
.into());
}

let field = r#type
.map(|py_data_type| py_data_type.into_inner())
.unwrap_or_else(|| fields[0].clone());

Ok(PyChunkedArray::try_new(
chunks,
Field::new("", field.data_type().clone(), true)
.with_metadata(field.metadata().clone())
.into(),
)?)
} else {
Err(
PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
.into(),
)
.into());
}

let field = r#type
.map(|py_data_type| py_data_type.into_inner())
.unwrap_or_else(|| fields[0].clone());

Ok(PyChunkedArray::try_new(
chunks,
Field::new("", field.data_type().clone(), true)
.with_metadata(field.metadata().clone())
.into(),
)?)
}

#[pyo3(signature = (dtype=None, copy=None))]
Expand Down
45 changes: 45 additions & 0 deletions tests/compute/test_dictionary_encode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import pyarrow as pa
import pyarrow.compute as pc
from arro3.core import ChunkedArray
from arro3.compute import dictionary_encode


def test_dictionary_encode():
    """arro3's dictionary_encode matches pyarrow.compute for each supported type."""

    def check(data):
        # The arro3 result, imported back into pyarrow, should equal pyarrow's kernel.
        encoded = dictionary_encode(data)
        expected = pc.dictionary_encode(data)  # type: ignore
        assert pa.array(encoded) == expected

    check(pa.array([1, 2, 3, 1, 2, 2, 3, 1, 1, 1], type=pa.uint16()))

    arr = pa.array(["1", "2", "3", "1", "2", "2", "3", "1", "1", "1"], type=pa.utf8())
    check(arr)

    arr = arr.cast(pa.large_utf8())
    check(arr)

    arr = arr.cast(pa.binary())
    check(arr)

    arr = arr.cast(pa.large_binary())
    check(arr)


def test_dictionary_encode_chunked():
    """Chunked input round-trips to the same values as pyarrow's kernel."""
    arr = pa.chunked_array([[3, 2, 3], [1, 2, 2], [3, 1, 1, 1]], type=pa.uint16())
    encoded = ChunkedArray(dictionary_encode(arr))

    roundtripped = pa.chunked_array(encoded)
    expected = pc.dictionary_encode(arr)  # type: ignore

    # Each output chunk may carry a different dictionary, so array- and
    # scalar-level comparison would fail; compare element-wise Python values.
    assert len(roundtripped) == len(expected)
    for got, want in zip(roundtripped, expected):
        assert got.as_py() == want.as_py()

0 comments on commit 6ee73b7

Please sign in to comment.