Implement Table.from_arrays
kylebarron committed Jul 30, 2024
1 parent 1685f9c commit 1fbd537
Showing 1 changed file with 55 additions and 21 deletions.
pyo3-arrow/src/table.rs (76 changes: 55 additions & 21 deletions)
@@ -147,13 +147,9 @@ impl PyTable
         mapping: HashMap<String, AnyArray>,
         schema: Option<PySchema>,
         metadata: Option<MetadataInput>,
-    ) -> PyResult<Self> {
+    ) -> PyArrowResult<Self> {
         let (names, arrays): (Vec<_>, Vec<_>) = mapping.into_iter().unzip();
-        // TODO: Construct record batches from Vec<PyChunkedArray>
-        // Can I reuse from_pylist here? I.e. this func only unwraps the dict to a list of column names and a list of chunked arrays, and then that passes in to from_arrays
-        // I probably want a helper to rechunk as necessary
-        // todo!()
+        Self::from_arrays(cls, arrays, Some(names), schema, metadata)
     }
 
     #[classmethod]
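For context, the new `from_pydict` body is now a thin wrapper: it splits the mapping into parallel vectors of names and columns and delegates to `from_arrays`. A minimal standalone sketch of that pattern, with a generic type parameter standing in for `AnyArray` (the helper name `split_mapping` is hypothetical, not part of the crate):

    use std::collections::HashMap;

    // Sketch of the delegation above: split a name -> column mapping into
    // parallel vectors, as from_pydict now does before calling from_arrays.
    // `C` is a stand-in for AnyArray.
    fn split_mapping<C>(mapping: HashMap<String, C>) -> (Vec<String>, Vec<C>) {
        // HashMap iteration order is arbitrary, but unzip keeps each name
        // paired with its own column, so the inferred schema still lines up.
        mapping.into_iter().unzip()
    }

    fn main() {
        let mapping = HashMap::from([("a".to_string(), 1), ("b".to_string(), 2)]);
        let (names, columns) = split_mapping(mapping);
        assert_eq!(names.len(), columns.len());
    }

The real method then forwards the extracted names as `Some(names)`, so `from_arrays` can infer a schema when the caller did not pass one.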
@@ -164,27 +164,65 @@ impl PyTable
         names: Option<Vec<String>>,
         schema: Option<PySchema>,
         metadata: Option<MetadataInput>,
-    ) -> PyResult<Self> {
+    ) -> PyArrowResult<Self> {
         let columns = arrays
             .into_iter()
             .map(|array| array.into_chunked_array())
             .collect::<PyArrowResult<Vec<_>>>()?;
 
-        // let schema = schema.map(|schema| schema.into_inner()).unwrap_or_else(|| {
-        //     let fields = columns
-        //         .iter()
-        //         .zip(names.iter())
-        //         .map(|(array, name)| {
-        //             Field::new(name.clone(), array.field().data_type().clone(), true)
-        //         })
-        //         .collect::<Vec<_>>();
-        //     Arc::new(
-        //         Schema::new(fields)
-        //             .with_metadata(metadata.unwrap_or_default().into_string_hashmap().unwrap()),
-        //     )
-        // });
-
-        todo!()
+        let schema: SchemaRef = if let Some(schema) = schema {
+            schema.into_inner()
+        } else {
+            let names = names.ok_or(PyValueError::new_err(
+                "names must be passed if schema is not passed.",
+            ))?;
+
+            let fields = columns
+                .iter()
+                .zip(names.iter())
+                .map(|(array, name)| Field::new(name.clone(), array.data_type().clone(), true))
+                .collect::<Vec<_>>();
+            Arc::new(
+                Schema::new(fields)
+                    .with_metadata(metadata.unwrap_or_default().into_string_hashmap().unwrap()),
+            )
+        };
+
+        if columns.is_empty() {
+            return Ok(Self::new(vec![], schema));
+        }
+
+        let column_chunk_lengths = columns
+            .iter()
+            .map(|column| {
+                let chunk_lengths = column
+                    .chunks()
+                    .iter()
+                    .map(|chunk| chunk.len())
+                    .collect::<Vec<_>>();
+                chunk_lengths
+            })
+            .collect::<Vec<_>>();
+        if !column_chunk_lengths.windows(2).all(|w| w[0] == w[1]) {
+            return Err(
+                PyValueError::new_err("All columns must have the same chunk lengths").into(),
+            );
+        }
+        let num_batches = column_chunk_lengths[0].len();
+
+        let mut batches = vec![];
+        for batch_idx in 0..num_batches {
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                columns
+                    .iter()
+                    .map(|column| column.chunks()[batch_idx].clone())
+                    .collect(),
+            )?;
+            batches.push(batch);
+        }
+
+        Ok(Self::new(batches, schema))
     }
 
     pub fn add_column(
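The heart of the new `from_arrays` is three steps: infer a schema from column names and data types when none is given, reject columns whose chunk layouts differ, and zip the i-th chunk of every column into the i-th `RecordBatch`. Below is a self-contained sketch of the same flow using plain arrow-rs types, without the pyo3 wrappers; the function name `table_from_arrays` is hypothetical, and modeling each column as a `Vec<ArrayRef>` of chunks is an assumption standing in for `PyChunkedArray`:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
    use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};

    // Hypothetical sketch: each column is a list of chunks (Vec<ArrayRef>),
    // standing in for PyChunkedArray in the diff above.
    fn table_from_arrays(
        columns: Vec<Vec<ArrayRef>>,
        names: Vec<String>,
    ) -> Result<(Vec<RecordBatch>, SchemaRef), ArrowError> {
        // (1) Infer one nullable field per column from its name and the
        // data type of its first chunk, mirroring the schema-inference branch.
        let fields: Vec<Field> = columns
            .iter()
            .zip(names.iter())
            .map(|(chunks, name)| Field::new(name.clone(), chunks[0].data_type().clone(), true))
            .collect();
        let schema: SchemaRef = Arc::new(Schema::new(fields));

        // (2) All columns must share the same chunk layout; otherwise the
        // i-th chunks could not be zipped into a rectangular batch.
        let chunk_lengths: Vec<Vec<usize>> = columns
            .iter()
            .map(|chunks| chunks.iter().map(|c| c.len()).collect())
            .collect();
        if !chunk_lengths.windows(2).all(|w| w[0] == w[1]) {
            return Err(ArrowError::InvalidArgumentError(
                "All columns must have the same chunk lengths".to_string(),
            ));
        }

        // (3) Build one RecordBatch per chunk index.
        let num_batches = chunk_lengths.first().map(|l| l.len()).unwrap_or(0);
        let batches = (0..num_batches)
            .map(|i| {
                RecordBatch::try_new(
                    schema.clone(),
                    columns.iter().map(|chunks| chunks[i].clone()).collect(),
                )
            })
            .collect::<Result<Vec<_>, _>>()?;
        Ok((batches, schema))
    }

    fn main() -> Result<(), ArrowError> {
        // Two columns, each chunked as [2 rows, 1 row].
        let a: Vec<ArrayRef> = vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(Int32Array::from(vec![3])),
        ];
        let b: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["x", "y"])),
            Arc::new(StringArray::from(vec!["z"])),
        ];
        let (batches, _schema) = table_from_arrays(vec![a, b], vec!["a".into(), "b".into()])?;
        assert_eq!(batches.len(), 2);
        Ok(())
    }

One design note visible in the diff: the validation compares whole per-column chunk-length vectors with `windows(2)`, so a mismatch in either the number of chunks or the size of any individual chunk is rejected by a single check, rather than rechunking columns to a common layout.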
