feat: Prepare directly to object stores #475

Merged · 7 commits · Jul 6, 2023
Changes from 1 commit
2 changes: 1 addition & 1 deletion crates/sparrow-main/src/serve/preparation_service.rs
@@ -95,7 +95,7 @@ pub async fn prepare_data(
temp_file.path(),
)
.await?;
let (_prepared_metadata, prepared_files) = prepare_file(
let prepared_files = prepare_file(
&object_store_registry,
&source_data,
&prepare_request.output_path_prefix,
36 changes: 36 additions & 0 deletions crates/sparrow-runtime/src/metadata/prepared_metadata.rs
@@ -1,11 +1,14 @@
use std::path::Path;
use std::sync::Arc;

use arrow::array::TimestampNanosecondArray;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::ValueStatistics;
use sparrow_api::kaskada::v1alpha::PreparedFile;
use sparrow_arrow::downcast::downcast_primitive_array;
use sparrow_core::TableSchema;

use crate::metadata::file_from_path;
@@ -67,6 +70,39 @@ fn get_time_statistics(
}

impl PreparedMetadata {
pub fn try_from_data(
data_path: String,
data: &RecordBatch,
metadata_path: String,
) -> anyhow::Result<Self> {
let prepared_schema = data.schema();

anyhow::ensure!(
prepared_schema.field(0).name() == "_time",
"First column of prepared files must be '_time'"
);

// Compute the time statistics directly from the data.
//
// TODO: We could instead just get this from the parquet metadata.
let time = data.column(0);
let time: &TimestampNanosecondArray = downcast_primitive_array(time.as_ref())?;

let num_rows = data.num_rows() as i64;
anyhow::ensure!(num_rows > 0, "Data should be non-empty");
let min_time = *time.values().iter().min().expect("non-empty");
let max_time = *time.values().iter().max().expect("non-empty");

Self::try_from_prepared_schema(
data_path,
prepared_schema,
min_time,
max_time,
num_rows,
metadata_path,
)
}

/// Create a `PreparedMetadata` from the path to a parquet file.
pub fn try_from_local_parquet_path(
parquet_path: &Path,
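The new `PreparedMetadata::try_from_data` computes the `_time` statistics from an in-memory `RecordBatch` rather than re-reading a local parquet file. A minimal sketch of the call shape, assuming the `PreparedMetadata` type shown above is exported from `sparrow_runtime`; a real prepared batch carries the full prepared schema (key and value columns), not only `_time`, and the `s3://` paths are illustrative:

```rust
use std::sync::Arc;

use arrow::array::TimestampNanosecondArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use sparrow_runtime::PreparedMetadata; // assumed export; inside the crate this is `crate::PreparedMetadata`

fn demo() -> anyhow::Result<()> {
    // The first column of a prepared batch must be `_time` (nanosecond timestamps).
    let schema = Arc::new(Schema::new(vec![Field::new(
        "_time",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        false,
    )]));
    let time = TimestampNanosecondArray::from(vec![1_000_i64, 3_000, 2_000]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(time)])?;

    // Min/max `_time` and the row count come straight from `batch`, so no
    // parquet footer has to be read back from disk before uploading.
    let _metadata = PreparedMetadata::try_from_data(
        "s3://bucket/prefix/file-0.parquet".to_owned(),
        &batch,
        "s3://bucket/prefix/file-0-metadata.parquet".to_owned(),
    )?;
    Ok(())
}
```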
251 changes: 65 additions & 186 deletions crates/sparrow-runtime/src/prepare.rs
@@ -1,20 +1,19 @@
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{BufReader, Cursor};
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::{fmt, path};

use arrow::record_batch::RecordBatch;
use error_stack::{IntoReport, IntoReportCompat, ResultExt};
use futures::stream::BoxStream;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

use serde_yaml;
use sha2::Digest;
use sparrow_api::kaskada::v1alpha::{
slice_plan, source_data, PreparedFile, PulsarSubscription, SourceData, TableConfig,
@@ -30,9 +29,7 @@ mod slice_preparer;
pub use error::*;
pub(crate) use prepare_metadata::*;
use sparrow_api::kaskada::v1alpha::slice_plan::Slice;
use tracing::Instrument;

use crate::stores::object_store_url::ObjectStoreKey;
use crate::stores::{ObjectStoreRegistry, ObjectStoreUrl};
use crate::{streams, PreparedMetadata, RawMetadata};

@@ -140,192 +137,92 @@ impl<'a> fmt::Display for SourceDataWrapper<'a> {

/// Prepare the given file and return the list of prepared files.
pub async fn prepare_file(
object_store_registry: &ObjectStoreRegistry,
object_stores: &ObjectStoreRegistry,
source_data: &SourceData,
output_path_prefix: &str,
output_file_prefix: &str,
table_config: &TableConfig,
slice: &Option<slice_plan::Slice>,
) -> error_stack::Result<(Vec<PreparedMetadata>, Vec<PreparedFile>), Error> {
) -> error_stack::Result<Vec<PreparedFile>, Error> {
let output_url = ObjectStoreUrl::from_str(output_path_prefix)
.change_context_lazy(|| Error::InvalidUrl(output_path_prefix.to_owned()))?;
let output_key = output_url
.key()
.change_context_lazy(|| Error::InvalidUrl(format!("{}", output_url)))?;

let temp_dir = tempfile::tempdir()
.into_report()
let object_store = object_stores
.object_store(output_url.key().change_context(Error::Internal)?)
.change_context(Error::Internal)?;
let temp_dir = temp_dir.path().to_str().ok_or(Error::Internal)?;

let path = format!(
"/{}",
output_url
.path()
.change_context_lazy(|| Error::InvalidUrl(format!("{}", output_url)))?
);

// If the output path is to a local destination, we'll use that. If it's to
// a remote destination, we'll first write to a local temp file, then upload
// that to the remote destination.
let local_output_prefix = match output_key {
ObjectStoreKey::Local => path::Path::new(&path),
_ => path::Path::new(temp_dir),
};
let mut prepare_stream = prepared_batches(source_data, table_config, slice)
.await?
.enumerate();

let prepare_stream = prepared_batches(source_data, table_config, slice).await?;
let batch_count = std::sync::atomic::AtomicUsize::new(0);

let write_results = |output_ordinal: usize, records: RecordBatch, metadata: RecordBatch| {
// Defines the local path to the result file
let local_result_path =
local_output_prefix.join(format!("{output_file_prefix}-{output_ordinal}.parquet"));
let local_result_file = create_file(&local_result_path)?;

// Defines the local path to the metadata file
let local_metadata_output = local_output_prefix.join(format!(
"{output_file_prefix}-{output_ordinal}-metadata.parquet"
));
let local_metadata_file = create_file(&local_metadata_output)?;

// Defines the local path to the metadata yaml file
let metadata_yaml_output = local_output_prefix.join(format!(
"{output_file_prefix}-{output_ordinal}-metadata.yaml"
));
let metadata_yaml_output_file = create_file(&metadata_yaml_output)?;

// Write batches to the local output files
write_batch(local_result_file, records).change_context(Error::WriteParquetData)?;
write_batch(local_metadata_file, metadata).change_context(Error::WriteMetadata)?;

let prepared_metadata = PreparedMetadata::try_from_local_parquet_path(
&local_result_path,
&local_metadata_output,
)
.into_report()
.change_context(Error::DetermineMetadata)?;
let mut prepared_files = Vec::new();
let mut uploads = FuturesUnordered::new();
while let Some((n, next)) = prepare_stream.next().await {
let (data, metadata) = next?;

let prepared_file: PreparedFile = prepared_metadata
.to_owned()
.try_into()
let data_url = output_url
.join(&format!("{output_file_prefix}-{n}.parquet"))
.change_context(Error::Internal)?;

tracing::info!("Prepared batch {output_ordinal} with metadata {prepared_metadata:?}");

serde_yaml::to_writer(metadata_yaml_output_file, &prepared_file)
.into_report()
let metadata_url = output_url
.join(&format!("{output_file_prefix}-{n}-metadata.parquet"))
.change_context(Error::Internal)?;
tracing::info!("Wrote metadata yaml to {:?}", metadata_yaml_output);

Ok((prepared_metadata, prepared_file))
};

let results = prepare_stream.filter_map(|r| {
let n = batch_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
async move {
match r {
Ok((records, metadata)) => {
tracing::debug!("Prepared batch {n} has {} rows", records.num_rows());

if records.num_rows() == 0 {
return None;
}
Some(write_results(n, records, metadata))
}
Err(e) => Some(Err(e.change_context(Error::Internal))),
}
}
});

// unzip the stream of (prepared_file, prepared_metadata) into two vectors
let (pm, pf): (Vec<_>, Vec<_>) = results.try_collect::<Vec<_>>().await?.into_iter().unzip();
match output_key {
ObjectStoreKey::Local | ObjectStoreKey::Memory => {
// Prepared files are stored locally
Ok((pm, pf))
}
_ => {
// Upload the local prepared files to the remote destination
let upload_results = upload_prepared_files(
object_store_registry,
&pm,
output_path_prefix,
output_file_prefix,
)
.await?;
Ok((pm, upload_results))
}
}
}

async fn upload_prepared_files(
object_store_registry: &ObjectStoreRegistry,
prepared_metadata: &[PreparedMetadata],
output_path_prefix: &str,
output_file_prefix: &str,
) -> error_stack::Result<Vec<PreparedFile>, Error> {
let span = tracing::info_span!(
"Uploading prepared files to object store",
?output_path_prefix
);
let _enter = span.enter();

let mut parquet_uploads: Vec<_> = Vec::new();
let mut prepared_files: Vec<_> = Vec::new();
{
for (batch_count, prepared_metadata) in prepared_metadata.iter().enumerate() {
tracing::debug!(
"Prepared batch {batch_count} has {} rows",
prepared_metadata.num_rows
);

tracing::debug!(
"Prepared batch {batch_count} with metadata {:?}",
prepared_metadata
);

let prepared_url =
format!("{output_path_prefix}/{output_file_prefix}-{batch_count}.parquet");
let metadata_prepared_url =
format!("{output_path_prefix}/{output_file_prefix}-{batch_count}-metadata.parquet");

let prepare_object_store_url = ObjectStoreUrl::from_str(prepared_url.as_str())
.change_context_lazy(|| Error::InvalidUrl(prepared_url.as_str().to_string()))?;
let metadata_object_store_url =
ObjectStoreUrl::from_str(metadata_prepared_url.as_str()).change_context_lazy(
|| Error::InvalidUrl(metadata_prepared_url.as_str().to_string()),
)?;

parquet_uploads.push(
object_store_registry
.upload(prepare_object_store_url, Path::new(&prepared_metadata.path)),
);

parquet_uploads.push(object_store_registry.upload(
metadata_object_store_url,
Path::new(&prepared_metadata.metadata_path),
));

let result_prepared_file: PreparedFile = prepared_metadata
.to_owned()
.with_path(prepared_url)
.with_metadata_path(metadata_prepared_url)
// Create the prepared file via PreparedMetadata.
// TODO: We could probably do this directly, eliminating the PreparedMetadata struct.
let prepared_file: PreparedFile =
PreparedMetadata::try_from_data(data_url.to_string(), &data, metadata_url.to_string())
.into_report()
.change_context(Error::Internal)?
.try_into()
.change_context(Error::Internal)?;
prepared_files.push(result_prepared_file)
}
prepared_files.push(prepared_file);

uploads.push(write_parquet(data, data_url, object_store.clone()));
uploads.push(write_parquet(metadata, metadata_url, object_store.clone()));
}

// TODO: We could (instead) use a loop and select, which would allow us
// to fail early if anything failed. But it is more book-keeping.
let parquet_uploads = futures::future::join_all(parquet_uploads).in_current_span();
for result in parquet_uploads.await {
result.change_context(Error::UploadResult)?;
// Wait for the uploads.
while let Some(upload) = uploads.try_next().await? {
tracing::info!("Finished uploading {upload}");
}

Ok(prepared_files)
}

async fn write_parquet(
batch: RecordBatch,
url: ObjectStoreUrl,
object_store: Arc<dyn ObjectStore>,
) -> error_stack::Result<ObjectStoreUrl, Error> {
let path = url
.path()
.change_context_lazy(|| Error::Write(url.url().clone()))?;
let (upload_id, writer) = object_store
.put_multipart(&path)
.await
.into_report()
.change_context_lazy(|| Error::Write(url.url().clone()))?;
tracing::info!("Multipart upload to {url} started with ID {upload_id}");

let mut writer = parquet::arrow::AsyncArrowWriter::try_new(writer, batch.schema(), 1024, None)
.into_report()
.change_context_lazy(|| Error::Write(url.url().clone()))?;

writer
.write(&batch)
.await
.into_report()
.change_context_lazy(|| Error::Write(url.url().clone()))?;

writer
.close()
.await
.into_report()
.change_context_lazy(|| Error::Write(url.url().clone()))?;

Ok(url)
}

fn open_file(path: impl AsRef<Path>) -> error_stack::Result<File, Error> {
fn inner(path: &Path) -> error_stack::Result<File, Error> {
File::open(path)
@@ -337,24 +234,6 @@ fn open_file(path: impl AsRef<Path>) -> error_stack::Result<File, Error> {
inner(path.as_ref())
}

fn create_file(path: &Path) -> error_stack::Result<File, Error> {
File::create(path)
.into_report()
.change_context_lazy(|| Error::OpenFile {
path: path.to_owned(),
})
}

fn write_batch(
file: File,
record_batch: RecordBatch,
) -> error_stack::Result<(), parquet::errors::ParquetError> {
let mut writer = ArrowWriter::try_new(file, record_batch.schema(), None)?;
writer.write(&record_batch)?;
writer.close()?;
Ok(())
}

fn get_prepare_hash(source_data: &SourceData) -> error_stack::Result<u64, Error> {
let source = source_data.source.as_ref().ok_or(Error::Internal)?;
let hex_encoding = match source {
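The rewritten `prepare_file` pushes each `write_parquet` call onto a `FuturesUnordered` and drains it with `try_next`, so the data and metadata uploads for all batches run concurrently and the loop stops at the first failure. A minimal, self-contained sketch of that pattern; the string-returning futures below are stand-ins for the actual uploads, not the crate's API:

```rust
use futures::stream::{FuturesUnordered, TryStreamExt};

async fn upload_all() -> Result<(), std::io::Error> {
    let mut uploads = FuturesUnordered::new();
    for n in 0..3 {
        // Stand-in for `write_parquet(data, url, store)`: resolves with the
        // destination once the (simulated) upload completes.
        uploads.push(async move { Ok::<_, std::io::Error>(format!("file-{n}.parquet")) });
    }

    // Uploads finish in arbitrary order; `try_next` yields each completed one
    // and short-circuits on the first error.
    while let Some(done) = uploads.try_next().await? {
        println!("finished uploading {done}");
    }
    Ok(())
}

fn main() {
    futures::executor::block_on(upload_all()).unwrap();
}
```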
8 changes: 4 additions & 4 deletions crates/sparrow-runtime/src/prepare/error.rs
@@ -1,5 +1,7 @@
use std::path::PathBuf;

use url::Url;

#[derive(derive_more::Display, Debug)]
pub enum Error {
#[display(fmt = "unable to open '{path:?}'")]
@@ -28,10 +30,8 @@ pub enum Error {
DetermineMetadata,
#[display(fmt = "invalid schema provided")]
ReadSchema,
#[display(fmt = "failed to write Parquet file")]
WriteParquetData,
#[display(fmt = "failed to write metadata file")]
WriteMetadata,
#[display(fmt = "failed to write to '{_0}")]
Write(Url),
#[display(fmt = "prepare request missing '{_0}'")]
MissingField(&'static str),
#[display(
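The new `Error::Write(Url)` variant carries the destination URL, so upload failures name the exact object that could not be written. A small sketch of how the `derive_more` display attribute renders it, using the same `{_0}` field-reference style as the rest of this enum (the bucket URL is made up):

```rust
use url::Url;

#[derive(derive_more::Display, Debug)]
enum DemoError {
    // `_0` refers to the variant's first unnamed field.
    #[display(fmt = "failed to write to '{_0}'")]
    Write(Url),
}

fn main() {
    let err = DemoError::Write(Url::parse("s3://bucket/prefix/file-0.parquet").unwrap());
    // Prints: failed to write to 's3://bucket/prefix/file-0.parquet'
    println!("{err}");
}
```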