diff --git a/python/Cargo.toml b/python/Cargo.toml index 4d95efc681..3b86b1d663 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"] [dependencies.deltalake] path = "../rust" version = "0" -features = ["azure", "gcs", "python", "datafusion"] +features = ["azure", "gcs", "python", "datafusion", "unity-experimental"] [features] default = ["rustls"] diff --git a/python/deltalake/data_catalog.py b/python/deltalake/data_catalog.py index 387075c661..b65debcfa3 100644 --- a/python/deltalake/data_catalog.py +++ b/python/deltalake/data_catalog.py @@ -4,4 +4,5 @@ class DataCatalog(Enum): """List of the Data Catalogs""" - AWS = "glue" # Only AWS Glue Data Catalog is available + AWS = "glue" # AWS Glue Data Catalog + UNITY = "unity" # Databricks Unity Catalog diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 3282481d7d..5390fb1c0f 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -55,7 +55,7 @@ being used. We try to support many of the well-known formats to identify basic s * gs:/// Alternatively, if you have a data catalog you can load it by reference to a -database and table name. Currently only AWS Glue is supported. +database and table name. Currently supported are AWS Glue and Databricks Unity Catalog. For AWS Glue catalog, use AWS environment variables to authenticate. @@ -70,6 +70,19 @@ For AWS Glue catalog, use AWS environment variables to authenticate. >>> dt.to_pyarrow_table().to_pydict() {'id': [5, 7, 9, 5, 6, 7, 8, 9]} +For Databricks Unity Catalog authentication, use environment variables: + * DATABRICKS_WORKSPACE_URL (e.g. https://adb-62800498333851.30.azuredatabricks.net) + * DATABRICKS_ACCESS_TOKEN + +.. code-block:: python + + >>> from deltalake import DataCatalog, DeltaTable + >>> catalog_name = 'main' + >>> schema_name = 'db_schema' + >>> table_name = 'db_table' + >>> data_catalog = DataCatalog.UNITY + >>> dt = DeltaTable.from_data_catalog(data_catalog=data_catalog, data_catalog_id=catalog_name, database_name=schema_name, table_name=table_name) + .. _`s3 options`: https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants .. _`azure options`: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants .. _`gcs options`: https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants diff --git a/rust/Cargo.toml b/rust/Cargo.toml index a78a618691..531966e618 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -53,6 +53,11 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true # Glue rusoto_glue = { version = "0.47", default-features = false, optional = true } +# Unity +reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"], optional = true } +reqwest-middleware = { version = "0.2.1", optional = true } +reqwest-retry = { version = "0.2.2", optional = true } + # Datafusion datafusion = { version = "23", optional = true } datafusion-expr = { version = "23", optional = true } @@ -130,6 +135,11 @@ s3 = [ "object_store/aws", "object_store/aws_profile", ] +unity-experimental = [ + "reqwest", + "reqwest-middleware", + "reqwest-retry", +] [[bench]] name = "read_checkpoint" diff --git a/rust/src/data_catalog/mod.rs b/rust/src/data_catalog/mod.rs index 5570c7e408..cfe4ea8cf1 100644 --- a/rust/src/data_catalog/mod.rs +++ b/rust/src/data_catalog/mod.rs @@ -5,6 +5,9 @@ use std::fmt::Debug; #[cfg(feature = "glue")] pub mod glue; +#[cfg(feature = "unity-experimental")] +pub mod unity; + /// Error enum that represents a CatalogError. #[derive(thiserror::Error, Debug)] pub enum DataCatalogError { @@ -43,6 +46,28 @@ pub enum DataCatalogError { source: rusoto_credential::CredentialsError, }, + /// Error caused by missing environment variable for Unity Catalog. + #[cfg(feature = "unity-experimental")] + #[error("Missing Unity Catalog environment variable: {var_name}")] + MissingEnvVar { + /// Variable name + var_name: String, + }, + + /// Error caused by invalid access token value + #[cfg(feature = "unity-experimental")] + #[error("Invalid Databricks personal access token")] + InvalidAccessToken, + + /// Databricks API client error + #[cfg(feature = "unity-experimental")] + #[error("API client error: {source}")] + APIClientError { + /// The underlying unity::GetTableError + #[from] + source: unity::GetTableError, + }, + /// Error representing an invalid Data Catalog. #[error("This data catalog doesn't exist: {data_catalog}")] InvalidDataCatalog { @@ -74,6 +99,8 @@ pub fn get_data_catalog(data_catalog: &str) -> Result, Data "hdfs" => unimplemented!("HDFS Data Catalog is not implemented"), #[cfg(feature = "glue")] "glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)), + #[cfg(feature = "unity-experimental")] + "unity" => Ok(Box::new(unity::UnityCatalog::new()?)), _ => Err(DataCatalogError::InvalidDataCatalog { data_catalog: data_catalog.to_string(), }), diff --git a/rust/src/data_catalog/unity/mod.rs b/rust/src/data_catalog/unity/mod.rs new file mode 100644 index 0000000000..f3dc3982bf --- /dev/null +++ b/rust/src/data_catalog/unity/mod.rs @@ -0,0 +1,132 @@ +//! Databricks Unity Catalog. +//! +//! This module is gated behind the "unity-experimental" feature. +use super::{DataCatalog, DataCatalogError}; +use reqwest::header; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; +use serde::Deserialize; + +/// Databricks Unity Catalog - implementation of the `DataCatalog` trait +pub struct UnityCatalog { + client_with_retry: ClientWithMiddleware, + workspace_url: String, +} + +impl UnityCatalog { + /// Creates a new UnityCatalog + pub fn new() -> Result { + let token_var = "DATABRICKS_ACCESS_TOKEN"; + let access_token = + std::env::var(token_var).map_err(|_| DataCatalogError::MissingEnvVar { + var_name: token_var.into(), + })?; + + let auth_header_val = header::HeaderValue::from_str(&format!("Bearer {}", &access_token)) + .map_err(|_| DataCatalogError::InvalidAccessToken)?; + + let headers = header::HeaderMap::from_iter([(header::AUTHORIZATION, auth_header_val)]); + let client = reqwest::Client::builder() + .default_headers(headers) + .build()?; + + let retry_policy = ExponentialBackoff::builder().build_with_max_retries(10); + + let client_with_retry = ClientBuilder::new(client) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build(); + + let workspace_var = "DATABRICKS_WORKSPACE_URL"; + let workspace_url = + std::env::var(workspace_var).map_err(|_| DataCatalogError::MissingEnvVar { + var_name: workspace_var.into(), + })?; + + Ok(Self { + client_with_retry, + workspace_url, + }) + } +} + +#[derive(Deserialize)] +#[serde(untagged)] +enum TableResponse { + Success { storage_location: String }, + Error { error_code: String, message: String }, +} + +/// Possible errors from the unity-catalog/tables API call +#[derive(thiserror::Error, Debug)] +pub enum GetTableError { + #[error("GET request error: {source}")] + /// Error from reqwest library + RequestError { + /// The underlying reqwest_middleware::Error + #[from] + source: reqwest_middleware::Error, + }, + + /// Request returned error response + #[error("Invalid table error: {error_code}: {message}")] + InvalidTable { + /// Error code + error_code: String, + /// Error description + message: String, + }, +} + +impl From for DataCatalogError { + fn from(value: reqwest_middleware::Error) -> Self { + value.into() + } +} + +impl From for DataCatalogError { + fn from(value: reqwest::Error) -> Self { + reqwest_middleware::Error::Reqwest(value).into() + } +} + +#[async_trait::async_trait] +impl DataCatalog for UnityCatalog { + /// Get the table storage location from the UnityCatalog + async fn get_table_storage_location( + &self, + catalog_id: Option, + database_name: &str, + table_name: &str, + ) -> Result { + let resp = self + .client_with_retry + .get(format!( + "{}/api/2.1/unity-catalog/tables/{}.{}.{}", + &self.workspace_url, + catalog_id.as_deref().unwrap_or("main"), + &database_name, + &table_name + )) + .send() + .await?; + + let parsed_resp: TableResponse = resp.json().await?; + match parsed_resp { + TableResponse::Success { storage_location } => Ok(storage_location), + TableResponse::Error { + error_code, + message, + } => Err(GetTableError::InvalidTable { + error_code, + message, + } + .into()), + } + } +} + +impl std::fmt::Debug for UnityCatalog { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + write!(fmt, "UnityCatalog") + } +}