Skip to content

Commit

Permalink
feat: Read directly during compute (#471)
Browse files (browse the repository at this point in the history)
This is part of #465 -- specifically, reading data files directly using
`object_store` during compute, instead of pre-fetching them through the
(now deleted) `DataManager`.
  • Loading branch information
bjchambers authored Jul 5, 2023
2 parents 8e86e21 + 343606c commit 7858a62
Show file tree
Hide file tree
Showing 21 changed files with 318 additions and 560 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ bigdecimal = { version = "0.3.1", features = ["serde"] }
bincode = "1.3.3"
bit-set = "0.5.3"
bitvec = { version = "1.0.1", features = ["serde"] }
bytes = { version = "1.4.0" }
chrono = "0.4.24"
chronoutil = "0.2.3"
clap = { version = "4.2.0", features = ["derive", "env"] }
Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ aws-sdk-s3.workspace = true
aws-types.workspace = true
bit-set.workspace = true
bitvec.workspace = true
bytes.workspace = true
chrono.workspace = true
clap.workspace = true
data-encoding.workspace = true
Expand Down
288 changes: 0 additions & 288 deletions crates/sparrow-runtime/src/data_manager.rs

This file was deleted.

7 changes: 3 additions & 4 deletions crates/sparrow-runtime/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ use sparrow_compiler::{hash_compute_plan_proto, DataContext};
use sparrow_instructions::ComputeStore;
use sparrow_qfr::kaskada::sparrow::v1alpha::FlightRecordHeader;

use crate::data_manager::DataManager;
use crate::execute::key_hash_inverse::{KeyHashInverse, ThreadSafeKeyHashInverse};
use crate::execute::operation::OperationContext;
use crate::s3::S3Helper;
use crate::stores::ObjectStoreRegistry;
use crate::RuntimeOptions;

mod compute_executor;
mod error;
mod input_prefetch;
pub(crate) mod key_hash_inverse;
pub(crate) mod operation;
pub mod output;
Expand Down Expand Up @@ -203,7 +202,7 @@ pub async fn execute(
let context = OperationContext {
plan,
plan_hash,
data_manager: DataManager::new(s3_helper.clone()),
object_stores: ObjectStoreRegistry::default(),
data_context,
compute_store,
key_hash_inverse,
Expand Down Expand Up @@ -316,7 +315,7 @@ pub async fn materialize(
let context = OperationContext {
plan,
plan_hash,
data_manager: DataManager::new(s3_helper.clone()),
object_stores: ObjectStoreRegistry::default(),
data_context,
compute_store: snapshot_compute_store,
key_hash_inverse,
Expand Down
12 changes: 0 additions & 12 deletions crates/sparrow-runtime/src/execute/compute_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,6 @@ impl ComputeExecutor {
);
}

// Spawn a task to pre-fetch the data files.
//
// This currently tries to eagerly fetch all of the data files ordered by
// min event time, which should correspond to the order they are needed.
// If we want to be able to "page" these files out, we may want to have a
// separate process to
crate::execute::input_prefetch::spawn_prefetch(
&context.data_manager,
&mut spawner,
output_tx,
);

Ok(Self {
compute_store: context.compute_store,
plan_hash: context.plan_hash,
Expand Down
Loading

0 comments on commit 7858a62

Please sign in to comment.