From a4f0c085584db91d8c4f9b54e7d55902b1a3d7f1 Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Thu, 27 Jul 2023 01:46:21 +0530 Subject: [PATCH 01/11] Changes for one-lake fix --- rust/src/storage/config.rs | 4 ++- rust/src/test_utils.rs | 46 ++++++++++++++++++++++++-- rust/tests/integration_object_store.rs | 43 ++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index a89cbb3fc5..b309a6c7a2 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -124,6 +124,8 @@ pub(crate) fn configure_store( try_configure_s3(url, options) } else if host.contains("dfs.core.windows.net") || host.contains("blob.core.windows.net") + || host.contains("dfs.fabric.microsoft.com") + || host.contains("blob.fabric.microsoft.com") { try_configure_azure(url, options) } else { @@ -179,7 +181,7 @@ fn try_configure_azure( options: &StorageOptions, ) -> DeltaResult> { let store = MicrosoftAzureBuilder::from_env() - .with_url(storage_url.as_ref()) + .with_url(storage_url.as_ref()) .try_with_options(&options.as_azure_options())? .with_allow_http(options.allow_http()) .build()?; diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index a11bff8c17..7fd18ab246 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -7,6 +7,7 @@ use object_store::DynObjectStore; use serde_json::json; use std::sync::Arc; use tempdir::TempDir; +use std::env; pub type TestResult = Result<(), Box>; @@ -33,8 +34,19 @@ impl IntegrationContext { // create a fresh bucket in every context. THis is done via CLI... let bucket = match integration { StorageIntegration::Local => tmp_dir.as_ref().to_str().unwrap().to_owned(), - _ => format!("test-delta-table-{}", Utc::now().timestamp()), + StorageIntegration::Onelake => { + let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); + let container_name = env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); + format!("{0}.dfs.fabric.microsoft.com/{1}", account_name,container_name) + }, + StorageIntegration::OnelakeAbfs =>{ + let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); + let container_name = env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); + format!("{0}@{1}.dfs.fabric.microsoft.com",container_name,account_name) + } + _ => format!("test-delta-table-{}", Utc::now().timestamp()), }; + if let StorageIntegration::Google = integration { gs_cli::prepare_env(); let base_url = std::env::var("GOOGLE_BASE_URL")?; @@ -46,10 +58,13 @@ impl IntegrationContext { account_path.as_path().to_str().unwrap(), ); } + integration.create_bucket(&bucket)?; let store_uri = match integration { StorageIntegration::Amazon => format!("s3://{}", &bucket), StorageIntegration::Microsoft => format!("az://{}", &bucket), + StorageIntegration::Onelake => format!("https://{}", &bucket), + StorageIntegration::OnelakeAbfs => format!("abfss://{}", &bucket), StorageIntegration::Google => format!("gs://{}", &bucket), StorageIntegration::Local => format!("file://{}", &bucket), StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &bucket), @@ -80,10 +95,12 @@ impl IntegrationContext { } /// Get the URI for initializing a store at the root - pub fn root_uri(&self) -> String { + pub fn root_uri(&self) -> String { match self.integration { StorageIntegration::Amazon => format!("s3://{}", &self.bucket), StorageIntegration::Microsoft => format!("az://{}", &self.bucket), + StorageIntegration::Onelake => format!("https://{}", &self.bucket), + StorageIntegration::OnelakeAbfs => format!("abfss://{}", &self.bucket), StorageIntegration::Google => format!("gs://{}", &self.bucket), StorageIntegration::Local => format!("file://{}", &self.bucket), StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &self.bucket), @@ -149,6 +166,8 @@ impl Drop for IntegrationContext { StorageIntegration::Google => { gs_cli::delete_bucket(&self.bucket).unwrap(); } + StorageIntegration::Onelake => (), + StorageIntegration::OnelakeAbfs => (), StorageIntegration::Local => (), StorageIntegration::Hdfs => { hdfs_cli::delete_dir(&self.bucket).unwrap(); @@ -161,17 +180,21 @@ impl Drop for IntegrationContext { pub enum StorageIntegration { Amazon, Microsoft, + Onelake, Google, Local, Hdfs, + OnelakeAbfs } impl StorageIntegration { fn prepare_env(&self) { match self { Self::Microsoft => az_cli::prepare_env(), + Self::Onelake => onelake_cli::prepare_env(), Self::Amazon => s3_cli::prepare_env(), Self::Google => gs_cli::prepare_env(), + Self::OnelakeAbfs => onelake_cli::prepare_env(), Self::Local => (), Self::Hdfs => (), } @@ -183,6 +206,12 @@ impl StorageIntegration { az_cli::create_container(name)?; Ok(()) } + Self::Onelake => { + Ok(()) + } + Self::OnelakeAbfs => { + Ok(()) + } Self::Amazon => { s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?; set_env_if_not_set( @@ -264,6 +293,19 @@ pub fn set_env_if_not_set(key: impl AsRef, value: impl AsRef) { }; } +//cli for onelake +pub mod onelake_cli { + use super::set_env_if_not_set; + /// prepare_env + pub fn prepare_env() { + let token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6Ii1LSTNROW5OUjdiUm9meG1lWm9YcWJIWkdldyIsImtpZCI6Ii1LSTNROW5OUjdiUm9meG1lWm9YcWJIWkdldyJ9.eyJhdWQiOiJodHRwczovL2FuYWx5c2lzLndpbmRvd3MubmV0L3Bvd2VyYmkvYXBpIiwiaXNzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvNzJmOTg4YmYtODZmMS00MWFmLTkxYWItMmQ3Y2QwMTFkYjQ3LyIsImlhdCI6MTY5MDM4NzE3MiwibmJmIjoxNjkwMzg3MTcyLCJleHAiOjE2OTAzOTE5NzQsImFjY3QiOjAsImFjciI6IjEiLCJhaW8iOiJBWVFBZS84VUFBQUF2L2VCOVBpdHk0ZGMrRWJkWHV5TEh3UFVOLytDaDduUGtqLy9SK2ZUVjMrMXRHeHN6UDhMeklhRFFURmdTWm4weURsekRVNXMwaG1IbjlCTklJRmZLMElRVy9vZGJseXU0aDdRWE9qelQ5S0pmeDU1Wk9GM0lSOVhqdXk4dE9LWDZOSEZSL1BlM1dRUXVDY3NQK3I1UVlDaktDV3lObitDNW5CVEhFQXZHb3M9IiwiYW1yIjpbInJzYSIsIm1mYSJdLCJhcHBpZCI6Ijg3MWMwMTBmLTVlNjEtNGZiMS04M2FjLTk4NjEwYTdlOTExMCIsImFwcGlkYWNyIjoiMCIsImNvbnRyb2xzIjpbImFwcF9yZXMiXSwiY29udHJvbHNfYXVkcyI6WyI4NzFjMDEwZi01ZTYxLTRmYjEtODNhYy05ODYxMGE3ZTkxMTAiLCIwMDAwMDAwOS0wMDAwLTAwMDAtYzAwMC0wMDAwMDAwMDAwMDAiLCIwMDAwMDAwMy0wMDAwLTBmZjEtY2UwMC0wMDAwMDAwMDAwMDAiXSwiZGV2aWNlaWQiOiJhNWEwNGMzNC1kNjhkLTQxODktYTA5OS0zNTlmMmUyMTM3YmIiLCJmYW1pbHlfbmFtZSI6Ik11ZGRhc3NpciIsImdpdmVuX25hbWUiOiJNb2hhbW1lZCIsImlwYWRkciI6IjI0MDU6MjAxOmUwMjY6ZTE5YzpkNWVlOmZjY2Q6MWRmNDo5MGUiLCJuYW1lIjoiTW9oYW1tZWQgTXVkZGFzc2lyIChpTGluayBTeXN0ZW1zIEluYykiLCJvaWQiOiI5Yjk3ZWRkZC0yYTgyLTRiYTgtYmYyZi03YWFlM2E1NTIwN2IiLCJvbnByZW1fc2lkIjoiUy0xLTUtMjEtMjEyNzUyMTE4NC0xNjA0MDEyOTIwLTE4ODc5Mjc1MjctNjg3NDI5OTgiLCJwdWlkIjoiMTAwMzIwMDJCNDA5OTgxNyIsInJoIjoiMC5BUm9BdjRqNWN2R0dyMEdScXkxODBCSGJSd2tBQUFBQUFBQUF3QUFBQUFBQUFBQWFBTlEuIiwic2NwIjoidXNlcl9pbXBlcnNvbmF0aW9uIiwic2lnbmluX3N0YXRlIjpbImR2Y19tbmdkIiwiZHZjX2NtcCIsImttc2kiXSwic3ViIjoiaEMtTkI0MDZSYzBiZEpGM056ejFMT0J5Mkl2bTE2RzNwUzM1dW54TEItWSIsInRpZCI6IjcyZjk4OGJmLTg2ZjEtNDFhZi05MWFiLTJkN2NkMDExZGI0NyIsInVuaXF1ZV9uYW1lIjoidi1tbXVkZGFzc2lyQG1pY3Jvc29mdC5jb20iLCJ1cG4iOiJ2LW1tdWRkYXNzaXJAbWljcm9zb2Z0LmNvbSIsInV0aSI6IjJ4NjVKNjg4aWtXU01rSzcxdm9BQUEiLCJ2ZXIiOiIxLjAiLCJ3aWRzIjpbImI3OWZiZjRkLTNlZjktNDY4OS04MTQzLTc2YjE5NGU4NTUwOSJdLCJ4bXNfY2MiOlsiQ1AxIl19.QRzJ8STgBiRYYeFmoF6mxI4u9La8NmYqCiXMx8xkvqEzU2iu_Ma4LrrII6YZRPzU8pwMurL3DrWaiPgOFr3UcTYgSOz5sxVEEyIS0us2RlgoZvIvRh7ikB77tg-sm0FD9BdOBFLOD4J0pkFDtTFgMq8TWZHOZUA6rpOtR1ziJfile-QwDkKoYpIa64caEEahq2o3SrCYD9IU1_9FDfBZSLGfY6aqQ3ucyLHflA3IPoECmoiu4EvqILvkt2G_Nj5GDrcTCctqeNEuupMCKNwT8E7B7nviKIzFObJl_eaohV3c2IgGYOlEVis4QaH4p7HExdthND-hXlgpntkr5Ck4Lg"; + set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0"); + set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake"); + set_env_if_not_set("AZURE_STORAGE_CONTAINER_NAME", "86bc63cf-5086-42e0-b16d-6bc580d1dc87"); + set_env_if_not_set("AZURE_STORAGE_TOKEN", token); + } +} + /// small wrapper around az cli pub mod az_cli { use super::set_env_if_not_set; diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs index 6fd31d759b..bb8637174c 100644 --- a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -22,6 +22,24 @@ async fn test_object_store_azure() -> TestResult { Ok(()) } +#[cfg(feature = "azure")] +#[tokio::test] +#[serial] +async fn test_object_store_onelake() -> TestResult { + let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv"); + read_write_test_onelake(StorageIntegration::Onelake, &path).await?; + Ok(()) +} + +#[cfg(feature = "azure")] +#[tokio::test] +#[serial] +async fn test_object_store_onelake_abfs() -> TestResult { + let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv"); + read_write_test_onelake(StorageIntegration::OnelakeAbfs, &path).await?; + Ok(()) +} + #[cfg(feature = "s3")] #[tokio::test] #[serial] @@ -48,6 +66,31 @@ async fn test_object_store_hdfs() -> TestResult { Ok(()) } +async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) -> TestResult{ + let context = IntegrationContext::new(integration)?; + + //println!("line 102-{:#?}",context.root_uri()); + + let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) + .with_allow_http(true) + .build_storage()?; + + //println!("{:#?}",delta_store); + + let expected = Bytes::from_static(b"test world from delta-rs on friday"); + + delta_store.put(path, expected.clone()).await.unwrap(); + let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap(); + assert_eq!(expected, fetched); + + for range in [0..10, 3..5, 0..expected.len()] { + let data = delta_store.get_range(path, range.clone()).await.unwrap(); + assert_eq!(&data[..], &expected[range]) + } + + Ok(()) +} + async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult { let context = IntegrationContext::new(integration)?; let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) From 2a773bd18628a3484016d94105d7bd2bc2c19ffd Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Thu, 27 Jul 2023 01:51:06 +0530 Subject: [PATCH 02/11] Remove token --- rust/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 7fd18ab246..02138a9b4d 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -298,7 +298,7 @@ pub mod onelake_cli { use super::set_env_if_not_set; /// prepare_env pub fn prepare_env() { - let token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6Ii1LSTNROW5OUjdiUm9meG1lWm9YcWJIWkdldyIsImtpZCI6Ii1LSTNROW5OUjdiUm9meG1lWm9YcWJIWkdldyJ9.eyJhdWQiOiJodHRwczovL2FuYWx5c2lzLndpbmRvd3MubmV0L3Bvd2VyYmkvYXBpIiwiaXNzIjoiaHR0cHM6Ly9zdHMud2luZG93cy5uZXQvNzJmOTg4YmYtODZmMS00MWFmLTkxYWItMmQ3Y2QwMTFkYjQ3LyIsImlhdCI6MTY5MDM4NzE3MiwibmJmIjoxNjkwMzg3MTcyLCJleHAiOjE2OTAzOTE5NzQsImFjY3QiOjAsImFjciI6IjEiLCJhaW8iOiJBWVFBZS84VUFBQUF2L2VCOVBpdHk0ZGMrRWJkWHV5TEh3UFVOLytDaDduUGtqLy9SK2ZUVjMrMXRHeHN6UDhMeklhRFFURmdTWm4weURsekRVNXMwaG1IbjlCTklJRmZLMElRVy9vZGJseXU0aDdRWE9qelQ5S0pmeDU1Wk9GM0lSOVhqdXk4dE9LWDZOSEZSL1BlM1dRUXVDY3NQK3I1UVlDaktDV3lObitDNW5CVEhFQXZHb3M9IiwiYW1yIjpbInJzYSIsIm1mYSJdLCJhcHBpZCI6Ijg3MWMwMTBmLTVlNjEtNGZiMS04M2FjLTk4NjEwYTdlOTExMCIsImFwcGlkYWNyIjoiMCIsImNvbnRyb2xzIjpbImFwcF9yZXMiXSwiY29udHJvbHNfYXVkcyI6WyI4NzFjMDEwZi01ZTYxLTRmYjEtODNhYy05ODYxMGE3ZTkxMTAiLCIwMDAwMDAwOS0wMDAwLTAwMDAtYzAwMC0wMDAwMDAwMDAwMDAiLCIwMDAwMDAwMy0wMDAwLTBmZjEtY2UwMC0wMDAwMDAwMDAwMDAiXSwiZGV2aWNlaWQiOiJhNWEwNGMzNC1kNjhkLTQxODktYTA5OS0zNTlmMmUyMTM3YmIiLCJmYW1pbHlfbmFtZSI6Ik11ZGRhc3NpciIsImdpdmVuX25hbWUiOiJNb2hhbW1lZCIsImlwYWRkciI6IjI0MDU6MjAxOmUwMjY6ZTE5YzpkNWVlOmZjY2Q6MWRmNDo5MGUiLCJuYW1lIjoiTW9oYW1tZWQgTXVkZGFzc2lyIChpTGluayBTeXN0ZW1zIEluYykiLCJvaWQiOiI5Yjk3ZWRkZC0yYTgyLTRiYTgtYmYyZi03YWFlM2E1NTIwN2IiLCJvbnByZW1fc2lkIjoiUy0xLTUtMjEtMjEyNzUyMTE4NC0xNjA0MDEyOTIwLTE4ODc5Mjc1MjctNjg3NDI5OTgiLCJwdWlkIjoiMTAwMzIwMDJCNDA5OTgxNyIsInJoIjoiMC5BUm9BdjRqNWN2R0dyMEdScXkxODBCSGJSd2tBQUFBQUFBQUF3QUFBQUFBQUFBQWFBTlEuIiwic2NwIjoidXNlcl9pbXBlcnNvbmF0aW9uIiwic2lnbmluX3N0YXRlIjpbImR2Y19tbmdkIiwiZHZjX2NtcCIsImttc2kiXSwic3ViIjoiaEMtTkI0MDZSYzBiZEpGM056ejFMT0J5Mkl2bTE2RzNwUzM1dW54TEItWSIsInRpZCI6IjcyZjk4OGJmLTg2ZjEtNDFhZi05MWFiLTJkN2NkMDExZGI0NyIsInVuaXF1ZV9uYW1lIjoidi1tbXVkZGFzc2lyQG1pY3Jvc29mdC5jb20iLCJ1cG4iOiJ2LW1tdWRkYXNzaXJAbWljcm9zb2Z0LmNvbSIsInV0aSI6IjJ4NjVKNjg4aWtXU01rSzcxdm9BQUEiLCJ2ZXIiOiIxLjAiLCJ3aWRzIjpbImI3OWZiZjRkLTNlZjktNDY4OS04MTQzLTc2YjE5NGU4NTUwOSJdLCJ4bXNfY2MiOlsiQ1AxIl19.QRzJ8STgBiRYYeFmoF6mxI4u9La8NmYqCiXMx8xkvqEzU2iu_Ma4LrrII6YZRPzU8pwMurL3DrWaiPgOFr3UcTYgSOz5sxVEEyIS0us2RlgoZvIvRh7ikB77tg-sm0FD9BdOBFLOD4J0pkFDtTFgMq8TWZHOZUA6rpOtR1ziJfile-QwDkKoYpIa64caEEahq2o3SrCYD9IU1_9FDfBZSLGfY6aqQ3ucyLHflA3IPoECmoiu4EvqILvkt2G_Nj5GDrcTCctqeNEuupMCKNwT8E7B7nviKIzFObJl_eaohV3c2IgGYOlEVis4QaH4p7HExdthND-hXlgpntkr5Ck4Lg"; + let token = "jwt-token"; set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0"); set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake"); set_env_if_not_set("AZURE_STORAGE_CONTAINER_NAME", "86bc63cf-5086-42e0-b16d-6bc580d1dc87"); From 2ee3d5a6516a038314f7dc85e6080ae4a29d1d29 Mon Sep 17 00:00:00 2001 From: vmuddassir-msft <140655500+vmuddassir-msft@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:23:43 +0530 Subject: [PATCH 03/11] Update rust/src/storage/config.rs Co-authored-by: Christopher Watford <132389385+watfordkcf@users.noreply.github.com> --- rust/src/storage/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs index b309a6c7a2..a9859b2c9b 100644 --- a/rust/src/storage/config.rs +++ b/rust/src/storage/config.rs @@ -181,7 +181,7 @@ fn try_configure_azure( options: &StorageOptions, ) -> DeltaResult> { let store = MicrosoftAzureBuilder::from_env() - .with_url(storage_url.as_ref()) + .with_url(storage_url.as_ref()) .try_with_options(&options.as_azure_options())? .with_allow_http(options.allow_http()) .build()?; From 4cca09351ff6e2be2a09f0b929c60a39a578542c Mon Sep 17 00:00:00 2001 From: vmuddassir-msft <140655500+vmuddassir-msft@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:23:49 +0530 Subject: [PATCH 04/11] Update rust/src/test_utils.rs Co-authored-by: Christopher Watford <132389385+watfordkcf@users.noreply.github.com> --- rust/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 02138a9b4d..36718d1612 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -37,7 +37,7 @@ impl IntegrationContext { StorageIntegration::Onelake => { let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); let container_name = env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); - format!("{0}.dfs.fabric.microsoft.com/{1}", account_name,container_name) + format!("{0}.dfs.fabric.microsoft.com/{1}", account_name, container_name) }, StorageIntegration::OnelakeAbfs =>{ let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); From 8a2f2aa5802e80d06c432eb550b888934434ca61 Mon Sep 17 00:00:00 2001 From: vmuddassir-msft <140655500+vmuddassir-msft@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:23:56 +0530 Subject: [PATCH 05/11] Update rust/src/test_utils.rs Co-authored-by: Christopher Watford <132389385+watfordkcf@users.noreply.github.com> --- rust/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 36718d1612..4eea6eb826 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -42,7 +42,7 @@ impl IntegrationContext { StorageIntegration::OnelakeAbfs =>{ let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); let container_name = env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); - format!("{0}@{1}.dfs.fabric.microsoft.com",container_name,account_name) + format!("{0}@{1}.dfs.fabric.microsoft.com", container_name, account_name) } _ => format!("test-delta-table-{}", Utc::now().timestamp()), }; From f0f11eb3ade3b59cd50f0b5a0dda319aaca813ed Mon Sep 17 00:00:00 2001 From: vmuddassir-msft <140655500+vmuddassir-msft@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:24:03 +0530 Subject: [PATCH 06/11] Update rust/src/test_utils.rs Co-authored-by: Christopher Watford <132389385+watfordkcf@users.noreply.github.com> --- rust/src/test_utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 4eea6eb826..82d719edf9 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -95,7 +95,7 @@ impl IntegrationContext { } /// Get the URI for initializing a store at the root - pub fn root_uri(&self) -> String { + pub fn root_uri(&self) -> String { match self.integration { StorageIntegration::Amazon => format!("s3://{}", &self.bucket), StorageIntegration::Microsoft => format!("az://{}", &self.bucket), From ca48d52d743568762b9a8814570cadeaf73d207a Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Wed, 6 Sep 2023 21:09:45 +0530 Subject: [PATCH 07/11] upgrade versions for arrow and datafusion --- Cargo.toml | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1ced24b917..5745f52556 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,23 +14,23 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "43" } -arrow-array = { version = "43" } -arrow-buffer = { version = "43" } -arrow-cast = { version = "43" } -arrow-ord = { version = "43" } -arrow-row = { version = "43" } -arrow-schema = { version = "43" } -arrow-select = { version = "43" } -parquet = { version = "43" } +arrow = { version = "46.0.0" } +arrow-array = { version = "46.0.0" } +arrow-buffer = { version = "46.0.0" } +arrow-cast = { version = "46.0.0" } +arrow-ord = { version = "46.0.0" } +arrow-row = { version = "46.0.0" } +arrow-schema = { version = "46.0.0" } +arrow-select = { version = "46.0.0" } +parquet = { version = "46.0.0" } # datafusion -datafusion = { version = "28" } -datafusion-expr = { version = "28" } -datafusion-common = { version = "28" } -datafusion-proto = { version = "28" } -datafusion-sql = { version = "28" } -datafusion-physical-expr = { version = "28" } +datafusion = { version = "30" } +datafusion-expr = { version = "30" } +datafusion-common = { version = "30" } +datafusion-proto = { version = "30" } +datafusion-sql = { version = "30" } +datafusion-physical-expr = { version = "30" } # serde serde = { version = "1", features = ["derive"] } @@ -48,4 +48,4 @@ uuid = { version = "1" } async-trait = { version = "0.1" } futures = { version = "0.3" } tokio = { version = "1" } -num_cpus = { version = "1" } +num_cpus = { version = "1" } \ No newline at end of file From 8d6b814e24b53e37b9f3fc5efb276abe68ecba00 Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Thu, 7 Sep 2023 16:20:11 +0530 Subject: [PATCH 08/11] Uprade versions arrow depedencies --- Cargo.toml | 18 +++++++++--------- rust/Cargo.toml | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6503adfc4b..e0071a1894 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,15 +15,15 @@ debug = "line-tables-only" [workspace.dependencies] # arrow -arrow = { version = "45" } -arrow-array = { version = "45" } -arrow-buffer = { version = "45" } -arrow-cast = { version = "45" } -arrow-ord = { version = "45" } -arrow-row = { version = "45" } -arrow-schema = { version = "45" } -arrow-select = { version = "45" } -parquet = { version = "45" } +arrow = { version = "46.0.0" } +arrow-array = { version = "46.0.0" } +arrow-buffer = { version = "46.0.0" } +arrow-cast = { version = "46.0.0" } +arrow-ord = { version = "46.0.0" } +arrow-row = { version = "46.0.0" } +arrow-schema = { version = "46.0.0" } +arrow-select = { version = "46.0.0" } +parquet = { version = "46.0.0" } # datafusion datafusion = { version = "30" } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 4c8ebea213..d8fc77564e 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -73,7 +73,7 @@ log = "0" libc = ">=0.2.90, <1" num-bigint = "0.4" num-traits = "0.2.15" -object_store = "0.6.1" +object_store = "0.7.0" once_cell = "1.16.0" parking_lot = "0.12" parquet2 = { version = "0.17", optional = true } From 15b6031335281b6e54582c369257eddb15ac8dcd Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Thu, 14 Sep 2023 17:37:07 +0530 Subject: [PATCH 09/11] Upgrade datafusion related packages --- Cargo.toml | 12 ++++++------ rust/Cargo.toml | 2 +- rust/src/writer/record_batch.rs | 5 +++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e0071a1894..9a70e8965d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,12 +26,12 @@ arrow-select = { version = "46.0.0" } parquet = { version = "46.0.0" } # datafusion -datafusion = { version = "30" } -datafusion-expr = { version = "30" } -datafusion-common = { version = "30" } -datafusion-proto = { version = "30" } -datafusion-sql = { version = "30" } -datafusion-physical-expr = { version = "30" } +datafusion = { version = "31" } +datafusion-expr = { version = "31" } +datafusion-common = { version = "31" } +datafusion-proto = { version = "31" } +datafusion-sql = { version = "31" } +datafusion-physical-expr = { version = "31" } # serde serde = { version = "1", features = ["derive"] } diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d8fc77564e..b0f55aba92 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -99,7 +99,7 @@ reqwest-retry = { version = "0.2.2", optional = true } # Datafusion dashmap = { version = "5", optional = true } -sqlparser = { version = "0.36", optional = true } +sqlparser = { version = "0.37", optional = true } # NOTE dependencies only for integration tests fs_extra = { version = "1.2.0", optional = true } diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index 2b62fe686c..69056a07d4 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -29,7 +29,7 @@ use std::{collections::HashMap, sync::Arc}; use arrow::array::{Array, UInt32Array}; -use arrow::compute::{lexicographical_partition_ranges, take, SortColumn}; +use arrow::compute::{take, SortColumn}; use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; @@ -372,7 +372,8 @@ pub(crate) fn divide_by_partition_values( }) .collect::, DeltaWriterError>>()?; - let partition_ranges = lexicographical_partition_ranges(sorted_partition_columns.as_slice())?; + #[allow(warnings)] + let partition_ranges = arrow::compute::lexicographical_partition_ranges(sorted_partition_columns.as_slice())?; for range in partition_ranges { // get row indices for current partition From d109d427e179327ad370f416b7807005e5136447 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 15 Sep 2023 08:49:59 -0700 Subject: [PATCH 10/11] Run `cargo fmt` on the tree --- rust/src/test_utils.rs | 59 +++++++++++++++----------- rust/src/writer/record_batch.rs | 3 +- rust/tests/integration_object_store.rs | 6 +-- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs index 82d719edf9..352a46d2b3 100644 --- a/rust/src/test_utils.rs +++ b/rust/src/test_utils.rs @@ -5,9 +5,9 @@ use chrono::Utc; use fs_extra::dir::{copy, CopyOptions}; use object_store::DynObjectStore; use serde_json::json; +use std::env; use std::sync::Arc; use tempdir::TempDir; -use std::env; pub type TestResult = Result<(), Box>; @@ -35,16 +35,26 @@ impl IntegrationContext { let bucket = match integration { StorageIntegration::Local => tmp_dir.as_ref().to_str().unwrap().to_owned(), StorageIntegration::Onelake => { - let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); - let container_name = env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); - format!("{0}.dfs.fabric.microsoft.com/{1}", account_name, container_name) - }, - StorageIntegration::OnelakeAbfs =>{ - let account_name = env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); - let container_name = env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); - format!("{0}@{1}.dfs.fabric.microsoft.com", container_name, account_name) - } - _ => format!("test-delta-table-{}", Utc::now().timestamp()), + let account_name = + env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); + let container_name = + env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); + format!( + "{0}.dfs.fabric.microsoft.com/{1}", + account_name, container_name + ) + } + StorageIntegration::OnelakeAbfs => { + let account_name = + env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake")); + let container_name = + env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs")); + format!( + "{0}@{1}.dfs.fabric.microsoft.com", + container_name, account_name + ) + } + _ => format!("test-delta-table-{}", Utc::now().timestamp()), }; if let StorageIntegration::Google = integration { @@ -100,7 +110,7 @@ impl IntegrationContext { StorageIntegration::Amazon => format!("s3://{}", &self.bucket), StorageIntegration::Microsoft => format!("az://{}", &self.bucket), StorageIntegration::Onelake => format!("https://{}", &self.bucket), - StorageIntegration::OnelakeAbfs => format!("abfss://{}", &self.bucket), + StorageIntegration::OnelakeAbfs => format!("abfss://{}", &self.bucket), StorageIntegration::Google => format!("gs://{}", &self.bucket), StorageIntegration::Local => format!("file://{}", &self.bucket), StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &self.bucket), @@ -184,17 +194,17 @@ pub enum StorageIntegration { Google, Local, Hdfs, - OnelakeAbfs + OnelakeAbfs, } impl StorageIntegration { fn prepare_env(&self) { match self { Self::Microsoft => az_cli::prepare_env(), - Self::Onelake => onelake_cli::prepare_env(), + Self::Onelake => onelake_cli::prepare_env(), Self::Amazon => s3_cli::prepare_env(), Self::Google => gs_cli::prepare_env(), - Self::OnelakeAbfs => onelake_cli::prepare_env(), + Self::OnelakeAbfs => onelake_cli::prepare_env(), Self::Local => (), Self::Hdfs => (), } @@ -206,12 +216,8 @@ impl StorageIntegration { az_cli::create_container(name)?; Ok(()) } - Self::Onelake => { - Ok(()) - } - Self::OnelakeAbfs => { - Ok(()) - } + Self::Onelake => Ok(()), + Self::OnelakeAbfs => Ok(()), Self::Amazon => { s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?; set_env_if_not_set( @@ -295,13 +301,16 @@ pub fn set_env_if_not_set(key: impl AsRef, value: impl AsRef) { //cli for onelake pub mod onelake_cli { - use super::set_env_if_not_set; - /// prepare_env - pub fn prepare_env() { + use super::set_env_if_not_set; + /// prepare_env + pub fn prepare_env() { let token = "jwt-token"; set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0"); set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake"); - set_env_if_not_set("AZURE_STORAGE_CONTAINER_NAME", "86bc63cf-5086-42e0-b16d-6bc580d1dc87"); + set_env_if_not_set( + "AZURE_STORAGE_CONTAINER_NAME", + "86bc63cf-5086-42e0-b16d-6bc580d1dc87", + ); set_env_if_not_set("AZURE_STORAGE_TOKEN", token); } } diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index 69056a07d4..cba936d779 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -373,7 +373,8 @@ pub(crate) fn divide_by_partition_values( .collect::, DeltaWriterError>>()?; #[allow(warnings)] - let partition_ranges = arrow::compute::lexicographical_partition_ranges(sorted_partition_columns.as_slice())?; + let partition_ranges = + arrow::compute::lexicographical_partition_ranges(sorted_partition_columns.as_slice())?; for range in partition_ranges { // get row indices for current partition diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs index bb8637174c..f75a31b020 100644 --- a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -66,9 +66,9 @@ async fn test_object_store_hdfs() -> TestResult { Ok(()) } -async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) -> TestResult{ +async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) -> TestResult { let context = IntegrationContext::new(integration)?; - + //println!("line 102-{:#?}",context.root_uri()); let delta_store = DeltaTableBuilder::from_uri(&context.root_uri()) @@ -77,7 +77,7 @@ async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) - //println!("{:#?}",delta_store); - let expected = Bytes::from_static(b"test world from delta-rs on friday"); + let expected = Bytes::from_static(b"test world from delta-rs on friday"); delta_store.put(path, expected.clone()).await.unwrap(); let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap(); From 7340be0d95ae1df37b8edd9b7920627e1261ece6 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 15 Sep 2023 15:40:33 -0700 Subject: [PATCH 11/11] Clean up old unnecessary docstring --- rust/src/storage/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index 6d4bd080e0..fd316b1212 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -212,8 +212,6 @@ impl ObjectStore for DeltaObjectStore { } /// Perform a get request with options - /// - /// Note: options.range will be ignored if [`GetResult::File`] async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { self.storage.get_opts(location, options).await }