Skip to content

Commit

Permalink
[PERF]: local json reader (#2264)
Browse files Browse the repository at this point in the history
closes #2196
  • Loading branch information
universalmind303 committed May 21, 2024
1 parent ad6640e commit 6ba59f0
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 10 deletions.
16 changes: 14 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions src/daft-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,23 @@ impl Schema {
.map(|f| f.dtype.estimate_size_bytes().unwrap_or(0.))
.sum()
}

/// Returns a new schema with only the specified columns in the new schema
pub fn project<S: AsRef<str>>(self: Arc<Self>, columns: &[S]) -> DaftResult<Schema> {
let new_fields = columns
.iter()
.map(|i| {
let key = i.as_ref();
self.fields.get(key).cloned().ok_or_else(|| {
DaftError::SchemaMismatch(format!(
"Column {} not found in schema: {:?}",
key, self.fields
))
})
})
.collect::<DaftResult<Vec<_>>>()?;
Self::new(new_fields)
}
}

impl Eq for Schema {}
Expand Down
3 changes: 3 additions & 0 deletions src/daft-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ futures = {workspace = true}
indexmap = {workspace = true}
lexical-core = {version = "0.8"}
log = {workspace = true}
memchr = "2.7.2"
memmap2 = "0.9.4"
num-traits = {workspace = true}
pyo3 = {workspace = true, optional = true}
pyo3-log = {workspace = true, optional = true}
rayon = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true, features = ["raw_value"]}
simd-json = {version = "0.13", features = ["known-key"]}
simdutf8 = "0.1.3"
snafu = {workspace = true}
Expand Down
4 changes: 2 additions & 2 deletions src/daft-json/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(crate) fn deserialize_records<'a, A: Borrow<BorrowedValue<'a>>>(
Ok(results.into_values().map(|mut ma| ma.as_box()).collect())
}

fn allocate_array(f: &Field, length: usize) -> Box<dyn MutableArray> {
pub(crate) fn allocate_array(f: &Field, length: usize) -> Box<dyn MutableArray> {
match f.data_type() {
DataType::Null => Box::new(MutableNullArray::new(DataType::Null, 0)),
DataType::Int8 => Box::new(MutablePrimitiveArray::<i8>::with_capacity(length)),
Expand Down Expand Up @@ -123,7 +123,7 @@ fn allocate_array(f: &Field, length: usize) -> Box<dyn MutableArray> {
}

/// Deserialize `rows` by extending them into the given `target`
fn deserialize_into<'a, A: Borrow<BorrowedValue<'a>>>(
pub(crate) fn deserialize_into<'a, A: Borrow<BorrowedValue<'a>>>(
target: &mut Box<dyn MutableArray>,
rows: &[A],
) {
Expand Down
4 changes: 3 additions & 1 deletion src/daft-json/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![feature(async_closure)]
#![feature(let_chains)]
#![feature(trait_alias)]
#![feature(trait_upcasting)]
use common_error::DaftError;
use futures::stream::TryChunksError;
Expand All @@ -9,6 +8,7 @@ use snafu::Snafu;
mod decoding;
mod deserializer;
mod inference;
pub mod local;

pub mod options;
#[cfg(feature = "python")]
Expand Down Expand Up @@ -45,6 +45,8 @@ pub enum Error {
OneShotRecvError {
source: tokio::sync::oneshot::error::RecvError,
},
#[snafu(display("Error creating rayon threadpool: {}", source))]
RayonThreadPoolError { source: rayon::ThreadPoolBuildError },
}

impl From<Error> for DaftError {
Expand Down
Loading

0 comments on commit 6ba59f0

Please sign in to comment.