diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs
index 4625bb6be9..cb9723f6de 100644
--- a/crates/aws/src/storage.rs
+++ b/crates/aws/src/storage.rs
@@ -9,7 +9,8 @@ use deltalake_core::storage::object_store::{
     ObjectStore, PutOptions, PutResult, Result as ObjectStoreResult,
 };
 use deltalake_core::storage::{
-    limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
+    deltars_store_handler, limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef,
+    StorageOptions,
 };
 use deltalake_core::{DeltaResult, ObjectStoreError, Path};
 use futures::stream::BoxStream;
@@ -79,25 +80,34 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
             }),
         )?;
 
-        let store = limit_store_handler(inner, &options);
-
-        // If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
-        if options
-            .0
-            .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
-        {
-            Ok((store, prefix))
-        } else {
-            let s3_options = S3StorageOptions::from_map(&storage_options.0)?;
+        let store = deltars_store_handler(
+            aws_storage_handler(limit_store_handler(inner, &options), &options)?,
+            &storage_options,
+        );
 
-            let store = S3StorageBackend::try_new(
-                store,
-                Some("dynamodb") == s3_options.locking_provider.as_deref()
-                    || s3_options.allow_unsafe_rename,
-            )?;
+        Ok((store, prefix))
+    }
+}
 
-            Ok((Arc::new(store), prefix))
-        }
+fn aws_storage_handler(
+    store: ObjectStoreRef,
+    options: &StorageOptions,
+) -> DeltaResult<ObjectStoreRef> {
+    // If the copy-if-not-exists env var is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
+    if options
+        .0
+        .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
+    {
+        Ok(store)
+    } else {
+        let s3_options = S3StorageOptions::from_map(&options.0)?;
+
+        let store = S3StorageBackend::try_new(
+            store,
+            Some("dynamodb") == s3_options.locking_provider.as_deref()
+                || s3_options.allow_unsafe_rename,
+        )?;
+        Ok(Arc::new(store))
    }
 }
diff --git a/crates/azure/src/lib.rs b/crates/azure/src/lib.rs
index 7782f69f43..fc48116cea 100644
--- a/crates/azure/src/lib.rs
+++ b/crates/azure/src/lib.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 
 use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::storage::{
-    factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
-    StorageOptions,
+    deltars_store_handler, factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory,
+    ObjectStoreRef, StorageOptions,
 };
 use deltalake_core::{DeltaResult, Path};
 use object_store::azure::AzureConfigKey;
@@ -44,7 +44,10 @@ impl ObjectStoreFactory for AzureFactory {
     ) -> DeltaResult<(ObjectStoreRef, Path)> {
         let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
         let (inner, prefix) = parse_url_opts(url, config)?;
-        let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), options);
+        let store = deltars_store_handler(
+            limit_store_handler(url_prefix_handler(inner, prefix.clone()), options),
+            options,
+        );
         Ok((store, prefix))
     }
 }
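Note on the S3 hunks above: `aws_storage_handler` now owns the choice between returning the store untouched (when `copy_if_not_exists` is configured) and wrapping it in `S3StorageBackend` (when the DynamoDB locking provider or `allow_unsafe_rename` is set). A minimal sketch of steering that branch from the caller's side; `DeltaTableBuilder` and the `AWS_S3_LOCKING_PROVIDER` key are existing delta-rs APIs, and the table URI is a placeholder:

```rust
use std::collections::HashMap;

use deltalake_core::{DeltaResult, DeltaTable, DeltaTableBuilder};

// Hypothetical table location; any s3:// URI works the same way.
fn open_with_dynamodb_lock() -> DeltaResult<DeltaTable> {
    let mut options: HashMap<String, String> = HashMap::new();
    // With a locking provider set, `aws_storage_handler` wraps the store in
    // `S3StorageBackend`; with copy-if-not-exists configured it returns the
    // store unwrapped instead.
    options.insert("AWS_S3_LOCKING_PROVIDER".into(), "dynamodb".into());
    DeltaTableBuilder::from_uri("s3://my-bucket/my-table")
        .with_storage_options(options)
        .build()
}
```

diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs
index 8fd4273c9f..5b09553975 100644
--- a/crates/core/src/operations/writer.rs
+++ b/crates/core/src/operations/writer.rs
@@ -1,16 +1,20 @@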
 //! Abstractions and implementations for writing data to delta tables
 
 use std::collections::HashMap;
+use std::sync::{Arc, OnceLock};
 
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
 use bytes::Bytes;
 use delta_kernel::expressions::Scalar;
+use futures::future::BoxFuture;
+use futures::{FutureExt, TryFutureExt};
 use indexmap::IndexMap;
 use object_store::{path::Path, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
+use tokio::runtime::Runtime;
 use tracing::debug;
 
 use crate::crate_version;
@@ -121,6 +125,12 @@ impl WriterConfig {
     }
 }
 
+/// static write runtime
+fn write_rt() -> &'static Runtime {
+    static WRITE_RT: OnceLock<Runtime> = OnceLock::new();
+    WRITE_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime for writing."))
+}
+
 #[derive(Debug)]
 /// A parquet writer implementation tailored to the needs of writing data to a delta table.
 pub struct DeltaWriter {
@@ -369,6 +379,7 @@ impl PartitionWriter {
 
         // write file to object store
         self.object_store.put(&path, buffer.into()).await?;
+
         self.files_written.push(
             create_add(
                 &self.config.partition_values,
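The `write_rt()` helper added above is the standard lazily-initialized-global pattern: `OnceLock` builds the `Runtime` on first call, and every later call gets the same instance for the life of the process. A self-contained sketch of the same pattern (names are illustrative, not from the diff):

```rust
use std::sync::OnceLock;

use tokio::runtime::Runtime;

/// Build the runtime once, on first use; later calls return the same instance.
fn global_rt() -> &'static Runtime {
    static RT: OnceLock<Runtime> = OnceLock::new();
    RT.get_or_init(|| Runtime::new().expect("failed to build tokio runtime"))
}

fn main() {
    // Synchronous code can block on the shared runtime.
    let answer = global_rt().block_on(async { 41 + 1 });
    assert_eq!(answer, 42);
}
```

diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index ceac5e1436..05c6901d73 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -2,24 +2,32 @@
 use std::collections::HashMap;
 use std::sync::{Arc, OnceLock};
 
+use crate::{DeltaResult, DeltaTableError};
 use dashmap::DashMap;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use futures::TryFutureExt;
 use lazy_static::lazy_static;
 use object_store::limit::LimitStore;
 use object_store::local::LocalFileSystem;
 use object_store::memory::InMemory;
 use object_store::prefix::PrefixStore;
+use object_store::{GetOptions, PutOptions, PutPayload, PutResult};
 use serde::{Deserialize, Serialize};
+use tokio::runtime::Runtime;
 use url::Url;
 
-use crate::{DeltaResult, DeltaTableError};
-
+use bytes::Bytes;
+use futures::stream::BoxStream;
 pub use object_store;
 pub use object_store::path::{Path, DELIMITER};
+use object_store::{prefix, MultipartUpload, PutMultipartOpts};
 pub use object_store::{
     DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta,
     ObjectStore, Result as ObjectStoreResult,
 };
 pub use retry_ext::ObjectStoreRetryExt;
+use std::ops::Range;
 pub use utils::*;
 
 pub mod file;
@@ -30,6 +38,217 @@ lazy_static! {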
     static ref DELTA_LOG_PATH: Path = Path::from("_delta_log");
 }
 
+/// static io runtime
+fn io_rt() -> &'static Runtime {
+    static IO_RT: OnceLock<Runtime> = OnceLock::new();
+    IO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime for IO."))
+}
+
+/// Spawn tasks on the IO runtime
+pub fn spawn_io_rt<F, O>(
+    f: F,
+    store: &Arc<DynObjectStore>,
+    path: Path,
+) -> BoxFuture<'_, ObjectStoreResult<O>>
+where
+    F: for<'a> FnOnce(&'a Arc<DynObjectStore>, &'a Path) -> BoxFuture<'a, ObjectStoreResult<O>>
+        + Send
+        + 'static,
+    O: Send + 'static,
+{
+    let store = Arc::clone(store);
+    let fut = io_rt().spawn(async move { f(&store, &path).await });
+    fut.unwrap_or_else(|e| match e.try_into_panic() {
+        Ok(p) => std::panic::resume_unwind(p),
+        Err(e) => Err(ObjectStoreError::JoinError { source: e }),
+    })
+    .boxed()
+}
+
+/// Spawn tasks on the IO runtime
+pub fn spawn_io_rt_from_to<F, O>(
+    f: F,
+    store: &Arc<DynObjectStore>,
+    from: Path,
+    to: Path,
+) -> BoxFuture<'_, ObjectStoreResult<O>>
+where
+    F: for<'a> FnOnce(
+            &'a Arc<DynObjectStore>,
+            &'a Path,
+            &'a Path,
+        ) -> BoxFuture<'a, ObjectStoreResult<O>>
+        + Send
+        + 'static,
+    O: Send + 'static,
+{
+    let store = Arc::clone(store);
+    let fut = io_rt().spawn(async move { f(&store, &from, &to).await });
+    fut.unwrap_or_else(|e| match e.try_into_panic() {
+        Ok(p) => std::panic::resume_unwind(p),
+        Err(e) => Err(ObjectStoreError::JoinError { source: e }),
+    })
+    .boxed()
+}
+
+/// Wraps any object store and runs IO in its own runtime [EXPERIMENTAL]
+pub struct DeltaIOStorageBackend {
+    inner: ObjectStoreRef,
+}
+
+impl DeltaIOStorageBackend {
+    /// create wrapped object store which spawns tasks in its own runtime
+    pub fn new(storage: ObjectStoreRef) -> Self {
+        Self { inner: storage }
+    }
+}
+
+impl std::fmt::Debug for DeltaIOStorageBackend {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+        write!(fmt, "DeltaIOStorageBackend")
+    }
+}
+
+impl std::fmt::Display for DeltaIOStorageBackend {
+    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
+        write!(fmt, "DeltaIOStorageBackend")
+    }
+}
+
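Before the `ObjectStore` impl below: every method follows the same spawn-and-rejoin shape. The closure runs on the dedicated IO runtime via `spawn_io_rt`, and the resulting `JoinHandle` is unwrapped so a panic in the spawned task resurfaces on the calling thread instead of being swallowed. A stripped-down sketch of just that mechanism (`run_on` is an illustrative name, not part of the diff):

```rust
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use tokio::runtime::Runtime;

/// Run a future to completion on `rt`, forwarding panics to the caller.
fn run_on<T: Send + 'static>(
    rt: &'static Runtime,
    fut: BoxFuture<'static, T>,
) -> BoxFuture<'static, T> {
    rt.spawn(fut)
        .unwrap_or_else(|e| match e.try_into_panic() {
            // The task panicked: re-raise the panic on this thread.
            Ok(payload) => std::panic::resume_unwind(payload),
            // The task was cancelled (e.g. the runtime shut down).
            Err(e) => panic!("spawned task failed: {e}"),
        })
        .boxed()
}
```

+#[async_trait::async_trait]
+impl ObjectStore for DeltaIOStorageBackend {
+    async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
+        spawn_io_rt(
+            |store, path| store.put(path, bytes),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: PutPayload,
+        options: PutOptions,
+    ) -> ObjectStoreResult<PutResult> {
+        spawn_io_rt(
+            |store, path| store.put_opts(path, bytes, options),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
+        spawn_io_rt(|store, path| store.get(path), &self.inner, location.clone()).await
+    }
+
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
+        spawn_io_rt(
+            |store, path| store.get_opts(path, options),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
+        spawn_io_rt(
+            |store, path| store.get_range(path, range),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
+        spawn_io_rt(
+            |store, path| store.head(path),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
+        spawn_io_rt(
+            |store, path| store.delete(path),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list(prefix)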
+    }
+
+    fn list_with_offset(
+        &self,
+        prefix: Option<&Path>,
+        offset: &Path,
+    ) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
+        self.inner.list_with_offset(prefix, offset)
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+        spawn_io_rt_from_to(
+            |store, from_path, to_path| store.copy(from_path, to_path),
+            &self.inner,
+            from.clone(),
+            to.clone(),
+        )
+        .await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+        spawn_io_rt_from_to(
+            |store, from_path, to_path| store.copy_if_not_exists(from_path, to_path),
+            &self.inner,
+            from.clone(),
+            to.clone(),
+        )
+        .await
+    }
+
+    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
+        spawn_io_rt_from_to(
+            |store, from_path, to_path| store.rename_if_not_exists(from_path, to_path),
+            &self.inner,
+            from.clone(),
+            to.clone(),
+        )
+        .await
+    }
+
+    async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
+        spawn_io_rt(
+            |store, path| store.put_multipart(path),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        options: PutMultipartOpts,
+    ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
+        spawn_io_rt(
+            |store, path| store.put_multipart_opts(path, options),
+            &self.inner,
+            location.clone(),
+        )
+        .await
+    }
+}
+
 /// Sharable reference to [`ObjectStore`]
 pub type ObjectStoreRef = Arc<DynObjectStore>;
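The hunk that follows adds `deltars_store_handler`, the opt-in switch for the wrapper above: it only engages when the `USE_EXPERIMENTAL_IO_RUNTIME` storage option parses as `true`. A short usage sketch, mirroring the new unit tests at the end of this file:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use deltalake_core::storage::{deltars_store_handler, ObjectStoreRef, StorageOptions};
use object_store::memory::InMemory;

fn wrapped_in_memory_store() -> ObjectStoreRef {
    let inner: ObjectStoreRef = Arc::new(InMemory::new());
    let options = StorageOptions(HashMap::from([(
        "USE_EXPERIMENTAL_IO_RUNTIME".to_string(),
        "true".to_string(),
    )]));
    // With the flag set to "true" this returns a `DeltaIOStorageBackend`;
    // any other value (or no value at all) returns the store unchanged,
    // since `parse::<bool>()` only accepts "true" and "false".
    deltars_store_handler(inner, &options)
}
```

@@ -178,6 +397,21 @@ pub fn limit_store_handler<T: ObjectStore>(store: T, options: &StorageOptions) -> ObjectStoreRef {
     }
 }
 
+/// Simple function to wrap the given [ObjectStore] in a [DeltaIOStorageBackend] if USE_EXPERIMENTAL_IO_RUNTIME is enabled
+pub fn deltars_store_handler(store: ObjectStoreRef, options: &StorageOptions) -> ObjectStoreRef {
+    let use_experimental_rt = options
+        .0
+        .get(storage_constants::EXPERIMENTAL_RT)
+        .and_then(|v| v.parse::<bool>().ok())
+        .unwrap_or_default();
+
+    if use_experimental_rt {
+        Arc::new(DeltaIOStorageBackend::new(store))
+    } else {
+        store
+    }
+}
+
 /// Storage option keys to use when creating [ObjectStore].
 /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.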
 /// Must be implemented for a given storage provider
@@ -186,6 +420,8 @@ pub mod storage_constants {
     /// The number of concurrent connections the underlying object store can create
     /// Reference [LimitStore](https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html) for more information
     pub const OBJECT_STORE_CONCURRENCY_LIMIT: &str = "OBJECT_STORE_CONCURRENCY_LIMIT";
+    /// Whether to use the experimental IO runtime
+    pub const EXPERIMENTAL_RT: &str = "USE_EXPERIMENTAL_IO_RUNTIME";
 }
 
 #[cfg(test)]
@@ -221,4 +457,34 @@ mod tests {
             format!("{limited}")
         );
     }
+    #[test]
+    fn test_deltars_store_handler_enabled() {
+        let store = Box::new(InMemory::new()) as Box<dyn ObjectStore>;
+
+        let options = StorageOptions(HashMap::from_iter(vec![(
+            "USE_EXPERIMENTAL_IO_RUNTIME".into(),
+            "true".into(),
+        )]));
+
+        let experimental_io_store = deltars_store_handler(store.into(), &options);
+
+        assert_eq!(
+            String::from("DeltaIOStorageBackend"),
+            format!("{experimental_io_store}")
+        );
+    }
+
+    #[test]
+    fn test_deltars_store_handler_disabled() {
+        let store = Box::new(InMemory::new()) as Box<dyn ObjectStore>;
+
+        let options = StorageOptions(HashMap::from_iter(vec![(
+            "USE_EXPERIMENTAL_IO_RUNTIME".into(),
+            "false".into(),
+        )]));
+
+        let experimental_io_store = deltars_store_handler(store.into(), &options);
+
+        assert_eq!(String::from("InMemory"), format!("{experimental_io_store}"));
+    }
 }
diff --git a/crates/gcp/src/lib.rs b/crates/gcp/src/lib.rs
index e50681ed30..07689be549 100644
--- a/crates/gcp/src/lib.rs
+++ b/crates/gcp/src/lib.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 
 use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::storage::{
-    factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
-    StorageOptions,
+    deltars_store_handler, factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory,
+    ObjectStoreRef, StorageOptions,
 };
 use deltalake_core::{DeltaResult, Path};
 use object_store::gcp::GoogleConfigKey;
@@ -46,7 +46,10 @@ impl ObjectStoreFactory for GcpFactory {
         let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?;
         let (inner, prefix) = parse_url_opts(url, config)?;
         let gcs_backend = crate::storage::GcsStorageBackend::try_new(Arc::new(inner))?;
-        let store = limit_store_handler(url_prefix_handler(gcs_backend, prefix.clone()), options);
+        let store = deltars_store_handler(
+            limit_store_handler(url_prefix_handler(gcs_backend, prefix.clone()), options),
+            options,
+        );
         Ok((store, prefix))
     }
 }
diff --git a/crates/hdfs/src/lib.rs b/crates/hdfs/src/lib.rs
index 45b14740b7..392620207a 100644
--- a/crates/hdfs/src/lib.rs
+++ b/crates/hdfs/src/lib.rs
@@ -2,7 +2,8 @@ use std::sync::Arc;
 
 use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::storage::{
-    factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
+    deltars_store_handler, factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
+    StorageOptions,
 };
 use deltalake_core::{DeltaResult, Path};
 use hdfs_native_object_store::HdfsObjectStore;
@@ -22,7 +23,10 @@ impl ObjectStoreFactory for HdfsFactory {
             options.0.clone(),
         )?);
         let prefix = Path::parse(url.path())?;
-        Ok((url_prefix_handler(store, prefix.clone()), prefix))
+        Ok((
+            deltars_store_handler(url_prefix_handler(store, prefix.clone()), options),
+            prefix,
+        ))
     }
 }
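Across the azure, gcp, and hdfs factories above, the wrapping order is always the same: prefix handling innermost, then the concurrency limiter (where used), then `deltars_store_handler` outermost so the runtime wrapper sees every call. A condensed sketch of that chain over an in-memory store (the prefix value is illustrative; factories derive it from the table URL):

```rust
use deltalake_core::storage::{
    deltars_store_handler, limit_store_handler, url_prefix_handler, ObjectStoreRef, Path,
    StorageOptions,
};
use object_store::memory::InMemory;

fn build_store_chain(options: &StorageOptions) -> ObjectStoreRef {
    // Innermost: scope all paths under the table prefix.
    let prefix = Path::from("tables/my-table");
    // Then cap concurrency, then (optionally) move IO to the dedicated runtime.
    deltars_store_handler(
        limit_store_handler(url_prefix_handler(InMemory::new(), prefix), options),
        options,
    )
}
```

diff --git a/python/src/lib.rs b/python/src/lib.rs
index 0f2a8151db..be45a2156e 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs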
@@ -24,7 +24,6 @@ use deltalake::datafusion::catalog::TableProvider;
 use deltalake::datafusion::datasource::memory::MemTable;
 use deltalake::datafusion::physical_plan::ExecutionPlan;
 use deltalake::datafusion::prelude::SessionContext;
-use deltalake::delta_datafusion::cdf::FileAction;
 use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
 use deltalake::kernel::{