Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Images] [3/N] Add image decoding for uint8 images. #981

Merged
merged 4 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pandas==1.3.5; python_version < '3.8'
pandas==2.0.2; python_version >= '3.8'
xxhash>=3.0.0
Pillow==9.5.0
opencv-python==4.7.0.72

# Ray
ray[data, default]==2.4.0
Expand Down
11 changes: 9 additions & 2 deletions src/array/from.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use crate::datatypes::{
BinaryArray, BooleanArray, DaftNumericType, DaftPhysicalType, DataType, Field, Utf8Array,
Utf8Type,
BinaryArray, BooleanArray, DaftNumericType, DaftPhysicalType, DataType, Field, NullArray,
Utf8Array, Utf8Type,
};

use crate::array::DataArray;
Expand All @@ -17,6 +17,13 @@ impl<T: DaftNumericType> From<(&str, Box<arrow2::array::PrimitiveArray<T::Native
}
}

impl From<(&str, Box<arrow2::array::NullArray>)> for NullArray {
fn from(item: (&str, Box<arrow2::array::NullArray>)) -> Self {
let (name, array) = item;
DataArray::new(Field::new(name, DataType::Null).into(), array).unwrap()
}
}

impl From<(&str, Box<arrow2::array::Utf8Array<i64>>)> for Utf8Array {
fn from(item: (&str, Box<arrow2::array::Utf8Array<i64>>)) -> Self {
let (name, array) = item;
Expand Down
198 changes: 129 additions & 69 deletions src/array/ops/image.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::borrow::Cow;
use std::vec;

use image::ImageBuffer;
use image::{ColorType, DynamicImage, ImageBuffer};

use crate::datatypes::logical::ImageArray;
use crate::datatypes::{DataType, Field, ImageMode};
use crate::error::DaftResult;
use crate::datatypes::{BinaryArray, DataType, Field, ImageMode};
use crate::error::{DaftError, DaftResult};
use image::{Luma, LumaA, Rgb, Rgba};

use super::as_arrow::AsArrow;
Expand All @@ -15,7 +15,7 @@ use std::ops::Deref;

#[allow(clippy::upper_case_acronyms, dead_code)]
#[derive(Debug)]
enum DaftImageBuffer<'a> {
pub enum DaftImageBuffer<'a> {
L(ImageBuffer<Luma<u8>, Cow<'a, [u8]>>),
LA(ImageBuffer<LumaA<u8>, Cow<'a, [u8]>>),
RGB(ImageBuffer<Rgb<u8>, Cow<'a, [u8]>>),
Expand Down Expand Up @@ -58,17 +58,7 @@ impl<'a> DaftImageBuffer<'a> {
with_method_on_image_buffer!(self, width)
}

pub fn channels(&self) -> u16 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why delete this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have ImageMode::num_chanels(), trying to keep a single source of truth there.

use DaftImageBuffer::*;
match self {
L(..) | L16(..) => 1,
LA(..) | LA16(..) => 2,
RGB(..) | RGB16(..) | RGB32F(..) => 3,
RGBA(..) | RGBA16(..) | RGBA32F(..) => 4,
}
}

pub fn get_as_u8(&'a self) -> &'a [u8] {
pub fn as_u8_slice(&'a self) -> &'a [u8] {
use DaftImageBuffer::*;
match self {
L(img) => img.as_raw(),
Expand All @@ -79,22 +69,32 @@ impl<'a> DaftImageBuffer<'a> {
}
}

pub fn mode(&self) -> ImageMode {
pub fn color(&self) -> ColorType {
use DaftImageBuffer::*;
match self {
L(..) => ImageMode::L,
LA(..) => ImageMode::LA,
RGB(..) => ImageMode::RGB,
RGBA(..) => ImageMode::RGBA,
L16(..) => ImageMode::L16,
LA16(..) => ImageMode::LA16,
RGB16(..) => ImageMode::RGB16,
RGBA16(..) => ImageMode::RGBA16,
RGB32F(..) => ImageMode::RGB32F,
RGBA32F(..) => ImageMode::RGBA32F,
L(..) => ColorType::L8,
LA(..) => ColorType::La8,
RGB(..) => ColorType::Rgb8,
RGBA(..) => ColorType::Rgba8,
L16(..) => ColorType::L16,
LA16(..) => ColorType::La16,
RGB16(..) => ColorType::Rgb16,
RGBA16(..) => ColorType::Rgba16,
RGB32F(..) => ColorType::Rgb32F,
RGBA32F(..) => ColorType::Rgba32F,
}
}

pub fn mode(&self) -> ImageMode {
self.color().try_into().unwrap()
}

pub fn decode(bytes: &[u8]) -> DaftResult<Self> {
image::load_from_memory(bytes)
.map(|v| v.into())
.map_err(|e| DaftError::ValueError(format!("Decoding image from bytes failed: {}", e)))
}

pub fn resize(&self, w: u32, h: u32) -> Self {
use DaftImageBuffer::*;
match self {
Expand All @@ -118,7 +118,7 @@ impl<'a> DaftImageBuffer<'a> {
image::imageops::resize(imgbuf, w, h, image::imageops::FilterType::Triangle);
DaftImageBuffer::RGBA(image_buffer_vec_to_cow(result))
}
_ => unimplemented!("mode not implemented"),
_ => unimplemented!("Mode {self:?} not implemented"),
}
}
}
Expand All @@ -136,6 +136,44 @@ where
ImageBuffer::from_raw(w, h, owned).unwrap()
}

impl<'a> From<DynamicImage> for DaftImageBuffer<'a> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you intending to take ownership of the buffer of DynamicImage or Borrow the buffer? Right now it's taking ownership.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ownership, since we should only be doing this conversion on ingest (decoding), right? I can change this to borrow the buffer if we think that will be needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine then, we can implement a From<&'a DynamicImage> down the road when/if we need to borrow.

fn from(dyn_img: DynamicImage) -> Self {
match dyn_img {
DynamicImage::ImageLuma8(img_buf) => {
DaftImageBuffer::<'a>::L(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageLumaA8(img_buf) => {
DaftImageBuffer::<'a>::LA(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageRgb8(img_buf) => {
DaftImageBuffer::<'a>::RGB(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageRgba8(img_buf) => {
DaftImageBuffer::<'a>::RGBA(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageLuma16(img_buf) => {
DaftImageBuffer::<'a>::L16(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageLumaA16(img_buf) => {
DaftImageBuffer::<'a>::LA16(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageRgb16(img_buf) => {
DaftImageBuffer::<'a>::RGB16(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageRgba16(img_buf) => {
DaftImageBuffer::<'a>::RGBA16(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageRgb32F(img_buf) => {
DaftImageBuffer::<'a>::RGB32F(image_buffer_vec_to_cow(img_buf))
}
DynamicImage::ImageRgba32F(img_buf) => {
DaftImageBuffer::<'a>::RGBA32F(image_buffer_vec_to_cow(img_buf))
}
_ => unimplemented!("{dyn_img:?} not implemented"),
}
}
}

impl ImageArray {
fn image_mode(&self) -> &Option<ImageMode> {
match self.logical_type() {
Expand Down Expand Up @@ -227,7 +265,6 @@ impl ImageArray {

assert_eq!(result.height(), h);
assert_eq!(result.width(), w);
assert_eq!(result.channels(), c);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets keep this validation in

Copy link
Contributor Author

@clarkzinzow clarkzinzow Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This DaftImageBuffer::channels() method was duplicating the ImageMode::num_channels() logic, so I'd like to consolidate that to one place if possible. We're also no longer pulling the channels from the DaftImageBuffer in from_daft_image_buffer() (we're pulling from ImageMode::num_channels() instead), so I don't see motivation for continuing to have a DaftImageBuffer::channels() API.

Some(result)
}

Expand All @@ -236,11 +273,10 @@ impl ImageArray {
.map(|i| self.as_image_obj(i))
.map(|img| img.map(|img| img.resize(w, h)))
.collect::<Vec<_>>();

Self::from_daft_image_buffers(self.name(), result.as_slice(), self.image_mode())
}

fn from_daft_image_buffers(
pub fn from_daft_image_buffers(
name: &str,
inputs: &[Option<DaftImageBuffer<'_>>],
image_mode: &Option<ImageMode>,
Expand All @@ -258,70 +294,58 @@ impl ImageArray {
let mut modes = Vec::with_capacity(inputs.len());
let mut widths = Vec::with_capacity(inputs.len());
let mut offsets = Vec::with_capacity(inputs.len() + 1);
let mut is_valid = Vec::with_capacity(inputs.len());
offsets.push(0i64);
let mut validity = arrow2::bitmap::MutableBitmap::with_capacity(inputs.len());

for ib in inputs {
if let Some(ib) = ib {
heights.push(ib.height());
widths.push(ib.width());
channels.push(ib.channels());

let buffer = ib.get_as_u8();
data_ref.push(buffer);
offsets.push(buffer.len() as i64 + offsets.last().unwrap());
modes.push(ib.mode() as u8);
is_valid.push(true);
} else {
heights.push(0u32);
widths.push(0u32);
channels.push(0u16);
offsets.push(*offsets.last().unwrap());
modes.push(0);
is_valid.push(false);
}
validity.push(ib.is_some());
let (height, width, mode, buffer) = match ib {
Some(ib) => (ib.height(), ib.width(), ib.mode(), ib.as_u8_slice()),
None => (0u32, 0u32, ImageMode::L, &[] as &[u8]),
};
heights.push(height);
widths.push(width);
modes.push(mode as u8);
channels.push(mode.num_channels());
data_ref.push(buffer);
offsets.push(offsets.last().unwrap() + buffer.len() as i64);
}

let collected_data = data_ref.concat();
let offsets = arrow2::offset::OffsetsBuffer::try_from(offsets)?;
let data_type = crate::datatypes::DataType::Image(
Box::new(crate::datatypes::DataType::UInt8),
*image_mode,
);
let value_dtype = DataType::UInt8;
let data_type = DataType::Image(Box::new(value_dtype.clone()), *image_mode);

let validity = arrow2::bitmap::Bitmap::from(is_valid);
let validity: Option<arrow2::bitmap::Bitmap> = match validity.unset_bits() {
0 => None,
_ => Some(validity.into()),
};
let arrow_dtype = value_dtype.to_arrow()?;

let list_datatype = arrow2::datatypes::DataType::LargeList(Box::new(
arrow2::datatypes::Field::new("data", arrow2::datatypes::DataType::UInt8, true),
arrow2::datatypes::Field::new("data", arrow_dtype, true),
));
let data_array = Box::new(arrow2::array::ListArray::<i64>::new(
list_datatype,
offsets,
Box::new(arrow2::array::UInt8Array::from_vec(collected_data)),
Some(validity.clone()),
Box::new(arrow2::array::PrimitiveArray::from_vec(collected_data)),
validity.clone(),
));

let values: Vec<Box<dyn arrow2::array::Array>> = vec![
data_array,
Box::new(
arrow2::array::UInt16Array::from_vec(channels)
.with_validity(Some(validity.clone())),
),
Box::new(
arrow2::array::UInt32Array::from_vec(heights).with_validity(Some(validity.clone())),
),
Box::new(
arrow2::array::UInt32Array::from_vec(widths).with_validity(Some(validity.clone())),
),
Box::new(
arrow2::array::UInt8Array::from_vec(modes).with_validity(Some(validity.clone())),
arrow2::array::UInt16Array::from_vec(channels).with_validity(validity.clone()),
),
Box::new(arrow2::array::UInt32Array::from_vec(heights).with_validity(validity.clone())),
Box::new(arrow2::array::UInt32Array::from_vec(widths).with_validity(validity.clone())),
Box::new(arrow2::array::UInt8Array::from_vec(modes).with_validity(validity.clone())),
];
let physical_type = data_type.to_physical();
let struct_array = Box::new(arrow2::array::StructArray::new(
physical_type.to_arrow()?,
values,
Some(validity),
validity,
));

let daft_struct_array = crate::datatypes::StructArray::new(
Expand All @@ -334,3 +358,39 @@ impl ImageArray {
))
}
}

impl BinaryArray {
pub fn image_decode(&self) -> DaftResult<ImageArray> {
let arrow_array = self
.data()
.as_any()
.downcast_ref::<arrow2::array::BinaryArray<i64>>()
.unwrap();
let mut img_bufs = Vec::<Option<DaftImageBuffer>>::with_capacity(arrow_array.len());
let mut cached_dtype: Option<DataType> = None;
// Load images from binary buffers.
// Confirm that all images have the same value dtype.
for row in arrow_array.iter() {
let img_buf = row.map(DaftImageBuffer::decode).transpose()?;
let dtype = img_buf.as_ref().map(|im| im.mode().get_dtype());
match (dtype.as_ref(), cached_dtype.as_ref()) {
(Some(t1), Some(t2)) => {
if t1 != t2 {
return Err(DaftError::ValueError(format!("All images in a column must have the same dtype, but got: {:?} and {:?}", t1, t2)));
}
}
(Some(t1), None) => {
cached_dtype = Some(t1.clone());
}
(None, _) => {}
}
img_bufs.push(img_buf);
}
// Series::image_decode() guarantees that we have at least one non-None element in this array.
let cached_dtype = cached_dtype.unwrap();
match cached_dtype {
DataType::UInt8 => Ok(ImageArray::from_daft_image_buffers(self.name(), img_bufs.as_slice(), &None)?),
_ => unimplemented!("Decoding images of dtype {cached_dtype:?} is not supported, only uint8 images are supported."),
}
}
}
2 changes: 1 addition & 1 deletion src/array/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod full;
pub(crate) mod groups;
mod hash;
mod if_else;
mod image;
pub(crate) mod image;
mod len;
mod list;
mod list_agg;
Expand Down
29 changes: 29 additions & 0 deletions src/datatypes/image_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,35 @@ impl ImageMode {
[L, LA, RGB, RGBA, L16, LA16, RGB16, RGBA16, RGB32F, RGBA32F];
MODES.iter()
}
pub fn get_dtype(&self) -> DataType {
self.into()
}
}

impl TryFrom<image::ColorType> for ImageMode {
type Error = DaftError;

fn try_from(color: image::ColorType) -> DaftResult<Self> {
use image::ColorType;
use ImageMode::*;

match color {
ColorType::L8 => Ok(L),
ColorType::La8 => Ok(LA),
ColorType::Rgb8 => Ok(RGB),
ColorType::Rgba8 => Ok(RGBA),
ColorType::L16 => Ok(L16),
ColorType::La16 => Ok(LA16),
ColorType::Rgb16 => Ok(RGB16),
ColorType::Rgba16 => Ok(RGBA16),
ColorType::Rgb32F => Ok(RGB32F),
ColorType::Rgba32F => Ok(RGBA32F),
_ => Err(DaftError::ValueError(format!(
"Color type {:?} is not supported.",
color
))),
}
}
}

impl FromStr for ImageMode {
Expand Down
Loading