feat: Use object_store to write files (#492)
Also introduces paged file output.

This does a hacky CSV write by buffering a batch at a time.

This is related to #486.
This is part of #465.
This is part of #466.
bjchambers authored Jul 12, 2023
2 parents 119c15b + c719314 commit b0c86bf
Showing 19 changed files with 313 additions and 309 deletions.
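
The commit message above describes the interim approach: encode one batch at a time into an in-memory CSV buffer, then hand the bytes to the `object_store` crate. The sketch below only illustrates that shape; the helper name, the single-object output, and the `put(&Path, Bytes)` signature (which matches `object_store` releases from this period) are assumptions rather than the commit's actual code.

```rust
use std::sync::Arc;

use arrow::csv::WriterBuilder;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

// Hypothetical helper: encode all batches as one CSV object and upload it.
async fn write_csv_object(
    store: Arc<dyn ObjectStore>,
    location: Path,
    batches: Vec<RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut buffer = Vec::new();
    {
        // Buffer one batch at a time; the writer emits the header once.
        let mut writer = WriterBuilder::new().has_headers(true).build(&mut buffer);
        for batch in &batches {
            writer.write(batch)?;
        }
    } // end the mutable borrow of `buffer` before moving it into `Bytes`
    store.put(&location, Bytes::from(buffer)).await?;
    Ok(())
}
```

Buffering the encoded bytes before a single `put` keeps the sketch simple, at the cost of holding the whole object in memory.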
4 changes: 3 additions & 1 deletion crates/sparrow-main/tests/e2e/decoration_tests.rs
@@ -207,7 +207,9 @@ async fn test_sum_i64_final_at_time() {
#[tokio::test]
async fn test_sum_i64_all_filtered_final_at_time() {
let datetime = NaiveDateTime::new(date_for_test(1970, 12, 20), time_for_test(0, 39, 58));
- insta::assert_snapshot!(QueryFixture::new("{ sum_field: sum(Numbers.m) }").with_final_results_at_time(datetime).run_to_csv(&i64_data_fixture().await).await.unwrap(), @"");
+ insta::assert_snapshot!(QueryFixture::new("{ sum_field: sum(Numbers.m) }").with_final_results_at_time(datetime).run_to_csv(&i64_data_fixture().await).await.unwrap(), @r###"
+ _time,_subsort,_key_hash,_key,sum_field
+ "###);
}

async fn shift_data_fixture_at_time() -> DataFixture {
4 changes: 2 additions & 2 deletions crates/sparrow-runtime/src/execute.rs
@@ -157,7 +157,7 @@ pub async fn execute(
None
};

- let object_stores = ObjectStoreRegistry::default();
+ let object_stores = Arc::new(ObjectStoreRegistry::default());

let primary_grouping_key_type = plan
.primary_grouping_key_type
@@ -300,7 +300,7 @@ pub async fn materialize(
.into_report()
.change_context(Error::internal_msg("get primary grouping ID"))?;

- let object_stores = ObjectStoreRegistry::default();
+ let object_stores = Arc::new(ObjectStoreRegistry::default());
key_hash_inverse
.add_from_data_context(&data_context, primary_group_id, &object_stores)
.await
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/operation.rs
@@ -81,7 +81,7 @@ use crate::Batch;
pub(crate) struct OperationContext {
pub plan: ComputePlan,
pub plan_hash: PlanHash,
- pub object_stores: ObjectStoreRegistry,
+ pub object_stores: Arc<ObjectStoreRegistry>,
pub data_context: DataContext,
pub compute_store: Option<Arc<ComputeStore>>,
/// The key hash inverse to produce the output results, if one exists.
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/execute/operation/scan.rs
@@ -449,7 +449,7 @@ mod tests {
..ComputePlan::default()
},
plan_hash: PlanHash::default(),
- object_stores: ObjectStoreRegistry::default(),
+ object_stores: Arc::new(ObjectStoreRegistry::default()),
data_context,
compute_store: None,
key_hash_inverse,
4 changes: 2 additions & 2 deletions crates/sparrow-runtime/src/execute/operation/testing.rs
@@ -182,7 +182,7 @@ pub(super) async fn run_operation(
..ComputePlan::default()
},
plan_hash: PlanHash::default(),
- object_stores: ObjectStoreRegistry::default(),
+ object_stores: Arc::new(ObjectStoreRegistry::default()),
data_context: DataContext::default(),
compute_store: None,
key_hash_inverse,
@@ -240,7 +240,7 @@ pub(super) async fn run_operation_json(
..ComputePlan::default()
},
plan_hash: PlanHash::default(),
- object_stores: ObjectStoreRegistry::default(),
+ object_stores: Arc::new(ObjectStoreRegistry::default()),
data_context: DataContext::default(),
compute_store: None,
key_hash_inverse,
24 changes: 12 additions & 12 deletions crates/sparrow-runtime/src/execute/output.rs
@@ -18,9 +18,7 @@ use crate::execute::operation::OperationContext;
use crate::execute::progress_reporter::ProgressUpdate;
use crate::Batch;

- mod csv;
mod object_store;
- mod parquet;
mod redis;

pub mod pulsar;
@@ -49,7 +47,7 @@ pub(super) fn write(
batches: BoxStream<'static, Batch>,
progress_updates_tx: tokio::sync::mpsc::Sender<ProgressUpdate>,
destination: v1alpha::Destination,
- ) -> error_stack::Result<impl Future<Output = Result<(), Error>>, Error> {
+ ) -> error_stack::Result<impl Future<Output = Result<(), Error>> + 'static, Error> {
let sink_schema = determine_output_schema(context)?;

// Clone things that need to move into the async stream.
@@ -89,15 +87,17 @@ pub(super) fn write(
.destination
.ok_or(Error::UnspecifiedDestination)?;
match destination {
- Destination::ObjectStore(store) => {
- Ok(
- object_store::write(store, sink_schema, progress_updates_tx, batches)
- .change_context(Error::WritingToDestination {
- dest_name: "object_store".to_owned(),
- })
- .boxed(),
- )
- }
+ Destination::ObjectStore(destination) => Ok(object_store::write(
+ context.object_stores.clone(),
+ destination,
+ sink_schema,
+ progress_updates_tx,
+ batches,
+ )
+ .change_context(Error::WritingToDestination {
+ dest_name: "object_store".to_owned(),
+ })
+ .boxed()),
Destination::Redis(redis) => {
Ok(
redis::write(redis, sink_schema, progress_updates_tx, batches)
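
The field change from `ObjectStoreRegistry` to `Arc<ObjectStoreRegistry>` and the new `+ 'static` bound on the returned future go together: the write future owns a cheap clone of the registry instead of borrowing it from the `OperationContext`. A minimal sketch of the pattern, using stand-in types rather than the crate's real definitions:

```rust
use std::future::Future;
use std::sync::Arc;

// Stand-in types; the real ObjectStoreRegistry and OperationContext live in sparrow-runtime.
#[derive(Default)]
struct ObjectStoreRegistry;

struct OperationContext {
    object_stores: Arc<ObjectStoreRegistry>,
}

// Returning `impl Future + 'static` is possible because the async block only
// captures the cloned Arc, not a borrow of `context`.
fn writer_future(context: &OperationContext) -> impl Future<Output = ()> + 'static {
    let object_stores = Arc::clone(&context.object_stores);
    async move {
        // ... resolve the destination against `object_stores` and write batches ...
        let _ = object_stores;
    }
}

fn main() {
    let context = OperationContext {
        object_stores: Arc::new(ObjectStoreRegistry::default()),
    };
    // The future no longer borrows `context`, so it can be handed to a task spawner.
    let _fut = writer_future(&context);
}
```

Cloning the `Arc` is a pointer copy, so sharing one registry across the scan, key-hash-inverse, and output paths stays cheap.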
47 changes: 0 additions & 47 deletions crates/sparrow-runtime/src/execute/output/csv.rs

This file was deleted.
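
With the dedicated CSV sink deleted, output now flows through the `object_store` writer, which the commit message says also gains paged file output. The sketch below shows one way paging could work, rolling to a new object after a row threshold; the threshold, the `part-N.csv` naming, and paging at batch boundaries are illustrative assumptions, not details taken from this commit.

```rust
use std::sync::Arc;

use arrow::csv::WriterBuilder;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

/// Hypothetical page size; real paging would likely be configurable.
const MAX_ROWS_PER_FILE: usize = 1_000_000;

async fn write_paged_csv(
    store: Arc<dyn ObjectStore>,
    prefix: &str,
    batches: Vec<RecordBatch>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut page = 0usize;
    let mut rows_in_page = 0usize;
    let mut buffer: Vec<u8> = Vec::new();

    for batch in &batches {
        {
            // New writer per batch; emit the header only at the start of a page.
            let mut writer = WriterBuilder::new()
                .has_headers(rows_in_page == 0)
                .build(&mut buffer);
            writer.write(batch)?;
        }
        rows_in_page += batch.num_rows();

        // Roll to the next object once the current page is full.
        if rows_in_page >= MAX_ROWS_PER_FILE {
            let path = Path::from(format!("{prefix}/part-{page}.csv"));
            store
                .put(&path, Bytes::from(std::mem::take(&mut buffer)))
                .await?;
            page += 1;
            rows_in_page = 0;
        }
    }

    // Flush the final, partially filled page.
    if !buffer.is_empty() {
        let path = Path::from(format!("{prefix}/part-{page}.csv"));
        store.put(&path, Bytes::from(buffer)).await?;
    }
    Ok(())
}
```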
