diff --git a/Cargo.toml b/Cargo.toml
index 63a17924bd..9305aa31bd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,4 +49,4 @@ uuid = { version = "1" }
 async-trait = { version = "0.1" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
-num_cpus = { version = "1" }
+num_cpus = { version = "1" }
\ No newline at end of file
diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs
index a89cbb3fc5..a9859b2c9b 100644
--- a/rust/src/storage/config.rs
+++ b/rust/src/storage/config.rs
@@ -124,6 +124,8 @@ pub(crate) fn configure_store(
         try_configure_s3(url, options)
     } else if host.contains("dfs.core.windows.net")
         || host.contains("blob.core.windows.net")
+        || host.contains("dfs.fabric.microsoft.com")
+        || host.contains("blob.fabric.microsoft.com")
     {
         try_configure_azure(url, options)
     } else {
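With the two added host checks, a plain https:// URL that points at Fabric's OneLake endpoint now falls through to try_configure_azure instead of the generic branch. A rough sketch of what that enables through the crate's public API, assuming an existing Fabric workspace and lakehouse (the workspace, lakehouse, and table names, as well as the azure_storage_token option key, are illustrative and not part of this patch):

    use std::collections::HashMap;

    use deltalake::{DeltaTable, DeltaTableBuilder, DeltaTableError};

    async fn open_onelake_table() -> Result<DeltaTable, DeltaTableError> {
        // OneLake speaks the ADLS Gen2 protocol on *.dfs.fabric.microsoft.com,
        // so this URL is now routed to the Azure object store backend.
        let uri =
            "https://onelake.dfs.fabric.microsoft.com/my-workspace/my-lakehouse.Lakehouse/Tables/my_table";

        // A pre-acquired AAD bearer token; how it is obtained is out of scope here.
        let mut options = HashMap::new();
        options.insert(
            "azure_storage_token".to_string(),
            std::env::var("AZURE_STORAGE_TOKEN").unwrap_or_default(),
        );

        DeltaTableBuilder::from_uri(uri)
            .with_storage_options(options)
            .load()
            .await
    }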
diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs
index a11bff8c17..352a46d2b3 100644
--- a/rust/src/test_utils.rs
+++ b/rust/src/test_utils.rs
@@ -5,6 +5,7 @@ use chrono::Utc;
 use fs_extra::dir::{copy, CopyOptions};
 use object_store::DynObjectStore;
 use serde_json::json;
+use std::env;
 use std::sync::Arc;
 use tempdir::TempDir;
 
@@ -33,8 +34,29 @@ impl IntegrationContext {
         // create a fresh bucket in every context. THis is done via CLI...
         let bucket = match integration {
             StorageIntegration::Local => tmp_dir.as_ref().to_str().unwrap().to_owned(),
+            StorageIntegration::Onelake => {
+                let account_name =
+                    env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake"));
+                let container_name =
+                    env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs"));
+                format!(
+                    "{0}.dfs.fabric.microsoft.com/{1}",
+                    account_name, container_name
+                )
+            }
+            StorageIntegration::OnelakeAbfs => {
+                let account_name =
+                    env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake"));
+                let container_name =
+                    env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs"));
+                format!(
+                    "{0}@{1}.dfs.fabric.microsoft.com",
+                    container_name, account_name
+                )
+            }
             _ => format!("test-delta-table-{}", Utc::now().timestamp()),
         };
+
         if let StorageIntegration::Google = integration {
             gs_cli::prepare_env();
             let base_url = std::env::var("GOOGLE_BASE_URL")?;
@@ -46,10 +68,13 @@ impl IntegrationContext {
                 account_path.as_path().to_str().unwrap(),
             );
         }
+
         integration.create_bucket(&bucket)?;
         let store_uri = match integration {
             StorageIntegration::Amazon => format!("s3://{}", &bucket),
             StorageIntegration::Microsoft => format!("az://{}", &bucket),
+            StorageIntegration::Onelake => format!("https://{}", &bucket),
+            StorageIntegration::OnelakeAbfs => format!("abfss://{}", &bucket),
             StorageIntegration::Google => format!("gs://{}", &bucket),
             StorageIntegration::Local => format!("file://{}", &bucket),
             StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &bucket),
@@ -84,6 +109,8 @@ impl IntegrationContext {
         match self.integration {
             StorageIntegration::Amazon => format!("s3://{}", &self.bucket),
             StorageIntegration::Microsoft => format!("az://{}", &self.bucket),
+            StorageIntegration::Onelake => format!("https://{}", &self.bucket),
+            StorageIntegration::OnelakeAbfs => format!("abfss://{}", &self.bucket),
             StorageIntegration::Google => format!("gs://{}", &self.bucket),
             StorageIntegration::Local => format!("file://{}", &self.bucket),
             StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &self.bucket),
@@ -149,6 +176,8 @@ impl Drop for IntegrationContext {
             StorageIntegration::Google => {
                 gs_cli::delete_bucket(&self.bucket).unwrap();
             }
+            StorageIntegration::Onelake => (),
+            StorageIntegration::OnelakeAbfs => (),
             StorageIntegration::Local => (),
             StorageIntegration::Hdfs => {
                 hdfs_cli::delete_dir(&self.bucket).unwrap();
@@ -161,17 +190,21 @@ impl Drop for IntegrationContext {
 pub enum StorageIntegration {
     Amazon,
     Microsoft,
+    Onelake,
     Google,
     Local,
     Hdfs,
+    OnelakeAbfs,
 }
 
 impl StorageIntegration {
     fn prepare_env(&self) {
         match self {
             Self::Microsoft => az_cli::prepare_env(),
+            Self::Onelake => onelake_cli::prepare_env(),
             Self::Amazon => s3_cli::prepare_env(),
             Self::Google => gs_cli::prepare_env(),
+            Self::OnelakeAbfs => onelake_cli::prepare_env(),
             Self::Local => (),
             Self::Hdfs => (),
         }
@@ -183,6 +216,8 @@ impl StorageIntegration {
                 az_cli::create_container(name)?;
                 Ok(())
             }
+            Self::Onelake => Ok(()),
+            Self::OnelakeAbfs => Ok(()),
             Self::Amazon => {
                 s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
                 set_env_if_not_set(
@@ -264,6 +299,22 @@ pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
     };
 }
 
+/// small wrapper around OneLake setup (no CLI involved, only environment variables)
+pub mod onelake_cli {
+    use super::set_env_if_not_set;
+    /// Seed default OneLake account, container, and token values if not already set.
+    pub fn prepare_env() {
+        let token = "jwt-token";
+        set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0");
+        set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake");
+        set_env_if_not_set(
+            "AZURE_STORAGE_CONTAINER_NAME",
+            "86bc63cf-5086-42e0-b16d-6bc580d1dc87",
+        );
+        set_env_if_not_set("AZURE_STORAGE_TOKEN", token);
+    }
+}
+
 /// small wrapper around az cli
 pub mod az_cli {
     use super::set_env_if_not_set;
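For orientation, the two new StorageIntegration variants never create or delete a bucket; they only derive the test root from AZURE_STORAGE_ACCOUNT_NAME and AZURE_STORAGE_CONTAINER_NAME. A small sketch of the URIs they end up with, assuming those two variables are exported as onelake and delta-rs before the context is built, and assuming the feature-gated test_utils module is available in the test build:

    use deltalake::test_utils::{IntegrationContext, StorageIntegration};

    fn show_onelake_roots() {
        std::env::set_var("AZURE_STORAGE_ACCOUNT_NAME", "onelake");
        std::env::set_var("AZURE_STORAGE_CONTAINER_NAME", "delta-rs");

        // Onelake composes an https-style root: https://<account>.dfs.fabric.microsoft.com/<container>
        let ctx = IntegrationContext::new(StorageIntegration::Onelake).unwrap();
        assert_eq!(
            ctx.root_uri(),
            "https://onelake.dfs.fabric.microsoft.com/delta-rs"
        );

        // OnelakeAbfs composes container@account for the abfss scheme.
        let ctx = IntegrationContext::new(StorageIntegration::OnelakeAbfs).unwrap();
        assert_eq!(
            ctx.root_uri(),
            "abfss://delta-rs@onelake.dfs.fabric.microsoft.com"
        );
    }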
diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs
index 986b7b4026..1530442095 100644
--- a/rust/src/writer/record_batch.rs
+++ b/rust/src/writer/record_batch.rs
@@ -367,7 +367,9 @@ pub(crate) fn divide_by_partition_values(
         .map(|c| Ok(take(values.column(schema.index_of(c)?), &indices, None)?))
         .collect::<Result<Vec<_>, DeltaWriterError>>()?;
 
-    let partition_ranges = partition(sorted_partition_columns.as_slice())?;
+    #[allow(warnings)]
+    let partition_ranges =
+        arrow::compute::lexicographical_partition_ranges(sorted_partition_columns.as_slice())?;
 
     for range in partition_ranges.ranges().into_iter() {
         // get row indices for current partition
diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs
index 6fd31d759b..f75a31b020 100644
--- a/rust/tests/integration_object_store.rs
+++ b/rust/tests/integration_object_store.rs
@@ -22,6 +22,24 @@ async fn test_object_store_azure() -> TestResult {
     Ok(())
 }
 
+#[cfg(feature = "azure")]
+#[tokio::test]
+#[serial]
+async fn test_object_store_onelake() -> TestResult {
+    let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv");
+    read_write_test_onelake(StorageIntegration::Onelake, &path).await?;
+    Ok(())
+}
+
+#[cfg(feature = "azure")]
+#[tokio::test]
+#[serial]
+async fn test_object_store_onelake_abfs() -> TestResult {
+    let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv");
+    read_write_test_onelake(StorageIntegration::OnelakeAbfs, &path).await?;
+    Ok(())
+}
+
 #[cfg(feature = "s3")]
 #[tokio::test]
 #[serial]
@@ -48,6 +66,31 @@ async fn test_object_store_hdfs() -> TestResult {
     Ok(())
 }
 
+async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) -> TestResult {
+    let context = IntegrationContext::new(integration)?;
+
+    //println!("line 102-{:#?}",context.root_uri());
+
+    let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
+        .with_allow_http(true)
+        .build_storage()?;
+
+    //println!("{:#?}",delta_store);
+
+    let expected = Bytes::from_static(b"test world from delta-rs on friday");
+
+    delta_store.put(path, expected.clone()).await.unwrap();
+    let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap();
+    assert_eq!(expected, fetched);
+
+    for range in [0..10, 3..5, 0..expected.len()] {
+        let data = delta_store.get_range(path, range.clone()).await.unwrap();
+        assert_eq!(&data[..], &expected[range])
+    }
+
+    Ok(())
+}
+
 async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult {
     let context = IntegrationContext::new(integration)?;
     let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())