Skip to content

Commit

Permalink
Merge branch 'main' into fix-num-rows-stat
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw authored Nov 21, 2024
2 parents 4883f0c + eff5735 commit 2311974
Show file tree
Hide file tree
Showing 42 changed files with 766 additions and 128 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/python_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
run: make check-rust

test-minimal:
name: Python Build (Python 3.8 PyArrow 16.0.0)
name: Python Build (Python 3.9 PyArrow 16.0.0)
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C debuginfo=line-tables-only"
Expand All @@ -43,7 +43,7 @@ jobs:
- name: Setup Environment
uses: ./.github/actions/setup-env
with:
python-version: 3.8
python-version: 3.9

- name: Build and install deltalake
run: |
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v3
Expand Down
48 changes: 24 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,34 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "=0.3.0" }
delta_kernel = { version = "0.3.1" }
# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }

# arrow
arrow = { version = "52" }
arrow-arith = { version = "52" }
arrow-array = { version = "52", features = ["chrono-tz"] }
arrow-buffer = { version = "52" }
arrow-cast = { version = "52" }
arrow-ipc = { version = "52" }
arrow-json = { version = "52" }
arrow-ord = { version = "52" }
arrow-row = { version = "52" }
arrow-schema = { version = "52" }
arrow-select = { version = "52" }
object_store = { version = "0.10.1" }
parquet = { version = "52" }
arrow = { version = "53" }
arrow-arith = { version = "53" }
arrow-array = { version = "53", features = ["chrono-tz"] }
arrow-buffer = { version = "53" }
arrow-cast = { version = "53" }
arrow-ipc = { version = "53" }
arrow-json = { version = "53" }
arrow-ord = { version = "53" }
arrow-row = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
object_store = { version = "0.11" }
parquet = { version = "53" }

# datafusion
datafusion = { version = "41" }
datafusion-expr = { version = "41" }
datafusion-common = { version = "41" }
datafusion-proto = { version = "41" }
datafusion-sql = { version = "41" }
datafusion-physical-expr = { version = "41" }
datafusion-physical-plan = { version = "41" }
datafusion-functions = { version = "41" }
datafusion-functions-aggregate = { version = "41" }
datafusion = { version = "43" }
datafusion-expr = { version = "43" }
datafusion-common = { version = "43" }
datafusion-proto = { version = "43" }
datafusion-sql = { version = "43" }
datafusion-physical-expr = { version = "43" }
datafusion-physical-plan = { version = "43" }
datafusion-functions = { version = "43" }
datafusion-functions-aggregate = { version = "43" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand All @@ -64,7 +64,7 @@ bytes = { version = "1" }
chrono = { version = ">0.4.34", default-features = false, features = ["clock"] }
tracing = { version = "0.1", features = ["log"] }
regex = { version = "1" }
thiserror = { version = "1" }
thiserror = { version = "2" }
url = { version = "2" }
urlencoding = "2.1.3"
uuid = { version = "1" }
Expand Down
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.4.1"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-core = { version = "0.22.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
Expand Down
1 change: 1 addition & 0 deletions crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";
/// The list of option keys owned by the S3 module.
/// Option keys not contained in this list will be added to the `extra_opts`
/// field of [crate::storage::s3::S3StorageOptions].
#[allow(deprecated)]
pub const S3_OPTS: &[&str] = &[
AWS_ENDPOINT_URL,
AWS_ENDPOINT_URL_DYNAMODB,
Expand Down
101 changes: 98 additions & 3 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::meta::credentials::CredentialsProviderChain;
Expand All @@ -19,6 +20,7 @@ use deltalake_core::storage::object_store::{
};
use deltalake_core::storage::StorageOptions;
use deltalake_core::DeltaResult;
use tokio::sync::Mutex;
use tracing::log::*;

use crate::constants;
Expand All @@ -27,12 +29,21 @@ use crate::constants;
/// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3]
#[derive(Clone, Debug)]
pub(crate) struct AWSForObjectStore {
/// SDK configuration whose credentials provider is asked for fresh credentials whenever
/// the cache holds nothing usable.
sdk_config: SdkConfig,
/// Most recently resolved credentials, shared behind an async mutex. Starts out as `None`
/// and is overwritten after each successful resolution in `get_credential`.
cache: Arc<Mutex<Option<Credentials>>>,
}

impl AWSForObjectStore {
pub(crate) fn new(sdk_config: SdkConfig) -> Self {
Self { sdk_config }
let cache = Arc::new(Mutex::new(None));
Self { sdk_config, cache }
}

/// Report whether the cache currently holds a credential.
///
/// Waits for the cache mutex before looking inside it.
async fn has_cached_credentials(&self) -> bool {
    self.cache.lock().await.is_some()
}
}

Expand All @@ -43,10 +54,34 @@ impl CredentialProvider for AWSForObjectStore {
/// Provide the necessary configured credentials from the AWS SDK for use by
/// [object_store::aws::AmazonS3]
async fn get_credential(&self) -> ObjectStoreResult<Arc<Self::Credential>> {
debug!("AWSForObjectStore is unlocking..");
let mut guard = self.cache.lock().await;

if let Some(cached) = guard.as_ref() {
debug!("Located cached credentials");
let now = SystemTime::now();

// Credentials such as assume role credentials will have an expiry on them, whereas
// environmental provided credentials will *not*. In the latter case, it's still
// useful to avoid running through the provider chain again, so in both cases we should
// still treat credentials as useful
if cached.expiry().unwrap_or(now) >= now {
debug!("Cached credentials are still valid, returning");
return Ok(Arc::new(Self::Credential {
key_id: cached.access_key_id().into(),
secret_key: cached.secret_access_key().into(),
token: cached.session_token().map(|o| o.to_string()),
}));
} else {
debug!("Cached credentials appear to no longer be valid, re-resolving");
}
}

let provider = self
.sdk_config
.credentials_provider()
.ok_or(ObjectStoreError::NotImplemented)?;

let credentials =
provider
.provide_credentials()
Expand All @@ -60,11 +95,15 @@ impl CredentialProvider for AWSForObjectStore {
credentials.access_key_id()
);

Ok(Arc::new(Self::Credential {
let result = Ok(Arc::new(Self::Credential {
key_id: credentials.access_key_id().into(),
secret_key: credentials.secret_access_key().into(),
token: credentials.session_token().map(|o| o.to_string()),
}))
}));

// Update the mutex before exiting with the new Credentials from the AWS provider
*guard = Some(credentials);
return result;
}
}

Expand Down Expand Up @@ -324,4 +363,60 @@ mod tests {
panic!("Could not retrieve credentials from the SdkConfig: {config:?}");
}
}

/// Smoke test: an [AWSForObjectStore] built from static access-key options must be able to
/// hand out a credential without error.
#[tokio::test]
async fn test_object_store_credential_provider() -> DeltaResult<()> {
    let options = StorageOptions(hashmap! {
        constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
        constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
    });
    let sdk_config = resolve_credentials(options)
        .await
        .expect("Failed to resolve credentials for the test");
    let provider = AWSForObjectStore::new(sdk_config);
    // Only success matters here; the credential contents are not inspected.
    let _credential = provider
        .get_credential()
        .await
        .expect("Failed to produce a credential");
    Ok(())
}

/// The [CredentialProvider] is called _repeatedly_ by the [object_store] crate: in essence, on
/// every get/put/list/etc operation, the `get_credential` function will be invoked.
///
/// In some cases, such as when assuming roles, this can result in an excessive amount of STS
/// API calls in the scenarios where the delta-rs process is performing a large number of S3
/// operations.
///
/// This test checks that the first call populates the provider's cache and that a second
/// call returns a credential with the same contents as the first.
#[tokio::test]
async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> {
    let options = StorageOptions(hashmap! {
        constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
        constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
    });
    let sdk_config = resolve_credentials(options)
        .await
        .expect("Failed to resolve credentials for the test");
    let provider = AWSForObjectStore::new(sdk_config);
    let credential_a = provider
        .get_credential()
        .await
        .expect("Failed to produce a credential");

    assert!(
        provider.has_cached_credentials().await,
        "The provider should have cached the credential on the first call!"
    );

    let credential_b = provider
        .get_credential()
        .await
        .expect("Failed to produce a credential");

    // Every call wraps its result in a fresh `Arc`, so pointer identity says nothing about
    // consistency. Compare the credential contents field by field instead.
    assert_eq!(
        credential_a.key_id, credential_b.key_id,
        "Repeated calls to get_credential() produced different results!"
    );
    assert_eq!(
        credential_a.secret_key, credential_b.secret_key,
        "Repeated calls to get_credential() produced different results!"
    );
    assert_eq!(
        credential_a.token, credential_b.token,
        "Repeated calls to get_credential() produced different results!"
    );
    Ok(())
}
}
7 changes: 6 additions & 1 deletion crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,15 @@ mod tests {
);
std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name");
std::env::set_var(
#[allow(deprecated)]
constants::AWS_S3_ASSUME_ROLE_ARN,
"arn:aws:iam::123456789012:role/some_role",
);
std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name");
std::env::set_var(
#[allow(deprecated)]
constants::AWS_S3_ROLE_SESSION_NAME,
"session_name",
);
std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file");

let options = S3StorageOptions::try_default().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.21.0", path = "../core", features = [
deltalake-core = { version = "0.22.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.5.0"
version = "0.6.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-core = { version = "0.22.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.21.0"
version = "0.22.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log";
///
/// assuming it contains valid deltalake data, i.e a `_delta_log` folder:
/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
#[derive(Debug)]
pub struct ListingSchemaProvider {
authority: String,
/// Underlying object store
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/data_catalog/unity/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse;
use crate::DeltaTableBuilder;

/// In-memory list of catalogs populated by unity catalog
#[derive(Debug)]
pub struct UnityCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
Expand Down Expand Up @@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList {
}

/// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnityCatalogProvider {
/// Parent catalog for schemas of interest.
pub schemas: DashMap<String, Arc<dyn SchemaProvider>>,
Expand Down Expand Up @@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider {
}

/// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnitySchemaProvider {
/// UnityCatalog Api client
client: Arc<UnityCatalog>,
Expand Down
Loading

0 comments on commit 2311974

Please sign in to comment.