Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added support for Databricks Unity Catalog #1331

Merged
merged 3 commits into from
May 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ features = ["extension-module", "abi3", "abi3-py37"]
[dependencies.deltalake]
path = "../rust"
version = "0"
features = ["azure", "gcs", "python", "datafusion"]
features = ["azure", "gcs", "python", "datafusion", "unity-experimental"]

[features]
default = ["rustls"]
Expand Down
3 changes: 2 additions & 1 deletion python/deltalake/data_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
class DataCatalog(Enum):
"""List of the Data Catalogs"""

AWS = "glue" # Only AWS Glue Data Catalog is available
AWS = "glue" # AWS Glue Data Catalog
UNITY = "unity" # Databricks Unity Catalog
15 changes: 14 additions & 1 deletion python/docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ being used. We try to support many of the well-known formats to identify basic s
* gs://<bucket>/<path>

Alternatively, if you have a data catalog you can load it by reference to a
database and table name. Currently only AWS Glue is supported.
database and table name. Currently supported are AWS Glue and Databricks Unity Catalog.

For AWS Glue catalog, use AWS environment variables to authenticate.

Expand All @@ -70,6 +70,19 @@ For AWS Glue catalog, use AWS environment variables to authenticate.
>>> dt.to_pyarrow_table().to_pydict()
{'id': [5, 7, 9, 5, 6, 7, 8, 9]}

For Databricks Unity Catalog authentication, use environment variables:
* DATABRICKS_WORKSPACE_URL (e.g. https://adb-62800498333851.30.azuredatabricks.net)
* DATABRICKS_ACCESS_TOKEN

.. code-block:: python

>>> from deltalake import DataCatalog, DeltaTable
>>> catalog_name = 'main'
>>> schema_name = 'db_schema'
>>> table_name = 'db_table'
>>> data_catalog = DataCatalog.UNITY
>>> dt = DeltaTable.from_data_catalog(data_catalog=data_catalog, data_catalog_id=catalog_name, database_name=schema_name, table_name=table_name)

.. _`s3 options`: https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants
.. _`azure options`: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants
.. _`gcs options`: https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants
Expand Down
10 changes: 10 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
# Glue
rusoto_glue = { version = "0.47", default-features = false, optional = true }

# Unity
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"], optional = true }
reqwest-middleware = { version = "0.2.1", optional = true }
reqwest-retry = { version = "0.2.2", optional = true }

# Datafusion
datafusion = { version = "23", optional = true }
datafusion-expr = { version = "23", optional = true }
Expand Down Expand Up @@ -130,6 +135,11 @@ s3 = [
"object_store/aws",
"object_store/aws_profile",
]
unity-experimental = [
"reqwest",
"reqwest-middleware",
"reqwest-retry",
]

[[bench]]
name = "read_checkpoint"
Expand Down
27 changes: 27 additions & 0 deletions rust/src/data_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use std::fmt::Debug;
#[cfg(feature = "glue")]
pub mod glue;

#[cfg(feature = "unity-experimental")]
pub mod unity;

/// Error enum that represents a CatalogError.
#[derive(thiserror::Error, Debug)]
pub enum DataCatalogError {
Expand Down Expand Up @@ -43,6 +46,28 @@ pub enum DataCatalogError {
source: rusoto_credential::CredentialsError,
},

/// Error caused by missing environment variable for Unity Catalog.
#[cfg(feature = "unity-experimental")]
#[error("Missing Unity Catalog environment variable: {var_name}")]
MissingEnvVar {
/// Variable name
var_name: String,
},

/// Error caused by invalid access token value
#[cfg(feature = "unity-experimental")]
#[error("Invalid Databricks personal access token")]
InvalidAccessToken,

/// Databricks API client error
#[cfg(feature = "unity-experimental")]
#[error("API client error: {source}")]
APIClientError {
/// The underlying unity::GetTableError
#[from]
source: unity::GetTableError,
},

/// Error representing an invalid Data Catalog.
#[error("This data catalog doesn't exist: {data_catalog}")]
InvalidDataCatalog {
Expand Down Expand Up @@ -74,6 +99,8 @@ pub fn get_data_catalog(data_catalog: &str) -> Result<Box<dyn DataCatalog>, Data
"hdfs" => unimplemented!("HDFS Data Catalog is not implemented"),
#[cfg(feature = "glue")]
"glue" => Ok(Box::new(glue::GlueDataCatalog::new()?)),
#[cfg(feature = "unity-experimental")]
"unity" => Ok(Box::new(unity::UnityCatalog::new()?)),
_ => Err(DataCatalogError::InvalidDataCatalog {
data_catalog: data_catalog.to_string(),
}),
Expand Down
132 changes: 132 additions & 0 deletions rust/src/data_catalog/unity/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//! Databricks Unity Catalog.
//!
//! This module is gated behind the "unity-experimental" feature.
use super::{DataCatalog, DataCatalogError};
use reqwest::header;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use serde::Deserialize;

/// Databricks Unity Catalog - implementation of the `DataCatalog` trait
pub struct UnityCatalog {
client_with_retry: ClientWithMiddleware,
workspace_url: String,
}

impl UnityCatalog {
/// Creates a new UnityCatalog
pub fn new() -> Result<Self, DataCatalogError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For our clients etc, we usually adopt a builder pattern, that allows us to configure options on the client before building the client. We would then have some methos like UnitiCatalogClientBuilder::from_env() to support parsing the configuration from the environment.

That said, haven't worked with our catalogs for a long time, so not sure if we properly pass things through right now to make that work. IF not, this maybe something we want to address in a a follow-up.

let token_var = "DATABRICKS_ACCESS_TOKEN";
let access_token =
std::env::var(token_var).map_err(|_| DataCatalogError::MissingEnvVar {
var_name: token_var.into(),
})?;

let auth_header_val = header::HeaderValue::from_str(&format!("Bearer {}", &access_token))
.map_err(|_| DataCatalogError::InvalidAccessToken)?;

let headers = header::HeaderMap::from_iter([(header::AUTHORIZATION, auth_header_val)]);
let client = reqwest::Client::builder()
.default_headers(headers)
.build()?;

let retry_policy = ExponentialBackoff::builder().build_with_max_retries(10);

let client_with_retry = ClientBuilder::new(client)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();

let workspace_var = "DATABRICKS_WORKSPACE_URL";
let workspace_url =
std::env::var(workspace_var).map_err(|_| DataCatalogError::MissingEnvVar {
var_name: workspace_var.into(),
})?;

Ok(Self {
client_with_retry,
workspace_url,
})
}
}

#[derive(Deserialize)]
#[serde(untagged)]
enum TableResponse {
Success { storage_location: String },
Error { error_code: String, message: String },
}

/// Possible errors from the unity-catalog/tables API call
#[derive(thiserror::Error, Debug)]
pub enum GetTableError {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we using these error variants somewhere?

While still very much a long term things, we have been trying to reduce the number of error variants we expose to users in this crate and also make them more actionable. Maybe we can make some common errors explicit - like 403?

Copy link
Contributor Author

@nohajc nohajc May 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it in a similar way to the existing AWS glue module but I understand it's not perfect.

Didn't want to start refactoring existing code either. At least not yet.

#[error("GET request error: {source}")]
/// Error from reqwest library
RequestError {
/// The underlying reqwest_middleware::Error
#[from]
source: reqwest_middleware::Error,
},

/// Request returned error response
#[error("Invalid table error: {error_code}: {message}")]
InvalidTable {
/// Error code
error_code: String,
/// Error description
message: String,
},
}

impl From<reqwest_middleware::Error> for DataCatalogError {
fn from(value: reqwest_middleware::Error) -> Self {
value.into()
}
}

impl From<reqwest::Error> for DataCatalogError {
fn from(value: reqwest::Error) -> Self {
reqwest_middleware::Error::Reqwest(value).into()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit convoluted. We are wrapping the actual error in another error to then convert it again. Not sure how many variants the middleware error has, but maybe its viable to unpack the middleware error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be only two variants, unpacking sounds like a good idea. I was just trying to solve a type checking error, didn't give it much thought...

Copy link
Contributor Author

@nohajc nohajc May 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the middleware error is defined like this:

#[derive(Error, Debug)]
pub enum Error {
    /// There was an error running some middleware
    #[error("Middleware error: {0}")]
    Middleware(#[from] anyhow::Error),
    /// Error from the underlying reqwest client
    #[error("Request error: {0}")]
    Reqwest(#[from] reqwest::Error),
}

I could flatten the hierarchy by adding the same variants into GetTableError but both still represent RequestError so I don't like it much. Basically, the conversion was introduced only because of the reqwest::Client retry wrapper. Otherwise I would have just the one reqwest::Error type.

Copy link
Contributor Author

@nohajc nohajc May 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could also convert reqwest::Error to anyhow::Error but then I'm losing type information... Not sure it's even good practice to return anyhow from library code. It makes sense for the middleware itself but mixing it with the underlying request error seems wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose it would be nice to unify some of the DataCatalogError variants across implementations but I'd prefer to do it separately.

}
}

#[async_trait::async_trait]
impl DataCatalog for UnityCatalog {
/// Get the table storage location from the UnityCatalog
async fn get_table_storage_location(
&self,
catalog_id: Option<String>,
database_name: &str,
table_name: &str,
) -> Result<String, DataCatalogError> {
let resp = self
.client_with_retry
.get(format!(
"{}/api/2.1/unity-catalog/tables/{}.{}.{}",
&self.workspace_url,
catalog_id.as_deref().unwrap_or("main"),
&database_name,
&table_name
))
.send()
.await?;

let parsed_resp: TableResponse = resp.json().await?;
match parsed_resp {
TableResponse::Success { storage_location } => Ok(storage_location),
TableResponse::Error {
error_code,
message,
} => Err(GetTableError::InvalidTable {
error_code,
message,
}
.into()),
}
}
}

impl std::fmt::Debug for UnityCatalog {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
write!(fmt, "UnityCatalog")
}
}