Commit 6eef77b
fix: add support for Microsoft OneLake
This change introduces tests and support for Microsoft OneLake. This specific
commit is a rebase of the work done by our pals at Microsoft.

Co-authored-by: Mohammed Muddassir <v-mmuddassir@microsoft.com>
Co-authored-by: Christopher Watford <christopher.watford@kcftech.com>
2 people authored and rtyler committed Sep 20, 2023
1 parent 6746dd4 commit 6eef77b
Showing 3 changed files with 96 additions and 0 deletions.
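
For orientation, here is a minimal sketch of how the new endpoints get exercised, mirroring the test helper added below in rust/tests/integration_object_store.rs (the URI is a placeholder; a real run needs the environment variables described under onelake_cli):

use deltalake::DeltaTableBuilder;

// Placeholder OneLake URI: "delta-rs" stands in for the workspace/container
// and "onelake" for the account; substitute real values for a live run.
let store = DeltaTableBuilder::from_uri("abfss://delta-rs@onelake.dfs.fabric.microsoft.com")
    .with_allow_http(true)
    .build_storage()?;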
2 changes: 2 additions & 0 deletions rust/src/storage/config.rs
@@ -124,6 +124,8 @@ pub(crate) fn configure_store(
try_configure_s3(url, options)
} else if host.contains("dfs.core.windows.net")
|| host.contains("blob.core.windows.net")
|| host.contains("dfs.fabric.microsoft.com")
|| host.contains("blob.fabric.microsoft.com")
{
try_configure_azure(url, options)
} else {
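
The two new suffix checks route Microsoft Fabric/OneLake hosts through the existing Azure configuration path. A quick sketch of hosts that now match, assuming the url crate that configure_store already appears to parse URLs with (the URLs themselves are illustrative):

use url::Url;

let candidates = [
    "https://onelake.dfs.fabric.microsoft.com/delta-rs/some/table",
    "abfss://delta-rs@onelake.dfs.fabric.microsoft.com/some/table",
];
for raw in candidates {
    let url = Url::parse(raw).unwrap();
    let host = url.host_str().unwrap().to_string();
    // Either suffix sends the URL into try_configure_azure.
    assert!(
        host.contains("dfs.fabric.microsoft.com")
            || host.contains("blob.fabric.microsoft.com")
    );
}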
51 changes: 51 additions & 0 deletions rust/src/test_utils.rs
@@ -5,6 +5,7 @@ use chrono::Utc;
use fs_extra::dir::{copy, CopyOptions};
use object_store::DynObjectStore;
use serde_json::json;
use std::env;
use std::sync::Arc;
use tempdir::TempDir;

@@ -33,8 +34,29 @@ impl IntegrationContext {
// create a fresh bucket in every context. This is done via CLI...
let bucket = match integration {
StorageIntegration::Local => tmp_dir.as_ref().to_str().unwrap().to_owned(),
StorageIntegration::Onelake => {
let account_name =
env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake"));
let container_name =
env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs"));
format!(
"{0}.dfs.fabric.microsoft.com/{1}",
account_name, container_name
)
}
StorageIntegration::OnelakeAbfs => {
let account_name =
env::var("AZURE_STORAGE_ACCOUNT_NAME").unwrap_or(String::from("onelake"));
let container_name =
env::var("AZURE_STORAGE_CONTAINER_NAME").unwrap_or(String::from("delta-rs"));
format!(
"{0}@{1}.dfs.fabric.microsoft.com",
container_name, account_name
)
}
_ => format!("test-delta-table-{}", Utc::now().timestamp()),
};

if let StorageIntegration::Google = integration {
gs_cli::prepare_env();
let base_url = std::env::var("GOOGLE_BASE_URL")?;
@@ -46,10 +68,13 @@
account_path.as_path().to_str().unwrap(),
);
}

integration.create_bucket(&bucket)?;
let store_uri = match integration {
StorageIntegration::Amazon => format!("s3://{}", &bucket),
StorageIntegration::Microsoft => format!("az://{}", &bucket),
StorageIntegration::Onelake => format!("https://{}", &bucket),
StorageIntegration::OnelakeAbfs => format!("abfss://{}", &bucket),
StorageIntegration::Google => format!("gs://{}", &bucket),
StorageIntegration::Local => format!("file://{}", &bucket),
StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &bucket),
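
With the defaults seeded by onelake_cli::prepare_env further down, the two OneLake variants produce root URIs shaped like this (a sketch; the account and container actually come from your environment):

let account = "daily-onelake";
let container = "86bc63cf-5086-42e0-b16d-6bc580d1dc87";

// StorageIntegration::Onelake -> path-style HTTPS URI
assert_eq!(
    format!("https://{account}.dfs.fabric.microsoft.com/{container}"),
    "https://daily-onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87"
);
// StorageIntegration::OnelakeAbfs -> container@account ABFS URI
assert_eq!(
    format!("abfss://{container}@{account}.dfs.fabric.microsoft.com"),
    "abfss://86bc63cf-5086-42e0-b16d-6bc580d1dc87@daily-onelake.dfs.fabric.microsoft.com"
);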
@@ -84,6 +109,8 @@ impl IntegrationContext {
match self.integration {
StorageIntegration::Amazon => format!("s3://{}", &self.bucket),
StorageIntegration::Microsoft => format!("az://{}", &self.bucket),
StorageIntegration::Onelake => format!("https://{}", &self.bucket),
StorageIntegration::OnelakeAbfs => format!("abfss://{}", &self.bucket),
StorageIntegration::Google => format!("gs://{}", &self.bucket),
StorageIntegration::Local => format!("file://{}", &self.bucket),
StorageIntegration::Hdfs => format!("hdfs://localhost:9000/{}", &self.bucket),
@@ -149,6 +176,8 @@ impl Drop for IntegrationContext {
StorageIntegration::Google => {
gs_cli::delete_bucket(&self.bucket).unwrap();
}
StorageIntegration::Onelake => (),
StorageIntegration::OnelakeAbfs => (),
StorageIntegration::Local => (),
StorageIntegration::Hdfs => {
hdfs_cli::delete_dir(&self.bucket).unwrap();
@@ -161,17 +190,21 @@
pub enum StorageIntegration {
Amazon,
Microsoft,
Onelake,
Google,
Local,
Hdfs,
OnelakeAbfs,
}

impl StorageIntegration {
fn prepare_env(&self) {
match self {
Self::Microsoft => az_cli::prepare_env(),
Self::Onelake => onelake_cli::prepare_env(),
Self::Amazon => s3_cli::prepare_env(),
Self::Google => gs_cli::prepare_env(),
Self::OnelakeAbfs => onelake_cli::prepare_env(),
Self::Local => (),
Self::Hdfs => (),
}
@@ -183,6 +216,8 @@ impl StorageIntegration {
az_cli::create_container(name)?;
Ok(())
}
Self::Onelake => Ok(()),
Self::OnelakeAbfs => Ok(()),
Self::Amazon => {
s3_cli::create_bucket(format!("s3://{}", name.as_ref()))?;
set_env_if_not_set(
@@ -264,6 +299,22 @@ pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
};
}
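
The helper's signature above implies the usual guard, roughly like this (a sketch consistent with its name; the actual body is elided from this hunk):

pub fn set_env_if_not_set(key: impl AsRef<str>, value: impl AsRef<str>) {
    // Only write the variable when it is not already present, so real
    // credentials exported by the caller are never clobbered.
    if std::env::var(key.as_ref()).is_err() {
        std::env::set_var(key.as_ref(), value.as_ref());
    }
}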

/// small wrapper for setting up a OneLake test environment
pub mod onelake_cli {
use super::set_env_if_not_set;
/// Seed default OneLake environment variables if not already set
pub fn prepare_env() {
let token = "jwt-token";
set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "0");
set_env_if_not_set("AZURE_STORAGE_ACCOUNT_NAME", "daily-onelake");
set_env_if_not_set(
"AZURE_STORAGE_CONTAINER_NAME",
"86bc63cf-5086-42e0-b16d-6bc580d1dc87",
);
set_env_if_not_set("AZURE_STORAGE_TOKEN", token);
}
}
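
The values seeded here are placeholders (note the dummy "jwt-token"). Because set_env_if_not_set never overwrites, pointing the tests at a live workspace is just a matter of exporting real values first, e.g. (all values below are made up):

std::env::set_var("AZURE_STORAGE_ACCOUNT_NAME", "my-onelake-account");
std::env::set_var("AZURE_STORAGE_CONTAINER_NAME", "<real-workspace-guid>");
std::env::set_var("AZURE_STORAGE_TOKEN", "<real-bearer-token>");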

/// small wrapper around az cli
pub mod az_cli {
use super::set_env_if_not_set;
43 changes: 43 additions & 0 deletions rust/tests/integration_object_store.rs
@@ -22,6 +22,24 @@ async fn test_object_store_azure() -> TestResult {
Ok(())
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_object_store_onelake() -> TestResult {
let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv");
read_write_test_onelake(StorageIntegration::Onelake, &path).await?;
Ok(())
}

#[cfg(feature = "azure")]
#[tokio::test]
#[serial]
async fn test_object_store_onelake_abfs() -> TestResult {
let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv");
read_write_test_onelake(StorageIntegration::OnelakeAbfs, &path).await?;
Ok(())
}
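
Both tests assume the sample file path already exists in the target Fabric workspace. With real credentials exported (see onelake_cli::prepare_env in rust/src/test_utils.rs), they should be runnable through the azure feature, presumably something like `cargo test --features azure --test integration_object_store onelake` — the exact invocation is an assumption based on the cfg gates above.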

#[cfg(feature = "s3")]
#[tokio::test]
#[serial]
@@ -48,6 +66,31 @@ async fn test_object_store_hdfs() -> TestResult {
Ok(())
}

async fn read_write_test_onelake(integration: StorageIntegration, path: &Path) -> TestResult {
let context = IntegrationContext::new(integration)?;

//println!("line 102-{:#?}",context.root_uri());

let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
.with_allow_http(true)
.build_storage()?;

//println!("{:#?}",delta_store);

let expected = Bytes::from_static(b"test world from delta-rs on friday");

delta_store.put(path, expected.clone()).await.unwrap();
let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap();
assert_eq!(expected, fetched);

for range in [0..10, 3..5, 0..expected.len()] {
let data = delta_store.get_range(path, range.clone()).await.unwrap();
assert_eq!(&data[..], &expected[range])
}

Ok(())
}
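
A quick worked check of the range loop above: the payload is "test world from delta-rs on friday", so range 0..10 must come back as b"test world" and 3..5 as b"t " (zero-based byte offsets, end-exclusive).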

async fn test_object_store(integration: StorageIntegration, skip_copy: bool) -> TestResult {
let context = IntegrationContext::new(integration)?;
let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
