Skip to content

Commit

Permalink
Create prepared dir in session
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanrfrazier committed Oct 3, 2023
1 parent 584d31e commit c3cb4d7
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 21 additions & 14 deletions crates/sparrow-runtime/src/prepare/preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,16 @@ impl Preparer {
/// - This adds or casts columns as needed.
/// - This produces multiple parts if the input file is large.
/// - This produces metadata files alongside data files.
/// Parameters:
/// - `to_prepare`: The path to the parquet file to prepare.
/// - `prepared_dir`: The directory to write the prepared files to.
pub async fn prepare_parquet(
&self,
path: &std::path::Path,
to_prepare: &std::path::Path,
prepared_dir: &std::path::Path,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
// TODO: Support Slicing

let output_path_prefix = self.prepared_output_prefix();
let output_path_prefix = self.prepared_output_prefix(prepared_dir);
let output_url = ObjectStoreUrl::from_str(&output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(output_path_prefix))?;

Expand All @@ -110,7 +113,7 @@ impl Preparer {

let source_data = SourceData {
source: Some(
SourceData::try_from_local(path)
SourceData::try_from_local(to_prepare)
.into_report()
.change_context(Error::Internal)?,
),
Expand Down Expand Up @@ -180,17 +183,21 @@ impl Preparer {
self.time_multiplier.as_ref(),
)
}
// Prepared files are stored in the following format:
// file:///<temp_dir>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>/part-<n>.parquet
pub fn prepared_output_prefix(&self) -> String {

/// Creates the local output prefix to use for prepared files.
///
/// e.g. for osx: file:///<dir>/<KASKADA_PATH>/tables/<table_uuid>/prepared/<uuid>
pub fn prepared_output_prefix(&self, dir: &std::path::Path) -> String {
let uuid = Uuid::new_v4();
let temp_dir = tempfile::tempdir().expect("failed to create temp dir");
format!(
"file:///{}/{}/tables/{}/prepare/{uuid}/",
temp_dir.path().display(),
KASKADA_PATH,
self.table_config.uuid
)

// Construct the path using PathBuf to handle platform-specific path separators.
let mut buf = std::path::PathBuf::new();
buf.push(dir);
buf.push(KASKADA_PATH);
buf.push("tables");
buf.push("prepare");
buf.push(uuid.to_string());
buf.to_string_lossy().to_string()
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/sparrow-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sparrow-runtime = { path = "../sparrow-runtime" }
sparrow-syntax = { path = "../sparrow-syntax" }
sparrow-instructions = { path = "../sparrow-instructions" }
static_init.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
uuid.workspace = true
Expand Down
10 changes: 9 additions & 1 deletion crates/sparrow-session/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,24 @@ pub struct Session {
udfs: HashMap<Uuid, Arc<dyn Udf>>,
object_store_registry: Arc<ObjectStoreRegistry>,
rt: tokio::runtime::Runtime,
/// Temporary directory to hold prepared files for this session.
///
/// Creating in the session ensures it won't be dropped until the
/// session is closed.
prepared_dir: tempfile::TempDir,
}

impl Default for Session {
fn default() -> Self {
let prepared_dir = tempfile::tempdir().expect("failed to create temp dir");
Self {
data_context: Default::default(),
dfg: Default::default(),
key_hash_inverse: Default::default(),
udfs: Default::default(),
object_store_registry: Default::default(),
rt: tokio::runtime::Runtime::new().expect("tokio runtime"),
prepared_dir,
}
}
}
Expand Down Expand Up @@ -125,7 +132,7 @@ impl Session {
grouping_name: Option<&str>,
time_unit: Option<&str>,
source: Option<&str>,
) -> error_stack::Result<Table, Error> {
) -> error_stack::Result<Table<'_>, Error> {
let uuid = Uuid::new_v4();
let schema_proto = sparrow_api::kaskada::v1alpha::Schema::try_from(schema.as_ref())
.into_report()
Expand Down Expand Up @@ -188,6 +195,7 @@ impl Session {
time_unit,
self.object_store_registry.clone(),
source,
self.prepared_dir.path(),
)
}

Expand Down
9 changes: 6 additions & 3 deletions crates/sparrow-session/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use sparrow_runtime::{key_hash_inverse::ThreadSafeKeyHashInverse, stores::Object

use crate::{Error, Expr};

pub struct Table {
pub struct Table<'a> {
pub expr: Expr,
preparer: Preparer,
key_column: usize,
key_hash_inverse: Arc<ThreadSafeKeyHashInverse>,
source: Source,
registry: Arc<ObjectStoreRegistry>,
prepared_dir: &'a std::path::Path,
}

#[derive(Debug)]
Expand All @@ -32,7 +33,7 @@ enum Source {
Parquet(Arc<FileSets>),
}

impl Table {
impl<'a> Table<'a> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
table_info: &mut TableInfo,
Expand All @@ -43,6 +44,7 @@ impl Table {
time_unit: Option<&str>,
object_stores: Arc<ObjectStoreRegistry>,
source: Option<&str>,
prepared_dir: &'a std::path::Path,
) -> error_stack::Result<Self, Error> {
let prepared_fields: Fields = KEY_FIELDS
.iter()
Expand Down Expand Up @@ -99,6 +101,7 @@ impl Table {
key_column: key_column + KEY_FIELDS.len(),
source,
registry: object_stores,
prepared_dir,
})
}

Expand Down Expand Up @@ -145,7 +148,7 @@ impl Table {

let prepared = self
.preparer
.prepare_parquet(path)
.prepare_parquet(path, self.prepared_dir)
.await
.change_context(Error::Prepare)?;

Expand Down

0 comments on commit c3cb4d7

Please sign in to comment.