This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Reading data chunk by chunk #987

Open
ratal opened this issue May 10, 2022 · 9 comments
Labels: question (Further information is requested)

Comments

ratal commented May 10, 2022

I have a running crate that extracts data from a file into ndarray, later exposed with PyO3, for big data. I would like to migrate it to arrow instead, to benefit from the performance and the arrow ecosystem.
The data blocks read are row based, and I currently read several columns in parallel into already-allocated ndarrays using rayon, from a chunk of data in memory (not the whole data block, to avoid consuming too much memory). Types are mainly primitives, but also utf8, complex, and multi-dimensional arrays of primitives or complex. File bytes can be little or big endian.

Can I have your advice on the best method to read this data with arrow2? A few thoughts and directions I have considered so far:

  1. I noticed there is Buffer to create a PrimitiveArray, but compared to the official arrow crate there is no from_bytes() implementation, and Buffer cannot be grown like a Vec. How can I feed data chunk by chunk into a Buffer? Is there a way to input bytes along with a DataType?
  2. It seems I could use MutablePrimitiveArray like a Vec with a defined capacity, but I am not sure this is performant. I am also a bit afraid of the cost of converting to PrimitiveArray.
  3. Is it still acceptable to simply keep my ndarray implementation and convert everything to arrow2 at the end? I did not see any zero-copy path from ndarray.
ratal changed the title from Writing chunked data to Reading data chunk by chunk on May 10, 2022
jorgecarleitao added the question label on May 11, 2022

jorgecarleitao commented May 11, 2022

Thanks for reaching out and for the explanation, very interesting!

You understood it absolutely right: MutablePrimitiveArray and the like are useful to add data optional item by optional item; Buffer, Bitmap and Vec are for bulk operations over values and validity.

I am assuming that the files are a custom format (i.e. not parquet) and that you know how to read them. In that case, the most performant way to create a primitive array is to read into a values: Vec<T> and use PrimitiveArray::new(DataType::..., values.into(), None). To read into the Vec efficiently, you can use:

use std::io::Read;

use arrow2::error::Result;
use arrow2::types::NativeType;

// small helper so the snippet is self-contained; arrow2's IPC reader
// has an equivalent internal function
fn is_native_little_endian() -> bool {
    cfg!(target_endian = "little")
}

fn read_uncompressed_buffer<T: NativeType, R: Read>(
    reader: &mut R,
    length: usize,
    is_little_endian: bool,
) -> Result<Vec<T>> {
    // it is undefined behavior to call read_exact on un-initialized memory,
    // https://doc.rust-lang.org/std/io/trait.Read.html#tymethod.read
    // see also https://github.com/MaikKlein/ash/issues/354#issue-781730580
    let mut buffer = vec![T::default(); length];

    if is_native_little_endian() == is_little_endian {
        // fast case where we can just copy the contents as-is
        let slice = bytemuck::cast_slice_mut(&mut buffer);
        reader.read_exact(slice)?;
    } else {
        // slow case where every value's byte order must be swapped
        read_swapped(reader, length, &mut buffer, is_little_endian)?;
    }
    Ok(buffer)
}


fn read_swapped<T: NativeType, R: Read>(
    reader: &mut R,
    length: usize,
    buffer: &mut Vec<T>,
    is_little_endian: bool,
) -> Result<()> {
    // read the raw bytes, then decode them value by value with the
    // opposite byte order
    let mut slice = vec![0u8; length * std::mem::size_of::<T>()];
    reader.read_exact(&mut slice)?;

    let chunks = slice.chunks_exact(std::mem::size_of::<T>());
    if !is_little_endian {
        // machine is little endian, file is big endian
        buffer
            .as_mut_slice()
            .iter_mut()
            .zip(chunks)
            .try_for_each(|(slot, chunk)| {
                let a: T::Bytes = match chunk.try_into() {
                    Ok(a) => a,
                    Err(_) => unreachable!(),
                };
                *slot = T::from_be_bytes(a);
                Result::Ok(())
            })?;
    } else {
        // machine is big endian, file is little endian
        buffer
            .as_mut_slice()
            .iter_mut()
            .zip(chunks)
            .try_for_each(|(slot, chunk)| {
                let a: T::Bytes = match chunk.try_into() {
                    Ok(a) => a,
                    Err(_) => unreachable!(),
                };
                *slot = T::from_le_bytes(a);
                Result::Ok(())
            })?;
    }
    Ok(())
}

Vec<T> to Buffer<T> is O(1), since Buffer is basically an Arc<Vec<T>>.

In other words, read the data into a Vec<T> as you do today and create an array via PrimitiveArray::new(DataType::..., values.into(), None). CPU-wise, this operation is essentially a zeroed allocation plus a read into that region.
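
For instance, a minimal end-to-end sketch of the above (the Cursor stands in for your file reader, and this assumes it runs inside a function returning arrow2's Result):

use std::io::Cursor;

use arrow2::array::PrimitiveArray;
use arrow2::datatypes::DataType;

// hypothetical input: four u32 values serialized as little endian
let bytes: Vec<u8> = (0u32..4).flat_map(|v| v.to_le_bytes()).collect();
let mut reader = Cursor::new(bytes);

let values: Vec<u32> = read_uncompressed_buffer(&mut reader, 4, true)?;
// Vec<u32> -> Buffer<u32> is O(1): the Vec is moved, not copied
let array = PrimitiveArray::new(DataType::UInt32, values.into(), None);
assert_eq!(array.len(), 4);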

Does it make sense?

For Utf8Array (say [a, bb, ccc]) you need to create two buffers

  • values: Vec<u8> with [a,b,b,c,c,c]
  • offsets: Vec<i32> with [0, 1, 3, 6] (the first is always 0; the first element (a) has length 1 = 1-0, the second (bb) has length 2 = 3-1, the last (ccc) has length 3 = 6-3).

and then use Utf8Array::<i32>::new(DataType::Utf8, offsets.into(), values.into(), None). The creation of these vectors depends on the details of how the strings are stored on disk. Lengths are measured in bytes, and the values between two consecutive offsets must be valid utf8 (we check). Use try_new to error on invalid data (e.g. invalid utf8), and the unsafe new_unchecked or try_new_unchecked to skip some of the checks.
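
A minimal sketch of that for [a, bb, ccc] (against the API of arrow2 at the time, where offsets are a plain Buffer<i32>; newer versions wrap them in an OffsetsBuffer), again inside a function returning Result:

use arrow2::array::Utf8Array;
use arrow2::datatypes::DataType;

let values: Vec<u8> = b"abbccc".to_vec(); // [a, b, b, c, c, c]
let offsets: Vec<i32> = vec![0, 1, 3, 6]; // element i is values[offsets[i]..offsets[i + 1]]

// try_new checks that the offsets are monotonic and that each slice is valid utf8
let array = Utf8Array::<i32>::try_new(DataType::Utf8, offsets.into(), values.into(), None)?;
assert_eq!(array.value(1), "bb");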

ratal commented May 11, 2022

Thanks for your answer :)
To clarify my intent: the data blocks I want to read are consecutive records/rows, typically like [record1, record2, record3] with record = [channel1: u8, channel2: u64, channel3: f64]. And I read the data block in chunks: chunk1 = [record1..recordn], chunk2 = [recordn+1..recordz], etc.
Example snippet for reading u16:

// data is an already-allocated ndarray; data_chunk is a Vec<u8> containing n records of length record_length
if swap_needed {
    for (i, record) in data_chunk.chunks(record_length).enumerate() {
        let value = &record[pos_byte_beg..pos_byte_beg + std::mem::size_of::<u16>()];
        data[i + previous_index] =
            u16::from_be_bytes(value.try_into().expect("Could not read be u16"));
    }
} else {
    for (i, record) in data_chunk.chunks(record_length).enumerate() {
        let value = &record[pos_byte_beg..pos_byte_beg + std::mem::size_of::<u16>()];
        data[i + previous_index] =
            u16::from_le_bytes(value.try_into().expect("Could not read le u16"));
    }
}

But following your advice, I will focus on reading into Vec, which easily becomes a PrimitiveArray, for 1-D arrays. I will have to study the API more to then convert into complex and n-dimensional arrays.

jorgecarleitao commented

A useful way of thinking about arrow2 is:

  • Buffer<T> ~= Arc<Vec<T>>
  • Bitmap ~= Arc<Vec<u8>>
  • PrimitiveArray<T> ~= (Buffer<T>, Bitmap).

That is why to build a PrimitiveArray we can just use Vec<T>.
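
For instance (a short illustration of the O(1) conversion):

use arrow2::buffer::Buffer;

// Vec<i32> -> Buffer<i32> moves the Vec behind a shared allocation: no copy
let buffer: Buffer<i32> = vec![1, 2, 3].into();
assert_eq!(buffer.as_slice(), &[1, 2, 3][..]);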

Your format is row/record based. In that case we always need to perform a transposition of the data, as you are showing, which means that we can't load multiple values in one go as I was describing.

One way to do this transposition is to declare "mutable columns" and push to them item by item per record. This allows us to traverse all records only once. We do something similar for CSV, see here. That is essentially:

let mut all: Vec<Box<dyn Array>> = vec![];
for (i, _column) in columns.iter().enumerate() {
    let mut data = vec![];
    for row in &rows {
        data.push(row[i]);
    }
    let c: Box<dyn Array> = finalize(data);
    all.push(c);
}
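
For instance, a concrete sketch of that pattern with two primitive columns (the (u16, f64) rows here are a hypothetical stand-in for decoded records):

use arrow2::array::{Array, PrimitiveArray};
use arrow2::datatypes::DataType;

// hypothetical decoded records: one (u16, f64) tuple per row
let rows: Vec<(u16, f64)> = vec![(1, 1.5), (2, 2.5), (3, 3.5)];

// one growable column per field, filled in a single pass over the rows
let mut col0: Vec<u16> = Vec::with_capacity(rows.len());
let mut col1: Vec<f64> = Vec::with_capacity(rows.len());
for (a, b) in &rows {
    col0.push(*a);
    col1.push(*b);
}

// "finalize": move each Vec into an array (O(1) per column)
let all: Vec<Box<dyn Array>> = vec![
    Box::new(PrimitiveArray::new(DataType::UInt16, col0.into(), None)),
    Box::new(PrimitiveArray::new(DataType::Float64, col1.into(), None)),
];
assert_eq!(all[0].len(), 3);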

ratal commented May 24, 2022

Thanks.
I changed my approach: I now use Vec for basically all columnar data (1-D arrays), including complex, and convert at the end into arrow2's PrimitiveArray, FixedSizeListArray (for complex, as sketched below), Utf8Array (for strings), BinaryArray (for variable-length binary) and FixedSizeBinary (for fixed-length binary). Most conversions use Buffer::from_slice(), so I expect little performance impact.
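
For complex, the conversion is roughly this (a sketch: a FixedSizeListArray of two f64 per element; the field name and values are illustrative):

use arrow2::array::{Array, FixedSizeListArray, PrimitiveArray};
use arrow2::datatypes::{DataType, Field};

// two complex values (re, im) flattened as [re0, im0, re1, im1]
let inner = PrimitiveArray::new(DataType::Float64, vec![1.0, 2.0, 3.0, 4.0].into(), None);
let field = Box::new(Field::new("item", DataType::Float64, false));
let array = FixedSizeListArray::new(
    DataType::FixedSizeList(field, 2), // fixed size of 2: (re, im)
    Box::new(inner),
    None,
);
assert_eq!(array.len(), 2);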
The only thing I am not so sure about is Bitmap creation. I can have a boolean array (Vec<bool>) that I convert to a Bitmap with from_u8_slice(), but it is not clear what those u8 should contain; looking at the code, I could not figure it out. Should the bits already be packed into u8 before calling from_u8_slice(), or does it convert from a Rust bool (one byte each) and pack the bits into the Bitmap?

Also, the file I am reading can contain n-dimensional arrays that change with each timestamp. I looked at Tensor in apache/arrow and several issue tickets; it seems it is not well supported and tested yet, so I suppose you are not implementing it for now. But could you advise which type is most suitable for compatibility with Pandas or Polars? I have seen FixedSizeBinary suggested in Jira, but it seems weird to use binary when the type is already known.

ratal commented May 31, 2022

While trying to use MutableArray, I am facing missing trait implementations, mainly PartialEq and Default for MutableFixedSizeListArray.
I am using it to store Vec<Option<Complex>> as a MutableFixedSizeListArray<MutablePrimitiveArray>.
PartialEq is not critical; it is mainly needed for tests, and I could compare references after conversion to the immutable Array. But for Default: I would like to mem::take() the MutableArray out of a complicated structure representing the file format metadata, in order to store it in a Chunk, and take() requires Default to be implemented, as illustrated below. Is there a reason why it is not implemented, like it is for PrimitiveArray?
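
To illustrate the pattern I am after (a sketch, assuming the mutable array type implements Default, which is the hypothetical end state after this change):

use arrow2::array::{MutablePrimitiveArray, PrimitiveArray};

// a mutable array embedded in a larger metadata structure
let mut mutable = MutablePrimitiveArray::<f64>::new();
mutable.push(Some(1.0));
mutable.push(None);

// mem::take moves the filled array out and leaves Default::default() behind;
// this is exactly the operation that requires Default to be implemented
let taken = std::mem::take(&mut mutable);
let array: PrimitiveArray<f64> = taken.into();
assert_eq!(array.len(), 2);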
PS: I tried to implement them myself, but that seems not to be allowed (implementing a foreign trait on a foreign type).

jorgecarleitao commented

Nice, that is interesting! I am also trying to improve the API for COW semantics atm!

wrt Default and PartialEq - no reason, just forgotten - let's add them! Would you like to do it, or would you like me to?

jorgecarleitao commented

wrt the Vec<bool>: that is a different format from Bitmap, which contains a Vec<u8> where every element is a bit (i.e. 8 values packed per byte). One way to convert one to the other is via Bitmap::from_trusted_len_iter(vec.into_iter()) (it is still a bit expensive).
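
For instance (a minimal sketch; the validity values are illustrative):

use arrow2::array::{Array, PrimitiveArray};
use arrow2::bitmap::Bitmap;
use arrow2::datatypes::DataType;

let validity: Vec<bool> = vec![true, false, true, true];
// packs the bools 8 per byte; a Vec's iterator has a trusted length
let bitmap = Bitmap::from_trusted_len_iter(validity.into_iter());
assert_eq!(bitmap.len(), 4);

// the bitmap can then be used as the validity of an array
let array = PrimitiveArray::new(DataType::Float64, vec![1.0, 0.0, 2.0, 3.0].into(), Some(bitmap));
assert_eq!(array.null_count(), 1);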

ratal commented Jun 16, 2022

Hi,
sorry for the delayed answer. I am not sure I am comfortable enough to implement Default and PartialEq, but I can try. I am still in the learning phase of Rust.

ratal commented Jan 16, 2024

I am currently integrating arrow2 further into mdfr, but I am facing mutable-reference issues for arrays that are not primitive. In relation to #1601: while it is simple to add get_mut_values() for FixedSizeBinary, it becomes more complicated for arrays that include offsets (Utf8, LargeBinary; see my arrow2 fork), and also for FixedSizeListArray.
So COW seems practicable for primitives, but is it also applicable to the other arrays? Did I miss something?
