Skip to content

Commit

Permalink
feat: experimental objectstore wrapper with IO runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 17, 2024
1 parent eea53f4 commit 5726f77
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 29 deletions.
46 changes: 28 additions & 18 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use deltalake_core::storage::object_store::{
ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult,
};
use deltalake_core::storage::{
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
deltars_store_handler, limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef,
StorageOptions,
};
use deltalake_core::{DeltaResult, ObjectStoreError, Path};
use futures::stream::BoxStream;
Expand Down Expand Up @@ -79,25 +80,34 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
}),
)?;

let store = limit_store_handler(inner, &options);

// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok((store, prefix))
} else {
let s3_options = S3StorageOptions::from_map(&storage_options.0)?;
let store = deltars_store_handler(
aws_storage_handler(limit_store_handler(inner, &options), &options)?,
&storage_options,
);

let store = S3StorageBackend::try_new(
store,
Some("dynamodb") == s3_options.locking_provider.as_deref()
|| s3_options.allow_unsafe_rename,
)?;
Ok((store, prefix))
}
}

Ok((Arc::new(store), prefix))
}
fn aws_storage_handler(
store: ObjectStoreRef,
options: &StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
// If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok(store)
} else {
let s3_options = S3StorageOptions::from_map(&options.0)?;

let store = S3StorageBackend::try_new(
store,
Some("dynamodb") == s3_options.locking_provider.as_deref()
|| s3_options.allow_unsafe_rename,
)?;
Ok(Arc::new(store))
}
}

Expand Down
9 changes: 6 additions & 3 deletions crates/azure/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::Arc;

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{
factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
StorageOptions,
deltars_store_handler, factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory,
ObjectStoreRef, StorageOptions,
};
use deltalake_core::{DeltaResult, Path};
use object_store::azure::AzureConfigKey;
Expand Down Expand Up @@ -44,7 +44,10 @@ impl ObjectStoreFactory for AzureFactory {
) -> DeltaResult<(ObjectStoreRef, Path)> {
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
let (inner, prefix) = parse_url_opts(url, config)?;
let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), options);
let store = deltars_store_handler(
limit_store_handler(url_prefix_handler(inner, prefix.clone()), options),
options,
);
Ok((store, prefix))
}
}
Expand Down
11 changes: 11 additions & 0 deletions crates/core/src/operations/writer.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
//! Abstractions and implementations for writing data to delta tables
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
use bytes::Bytes;
use delta_kernel::expressions::Scalar;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use indexmap::IndexMap;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use tokio::runtime::Runtime;
use tracing::debug;

use crate::crate_version;
Expand Down Expand Up @@ -121,6 +125,12 @@ impl WriterConfig {
}
}

/// static write runtime
fn write_rt() -> &'static Runtime {
static WRITE_RT: OnceLock<Runtime> = OnceLock::new();
WRITE_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime for writing."))
}

#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
Expand Down Expand Up @@ -369,6 +379,7 @@ impl PartitionWriter {

// write file to object store
self.object_store.put(&path, buffer.into()).await?;

self.files_written.push(
create_add(
&self.config.partition_values,
Expand Down
Loading

0 comments on commit 5726f77

Please sign in to comment.