Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into 15231-memory-usage-test-infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
pythonspeed committed Mar 27, 2024
2 parents d82679e + 03c5f73 commit 2cb081d
Show file tree
Hide file tree
Showing 200 changed files with 4,589 additions and 2,830 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub struct BinaryViewArrayGeneric<T: ViewType + ?Sized> {

impl<T: ViewType + ?Sized> PartialEq for BinaryViewArrayGeneric<T> {
fn eq(&self, other: &Self) -> bool {
self.into_iter().zip(other).all(|(l, r)| l == r)
self.len() == other.len() && self.into_iter().zip(other).all(|(l, r)| l == r)
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/polars-arrow/src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use polars_error::{polars_bail, polars_err, PolarsResult};
use super::nested::*;
use super::util;
use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::*;
use crate::record_batch::RecordBatch;
use crate::types::months_days_ns;
use crate::with_match_primitive_type;

Expand Down Expand Up @@ -461,7 +461,7 @@ fn skip_item<'a>(
Ok(block)
}

/// Deserializes a [`Block`] assumed to be encoded according to [`AvroField`] into [`Chunk`],
/// Deserializes a [`Block`] assumed to be encoded according to [`AvroField`] into [`RecordBatch`],
/// using `projection` to ignore `avro_fields`.
/// # Panics
/// `fields`, `avro_fields` and `projection` must have the same length.
Expand All @@ -470,7 +470,7 @@ pub fn deserialize(
fields: &[Field],
avro_fields: &[AvroField],
projection: &[bool],
) -> PolarsResult<Chunk<Box<dyn Array>>> {
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
assert_eq!(fields.len(), avro_fields.len());
assert_eq!(fields.len(), projection.len());

Expand Down Expand Up @@ -508,7 +508,7 @@ pub fn deserialize(
}?
}
}
Chunk::try_new(
RecordBatch::try_new(
arrays
.iter_mut()
.zip(projection.iter())
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ mod util;
pub use schema::infer_schema;

use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Field;
use crate::record_batch::RecordBatch;

/// Single threaded, blocking reader of Avro; [`Iterator`] of [`Chunk`].
/// Single threaded, blocking reader of Avro; [`Iterator`] of [`RecordBatch`].
pub struct Reader<R: Read> {
iter: BlockStreamingIterator<R>,
avro_fields: Vec<AvroField>,
Expand Down Expand Up @@ -53,7 +53,7 @@ impl<R: Read> Reader<R> {
}

impl<R: Read> Iterator for Reader<R> {
type Item = PolarsResult<Chunk<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;

fn next(&mut self) -> Option<Self::Item> {
let fields = &self.fields[..];
Expand Down
14 changes: 7 additions & 7 deletions crates/polars-arrow/src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ use super::ipc::read::Dictionaries;
pub use super::ipc::write::default_ipc_fields;
use super::ipc::{IpcField, IpcSchema};
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::*;
pub use crate::io::ipc::write::common::WriteOptions;
use crate::io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData};
use crate::io::ipc::{read, write};
use crate::record_batch::RecordBatch;

/// Serializes [`Chunk`] to a vector of [`FlightData`] representing the serialized dictionaries
/// Serializes [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries
/// and a [`FlightData`] representing the batch.
/// # Errors
/// This function errors iff `fields` is not consistent with `columns`
pub fn serialize_batch(
chunk: &Chunk<Box<dyn Array>>,
chunk: &RecordBatch<Box<dyn Array>>,
fields: &[IpcField],
options: &WriteOptions,
) -> PolarsResult<(Vec<FlightData>, FlightData)> {
Expand Down Expand Up @@ -110,13 +110,13 @@ pub fn deserialize_schemas(bytes: &[u8]) -> PolarsResult<(ArrowSchema, IpcSchema
read::deserialize_schema(bytes)
}

/// Deserializes [`FlightData`] representing a record batch message to [`Chunk`].
/// Deserializes [`FlightData`] representing a record batch message to [`RecordBatch`].
pub fn deserialize_batch(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &read::Dictionaries,
) -> PolarsResult<Chunk<Box<dyn Array>>> {
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
// check that the data_header is a record batch message
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|_err| polars_err!(oos = "Unable to get root as message: {err:?}"))?;
Expand Down Expand Up @@ -178,14 +178,14 @@ pub fn deserialize_dictionary(
Ok(())
}

/// Deserializes [`FlightData`] into either a [`Chunk`] (when the message is a record batch)
/// Deserializes [`FlightData`] into either a [`RecordBatch`] (when the message is a record batch)
/// or by upserting into `dictionaries` (when the message is a dictionary)
pub fn deserialize_message(
data: &FlightData,
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> PolarsResult<Option<Chunk<Box<dyn Array>>>> {
) -> PolarsResult<Option<RecordBatch<Box<dyn Array>>>> {
let FlightData {
data_header,
data_body,
Expand Down
14 changes: 7 additions & 7 deletions crates/polars-arrow/src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use polars_error::{polars_bail, polars_err, PolarsResult};
use super::deserialize::{read, skip};
use super::Dictionaries;
use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{ArrowDataType, Field};
use crate::io::ipc::read::OutOfSpecKind;
use crate::io::ipc::{IpcField, IpcSchema};
use crate::record_batch::RecordBatch;

#[derive(Debug, Eq, PartialEq, Hash)]
enum ProjectionResult<A> {
Expand Down Expand Up @@ -70,7 +70,7 @@ impl<'a, A, I: Iterator<Item = A>> Iterator for ProjectionIter<'a, A, I> {
}
}

/// Returns a [`Chunk`] from a reader.
/// Returns a [`RecordBatch`] from a reader.
/// # Panics
/// Panics iff the projection is not in strictly increasing order (e.g. neither `[1, 0]` nor `[0, 1, 1]` is valid)
#[allow(clippy::too_many_arguments)]
Expand All @@ -86,7 +86,7 @@ pub fn read_record_batch<R: Read + Seek>(
block_offset: u64,
file_size: u64,
scratch: &mut Vec<u8>,
) -> PolarsResult<Chunk<Box<dyn Array>>> {
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
assert_eq!(fields.len(), ipc_schema.fields.len());
let buffers = batch
.buffers()
Expand Down Expand Up @@ -185,7 +185,7 @@ pub fn read_record_batch<R: Read + Seek>(
})
.collect::<PolarsResult<Vec<_>>>()?
};
Chunk::try_new(columns)
RecordBatch::try_new(columns)
}

fn find_first_dict_field_d<'a>(
Expand Down Expand Up @@ -338,17 +338,17 @@ pub fn prepare_projection(
}

pub fn apply_projection(
chunk: Chunk<Box<dyn Array>>,
chunk: RecordBatch<Box<dyn Array>>,
map: &AHashMap<usize, usize>,
) -> Chunk<Box<dyn Array>> {
) -> RecordBatch<Box<dyn Array>> {
// re-order according to projection
let arrays = chunk.into_arrays();
let mut new_arrays = arrays.clone();

map.iter()
.for_each(|(old, new)| new_arrays[*new] = arrays[*old].clone());

Chunk::new(new_arrays)
RecordBatch::new(new_arrays)
}

#[cfg(test)]
Expand Down
44 changes: 8 additions & 36 deletions crates/polars-arrow/src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use super::common::*;
use super::schema::fb_to_schema;
use super::{Dictionaries, OutOfSpecKind};
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::ArrowSchemaRef;
use crate::io::ipc::IpcSchema;
use crate::record_batch::RecordBatch;

/// Metadata of an Arrow IPC file, written in the footer of the file.
#[derive(Debug, Clone)]
Expand All @@ -36,32 +36,6 @@ pub struct FileMetadata {
pub size: u64,
}

fn read_dictionary_message<R: Read + Seek>(
reader: &mut R,
offset: u64,
data: &mut Vec<u8>,
) -> PolarsResult<()> {
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(offset))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
};
let message_length = i32::from_le_bytes(message_size);

let message_length: usize = message_length
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;

data.clear();
data.try_reserve(message_length)?;
reader
.by_ref()
.take(message_length as u64)
.read_to_end(data)?;

Ok(())
}
/// Read the row count by summing the lengths of the record batches
pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
let mut message_scratch: Vec<u8> = Default::default();
Expand All @@ -72,7 +46,7 @@ pub fn get_row_count<R: Read + Seek>(reader: &mut R) -> PolarsResult<i64> {
blocks
.into_iter()
.map(|block| {
let message = get_message_from_block(reader, block, &mut message_scratch)?;
let message = get_message_from_block(reader, &block, &mut message_scratch)?;
let record_batch = get_record_batch(message)?;
record_batch.length().map_err(|e| e.into())
})
Expand Down Expand Up @@ -100,20 +74,18 @@ fn read_dictionary_block<R: Read + Seek>(
message_scratch: &mut Vec<u8>,
dictionary_scratch: &mut Vec<u8>,
) -> PolarsResult<()> {
let message = get_message_from_block(reader, block, message_scratch)?;
let batch = get_dictionary_batch(&message)?;

let offset: u64 = block
.offset
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;

let length: u64 = block
.meta_data_length
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?;
read_dictionary_message(reader, offset, message_scratch)?;

let message = arrow_format::ipc::MessageRef::read_as_root(message_scratch.as_ref())
.map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?;

let batch = get_dictionary_batch(&message)?;

read_dictionary(
batch,
Expand Down Expand Up @@ -299,7 +271,7 @@ fn get_message_from_block_offset<'a, R: Read + Seek>(

fn get_message_from_block<'a, R: Read + Seek>(
reader: &mut R,
block: arrow_format::ipc::Block,
block: &arrow_format::ipc::Block,
message_scratch: &'a mut Vec<u8>,
) -> PolarsResult<arrow_format::ipc::MessageRef<'a>> {
let offset: u64 = block
Expand Down Expand Up @@ -327,7 +299,7 @@ pub fn read_batch<R: Read + Seek>(
index: usize,
message_scratch: &mut Vec<u8>,
data_scratch: &mut Vec<u8>,
) -> PolarsResult<Chunk<Box<dyn Array>>> {
) -> PolarsResult<RecordBatch<Box<dyn Array>>> {
let block = metadata.blocks[index];

let offset: u64 = block
Expand Down
10 changes: 5 additions & 5 deletions crates/polars-arrow/src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ use super::common::{apply_projection, prepare_projection, read_dictionary, read_
use super::file::{deserialize_footer, get_record_batch};
use super::{Dictionaries, FileMetadata, OutOfSpecKind};
use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{ArrowSchema, Field};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER};
use crate::record_batch::RecordBatch;

/// Async reader for Arrow IPC files
pub struct FileStream<'a> {
stream: BoxStream<'a, PolarsResult<Chunk<Box<dyn Array>>>>,
stream: BoxStream<'a, PolarsResult<RecordBatch<Box<dyn Array>>>>,
schema: Option<ArrowSchema>,
metadata: FileMetadata,
}
Expand Down Expand Up @@ -72,7 +72,7 @@ impl<'a> FileStream<'a> {
metadata: FileMetadata,
projection: Option<(Vec<usize>, AHashMap<usize, usize>)>,
limit: Option<usize>,
) -> BoxStream<'a, PolarsResult<Chunk<Box<dyn Array>>>>
) -> BoxStream<'a, PolarsResult<RecordBatch<Box<dyn Array>>>>
where
R: AsyncRead + AsyncSeek + Unpin + Send + 'a,
{
Expand Down Expand Up @@ -113,7 +113,7 @@ impl<'a> FileStream<'a> {
}

impl<'a> Stream for FileStream<'a> {
type Item = PolarsResult<Chunk<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -170,7 +170,7 @@ async fn read_batch<R>(
meta_buffer: &mut Vec<u8>,
block_buffer: &mut Vec<u8>,
scratch: &mut Vec<u8>,
) -> PolarsResult<Chunk<Box<dyn Array>>>
) -> PolarsResult<RecordBatch<Box<dyn Array>>>
where
R: AsyncRead + AsyncSeek + Unpin,
{
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use polars_error::PolarsResult;
use super::common::*;
use super::{read_batch, read_file_dictionaries, Dictionaries, FileMetadata};
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::ArrowSchema;
use crate::record_batch::RecordBatch;

/// An iterator of [`Chunk`]s from an Arrow IPC file.
/// An iterator of [`RecordBatch`]s from an Arrow IPC file.
pub struct FileReader<R: Read + Seek> {
reader: R,
metadata: FileMetadata,
Expand Down Expand Up @@ -98,7 +98,7 @@ impl<R: Read + Seek> FileReader<R> {
}

impl<R: Read + Seek> Iterator for FileReader<R> {
type Item = PolarsResult<Chunk<Box<dyn Array>>>;
type Item = PolarsResult<RecordBatch<Box<dyn Array>>>;

fn next(&mut self) -> Option<Self::Item> {
// get current block
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/io/ipc/read/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use super::common::*;
use super::schema::deserialize_stream_metadata;
use super::{Dictionaries, OutOfSpecKind};
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::ArrowSchema;
use crate::io::ipc::IpcSchema;
use crate::record_batch::RecordBatch;

/// Metadata of an Arrow IPC stream, written at the start of the stream
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -67,7 +67,7 @@ pub enum StreamState {
/// A live stream without data
Waiting,
/// Next item in the stream
Some(Chunk<Box<dyn Array>>),
Some(RecordBatch<Box<dyn Array>>),
}

impl StreamState {
Expand All @@ -76,7 +76,7 @@ impl StreamState {
/// # Panics
///
/// If the `StreamState` was `Waiting`.
pub fn unwrap(self) -> Chunk<Box<dyn Array>> {
pub fn unwrap(self) -> RecordBatch<Box<dyn Array>> {
if let StreamState::Some(batch) = self {
batch
} else {
Expand Down
Loading

0 comments on commit 2cb081d

Please sign in to comment.