From d0ea3bc48c57872ebb19be909f3e65b9b2896a36 Mon Sep 17 00:00:00 2001 From: Jan Noha Date: Mon, 1 May 2023 00:01:59 +0200 Subject: [PATCH 1/3] added support for Databricks Unity Catalog --- python/Cargo.toml | 2 +- python/deltalake/data_catalog.py | 3 +- python/docs/source/usage.rst | 15 +++- rust/Cargo.toml | 10 +++ rust/src/data_catalog/mod.rs | 27 +++++++ rust/src/data_catalog/unity/mod.rs | 125 +++++++++++++++++++++++++++++ 6 files changed, 179 insertions(+), 3 deletions(-) create mode 100644 rust/src/data_catalog/unity/mod.rs diff --git a/python/Cargo.toml b/python/Cargo.toml index 4d95efc681..da5c04ca4a 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] path = "../rust" version = "0" -features = ["azure", "gcs", "python", "datafusion"] +features = ["azure", "gcs", "python", "datafusion", "unity"] [features] default = ["rustls"] diff --git a/python/deltalake/data_catalog.py b/python/deltalake/data_catalog.py index 387075c661..e3ce1645f9 100644 --- a/python/deltalake/data_catalog.py +++ b/python/deltalake/data_catalog.py @@ -4,4 +4,5 @@ class DataCatalog(Enum): """List of the Data Catalogs""" - AWS = "glue" # Only AWS Glue Data Catalog is available + AWS = "glue" # AWS Glue Data Catalog + UNITY = "unity" # Databricks Unity Catalog diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 3282481d7d..5390fb1c0f 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -55,7 +55,7 @@ being used. We try to support many of the well-known formats to identify basic s * gs:/// Alternatively, if you have a data catalog you can load it by reference to a -database and table name. Currently only AWS Glue is supported. +database and table name. Currently supported are AWS Glue and Databricks Unity Catalog. For AWS Glue catalog, use AWS environment variables to authenticate. @@ -70,6 +70,19 @@ For AWS Glue catalog, use AWS environment variables to authenticate. >>> dt.to_pyarrow_table().to_pydict() {'id': [5, 7, 9, 5, 6, 7, 8, 9]} +For Databricks Unity Catalog authentication, use environment variables: + * DATABRICKS_WORKSPACE_URL (e.g. https://adb-62800498333851.30.azuredatabricks.net) + * DATABRICKS_ACCESS_TOKEN + +.. code-block:: python + + >>> from deltalake import DataCatalog, DeltaTable + >>> catalog_name = 'main' + >>> schema_name = 'db_schema' + >>> table_name = 'db_table' + >>> data_catalog = DataCatalog.UNITY + >>> dt = DeltaTable.from_data_catalog(data_catalog=data_catalog, data_catalog_id=catalog_name, database_name=schema_name, table_name=table_name) + .. _`s3 options`: https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants .. _`azure options`: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants .. _`gcs options`: https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants diff --git a/rust/Cargo.toml b/rust/Cargo.toml index a78a618691..d302d21cf8 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -53,6 +53,11 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true # Glue rusoto_glue = { version = "0.47", default-features = false, optional = true } +# Unity +reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"], optional = true } +reqwest-middleware = { version = "0.2.1", optional = true } +reqwest-retry = { version = "0.2.2", optional = true } + # Datafusion datafusion = { version = "23", optional = true } datafusion-expr = { version = "23", optional = true } @@ -130,6 +135,11 @@ s3 = [ "object_store/aws", "object_store/aws_profile", ] +unity = [ + "reqwest", + "reqwest-middleware", + "reqwest-retry", +] [[bench]] name = "read_checkpoint" diff --git a/rust/src/data_catalog/mod.rs b/rust/src/data_catalog/mod.rs index 5570c7e408..6ae79fa055 100644 --- a/rust/src/data_catalog/mod.rs +++ b/rust/src/data_catalog/mod.rs @@ -5,6 +5,9 @@ use std::fmt::Debug; #[cfg(feature = "glue")] pub mod glue; +#[cfg(feature = "unity")] +pub mod unity; + /// Error enum that represents a CatalogError. #[derive(thiserror::Error, Debug)] pub enum DataCatalogError { @@ -43,6 +46,28 @@ pub enum DataCatalogError { source: rusoto_credential::CredentialsError, }, + /// Error caused by missing environment variable for Unity Catalog. + #[cfg(feature = "unity")] + #[error("Missing Unity Catalog environment variable: {var_name}")] + MissingEnvVar { + /// Variable name + var_name: String, + }, + + /// Error caused by invalid access token value + #[cfg(feature = "unity")] + #[error("Invalid Databricks personal access token")] + InvalidAccessToken, + + /// Databricks API client error + #[cfg(feature = "unity")] + #[error("API client error: {source}")] + APIClientError { + /// The underlying unity::GetTableError + #[from] + source: unity::GetTableError, + }, + /// Error representing an invalid Data Catalog. #[error("This data catalog doesn't exist: {data_catalog}")] InvalidDataCatalog { @@ -74,6 +99,8 @@ pub fn get_data_catalog(data_catalog: &str) -> Result, Data "hdfs" => unimplemented!("HDFS Data Catalog is not implemented"), #[cfg(feature = "glue")] "glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)), + #[cfg(feature = "unity")] + "unity" => Ok(Box::new(unity::UnityCatalog::new()?)), _ => Err(DataCatalogError::InvalidDataCatalog { data_catalog: data_catalog.to_string(), }), diff --git a/rust/src/data_catalog/unity/mod.rs b/rust/src/data_catalog/unity/mod.rs new file mode 100644 index 0000000000..85943a0282 --- /dev/null +++ b/rust/src/data_catalog/unity/mod.rs @@ -0,0 +1,125 @@ +//! Databricks Unity Catalog. +//! +//! This module is gated behind the "unity" feature. +use super::{DataCatalog, DataCatalogError}; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; +use reqwest::header; +use serde::Deserialize; + +/// Databricks Unity Catalog - implementation of the `DataCatalog` trait +pub struct UnityCatalog { + client_with_retry: ClientWithMiddleware, + workspace_url: String, +} + +impl UnityCatalog { + /// Creates a new UnityCatalog + pub fn new() -> Result { + let token_var = "DATABRICKS_ACCESS_TOKEN"; + let access_token = std::env::var(token_var).map_err(|_| + DataCatalogError::MissingEnvVar { var_name: token_var.into() })?; + + let auth_header_val = header::HeaderValue::from_str( + &format!("Bearer {}", &access_token) + ).map_err(|_| DataCatalogError::InvalidAccessToken)?; + + let headers = header::HeaderMap::from_iter([ + (header::AUTHORIZATION, auth_header_val), + ]); + let client = reqwest::Client::builder() + .default_headers(headers).build()?; + + let retry_policy = ExponentialBackoff::builder() + .build_with_max_retries(10); + + let client_with_retry = ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build(); + + let workspace_var = "DATABRICKS_WORKSPACE_URL"; + let workspace_url = std::env::var(workspace_var).map_err(|_| + DataCatalogError::MissingEnvVar { var_name: workspace_var.into() })?; + + Ok(Self{ + client_with_retry, + workspace_url, + }) + } +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum TableResponse { + Success { storage_location: String }, + Error { error_code: String, message: String }, +} + +/// Possible errors from the unity-catalog/tables API call +#[derive(thiserror::Error, Debug)] +pub enum GetTableError { + #[error("GET request error: {source}")] + /// Error from reqwest library + RequestError { + /// The underlying reqwest_middleware::Error + #[from] + source: reqwest_middleware::Error, + }, + + /// Request returned error response + #[error("Invalid table error: {error_code}: {message}")] + InvalidTable { + /// Error code + error_code: String, + /// Error description + message: String, + } +} + +impl From for DataCatalogError { + fn from(value: reqwest_middleware::Error) -> Self { + value.into() + } +} + +impl From for DataCatalogError { + fn from(value: reqwest::Error) -> Self { + reqwest_middleware::Error::Reqwest(value).into() + } +} + +#[async_trait::async_trait] +impl DataCatalog for UnityCatalog { + /// Get the table storage location from the UnityCatalog + async fn get_table_storage_location( + &self, + catalog_id: Option, + database_name: &str, + table_name: &str, + ) -> Result { + let resp = self.client_with_retry.get(format!( + "{}/api/2.1/unity-catalog/tables/{}.{}.{}", + &self.workspace_url, + catalog_id.as_deref().unwrap_or("main"), + &database_name, + &table_name + )).send().await?; + + let parsed_resp: TableResponse = resp.json().await?; + match parsed_resp { + TableResponse::Success { storage_location } => + Ok(storage_location), + TableResponse::Error { error_code, message } => + Err(GetTableError::InvalidTable { + error_code, + message + }.into()), + } + } +} + +impl std::fmt::Debug for UnityCatalog { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "UnityCatalog") + } +} From bc787bb82e59a9bbeaf7c0c65051aa7ed62c5a2d Mon Sep 17 00:00:00 2001 From: Jan Noha Date: Thu, 4 May 2023 02:00:28 +0200 Subject: [PATCH 2/3] fixed source code formatting --- python/deltalake/data_catalog.py | 2 +- rust/src/data_catalog/unity/mod.rs | 69 ++++++++++++++++-------------- 2 files changed, 39 insertions(+), 32 deletions(-) diff --git a/python/deltalake/data_catalog.py b/python/deltalake/data_catalog.py index e3ce1645f9..b65debcfa3 100644 --- a/python/deltalake/data_catalog.py +++ b/python/deltalake/data_catalog.py @@ -5,4 +5,4 @@ class DataCatalog(Enum): """List of the Data Catalogs""" AWS = "glue" # AWS Glue Data Catalog - UNITY = "unity" # Databricks Unity Catalog + UNITY = "unity" # Databricks Unity Catalog diff --git a/rust/src/data_catalog/unity/mod.rs b/rust/src/data_catalog/unity/mod.rs index 85943a0282..11b7cd7382 100644 --- a/rust/src/data_catalog/unity/mod.rs +++ b/rust/src/data_catalog/unity/mod.rs @@ -2,9 +2,9 @@ //! //! This module is gated behind the "unity" feature. use super::{DataCatalog, DataCatalogError}; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; -use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff}; use reqwest::header; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use serde::Deserialize; /// Databricks Unity Catalog - implementation of the `DataCatalog` trait @@ -17,31 +17,32 @@ impl UnityCatalog { /// Creates a new UnityCatalog pub fn new() -> Result { let token_var = "DATABRICKS_ACCESS_TOKEN"; - let access_token = std::env::var(token_var).map_err(|_| - DataCatalogError::MissingEnvVar { var_name: token_var.into() })?; + let access_token = + std::env::var(token_var).map_err(|_| DataCatalogError::MissingEnvVar { + var_name: token_var.into(), + })?; - let auth_header_val = header::HeaderValue::from_str( - &format!("Bearer {}", &access_token) - ).map_err(|_| DataCatalogError::InvalidAccessToken)?; + let auth_header_val = header::HeaderValue::from_str(&format!("Bearer {}", &access_token)) + .map_err(|_| DataCatalogError::InvalidAccessToken)?; - let headers = header::HeaderMap::from_iter([ - (header::AUTHORIZATION, auth_header_val), - ]); + let headers = header::HeaderMap::from_iter([(header::AUTHORIZATION, auth_header_val)]); let client = reqwest::Client::builder() - .default_headers(headers).build()?; + .default_headers(headers) + .build()?; - let retry_policy = ExponentialBackoff::builder() - .build_with_max_retries(10); + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(10); let client_with_retry = ClientBuilder::new(client) .with(RetryTransientMiddleware::new_with_policy(retry_policy)) .build(); let workspace_var = "DATABRICKS_WORKSPACE_URL"; - let workspace_url = std::env::var(workspace_var).map_err(|_| - DataCatalogError::MissingEnvVar { var_name: workspace_var.into() })?; + let workspace_url = + std::env::var(workspace_var).map_err(|_| DataCatalogError::MissingEnvVar { + var_name: workspace_var.into(), + })?; - Ok(Self{ + Ok(Self { client_with_retry, workspace_url, }) @@ -73,7 +74,7 @@ pub enum GetTableError { error_code: String, /// Error description message: String, - } + }, } impl From for DataCatalogError { @@ -97,23 +98,29 @@ impl DataCatalog for UnityCatalog { database_name: &str, table_name: &str, ) -> Result { - let resp = self.client_with_retry.get(format!( - "{}/api/2.1/unity-catalog/tables/{}.{}.{}", - &self.workspace_url, - catalog_id.as_deref().unwrap_or("main"), - &database_name, - &table_name - )).send().await?; + let resp = self + .client_with_retry + .get(format!( + "{}/api/2.1/unity-catalog/tables/{}.{}.{}", + &self.workspace_url, + catalog_id.as_deref().unwrap_or("main"), + &database_name, + &table_name + )) + .send() + .await?; let parsed_resp: TableResponse = resp.json().await?; match parsed_resp { - TableResponse::Success { storage_location } => - Ok(storage_location), - TableResponse::Error { error_code, message } => - Err(GetTableError::InvalidTable { - error_code, - message - }.into()), + TableResponse::Success { storage_location } => Ok(storage_location), + TableResponse::Error { + error_code, + message, + } => Err(GetTableError::InvalidTable { + error_code, + message, + } + .into()), } } } From 5e06d4211e730f20a17f059a3a60dbfa2c2d3e55 Mon Sep 17 00:00:00 2001 From: Jan Noha Date: Fri, 5 May 2023 23:20:32 +0200 Subject: [PATCH 3/3] renamed unity feature flag to unity-experimental --- python/Cargo.toml | 2 +- rust/Cargo.toml | 2 +- rust/src/data_catalog/mod.rs | 10 +++++----- rust/src/data_catalog/unity/mod.rs | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index da5c04ca4a..3b86b1d663 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] path = "../rust" version = "0" -features = ["azure", "gcs", "python", "datafusion", "unity"] +features = ["azure", "gcs", "python", "datafusion", "unity-experimental"] [features] default = ["rustls"] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d302d21cf8..531966e618 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -135,7 +135,7 @@ s3 = [ "object_store/aws", "object_store/aws_profile", ] -unity = [ +unity-experimental = [ "reqwest", "reqwest-middleware", "reqwest-retry", diff --git a/rust/src/data_catalog/mod.rs b/rust/src/data_catalog/mod.rs index 6ae79fa055..cfe4ea8cf1 100644 --- a/rust/src/data_catalog/mod.rs +++ b/rust/src/data_catalog/mod.rs @@ -5,7 +5,7 @@ use std::fmt::Debug; #[cfg(feature = "glue")] pub mod glue; -#[cfg(feature = "unity")] +#[cfg(feature = "unity-experimental")] pub mod unity; /// Error enum that represents a CatalogError. @@ -47,7 +47,7 @@ pub enum DataCatalogError { }, /// Error caused by missing environment variable for Unity Catalog. - #[cfg(feature = "unity")] + #[cfg(feature = "unity-experimental")] #[error("Missing Unity Catalog environment variable: {var_name}")] MissingEnvVar { /// Variable name @@ -55,12 +55,12 @@ pub enum DataCatalogError { }, /// Error caused by invalid access token value - #[cfg(feature = "unity")] + #[cfg(feature = "unity-experimental")] #[error("Invalid Databricks personal access token")] InvalidAccessToken, /// Databricks API client error - #[cfg(feature = "unity")] + #[cfg(feature = "unity-experimental")] #[error("API client error: {source}")] APIClientError { /// The underlying unity::GetTableError @@ -99,7 +99,7 @@ pub fn get_data_catalog(data_catalog: &str) -> Result, Data "hdfs" => unimplemented!("HDFS Data Catalog is not implemented"), #[cfg(feature = "glue")] "glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)), - #[cfg(feature = "unity")] + #[cfg(feature = "unity-experimental")] "unity" => Ok(Box::new(unity::UnityCatalog::new()?)), _ => Err(DataCatalogError::InvalidDataCatalog { data_catalog: data_catalog.to_string(), diff --git a/rust/src/data_catalog/unity/mod.rs b/rust/src/data_catalog/unity/mod.rs index 11b7cd7382..f3dc3982bf 100644 --- a/rust/src/data_catalog/unity/mod.rs +++ b/rust/src/data_catalog/unity/mod.rs @@ -1,6 +1,6 @@ //! Databricks Unity Catalog. //! -//! This module is gated behind the "unity" feature. +//! This module is gated behind the "unity-experimental" feature. use super::{DataCatalog, DataCatalogError}; use reqwest::header; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};