Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add aws glue as an iceberg connection type #16824

Merged
merged 11 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ humantime = "2.1.0"
hyper = "1"
hyper-util = { version = "0.1.9", features = ["client", "client-legacy", "tokio", "service"] }
iceberg = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
iceberg-catalog-glue = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
iceberg-catalog-hms = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
iceberg-catalog-rest = { version = "0.3.0", git = "https://github.com/Xuanwo/iceberg-rust/", rev = "fe5df3f" }
indexmap = "2.0.0"
Expand Down
9 changes: 9 additions & 0 deletions src/meta/app/src/schema/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ pub struct HiveCatalogOption {
// Kind of Iceberg catalog backend; one entry per `IcebergCatalogOption` variant.
// NOTE(review): the explicit discriminants look like stable numeric ids —
// confirm before renumbering or reusing a value.
pub enum IcebergCatalogType {
/// Iceberg REST catalog.
Rest = 1,
/// HMS catalog.
Hms = 2,
/// AWS Glue catalog.
Glue = 3,
}

/// Options for creating an Iceberg catalog.
///
/// Each variant carries the configuration of one supported catalog backend.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum IcebergCatalogOption {
/// Configuration for a REST catalog.
Rest(IcebergRestCatalogOption),
/// Configuration for an HMS catalog.
Hms(IcebergHmsCatalogOption),
/// Configuration for an AWS Glue catalog.
Glue(IcebergGlueCatalogOption),
}

impl IcebergCatalogOption {
Expand All @@ -92,6 +94,7 @@ impl IcebergCatalogOption {
match self {
IcebergCatalogOption::Rest(_) => IcebergCatalogType::Rest,
IcebergCatalogOption::Hms(_) => IcebergCatalogType::Hms,
IcebergCatalogOption::Glue(_) => IcebergCatalogType::Glue,
}
}
}
Expand All @@ -117,6 +120,12 @@ pub struct IcebergHmsCatalogOption {
pub props: HashMap<String, String>,
}

/// Options for an Iceberg catalog backed by AWS Glue.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct IcebergGlueCatalogOption {
/// Warehouse location of the catalog, e.g. `s3://bucket`.
pub warehouse: String,
/// Additional key/value connection properties for the catalog.
pub props: HashMap<String, String>,
}

/// Same as `CatalogNameIdent`, but with `serde` support,
/// and can be used as part of a value.
// #[derive(Clone, Debug, PartialEq, Eq)]
Expand Down
39 changes: 39 additions & 0 deletions src/meta/proto-conv/src/catalog_from_to_protobuf_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ impl FromToProto for mt::IcebergCatalogOption {
pb::iceberg_catalog_option::IcebergCatalogOption::HmsCatalog(v) => {
mt::IcebergCatalogOption::Hms(mt::IcebergHmsCatalogOption::from_pb(v)?)
}
pb::iceberg_catalog_option::IcebergCatalogOption::GlueCatalog(v) => {
mt::IcebergCatalogOption::Glue(mt::IcebergGlueCatalogOption::from_pb(v)?)
}
})
}

Expand All @@ -140,6 +143,9 @@ impl FromToProto for mt::IcebergCatalogOption {
mt::IcebergCatalogOption::Hms(v) => {
pb::iceberg_catalog_option::IcebergCatalogOption::HmsCatalog(v.to_pb()?)
}
mt::IcebergCatalogOption::Glue(v) => {
pb::iceberg_catalog_option::IcebergCatalogOption::GlueCatalog(v.to_pb()?)
}
}),
})
}
Expand Down Expand Up @@ -215,6 +221,39 @@ impl FromToProto for mt::IcebergHmsCatalogOption {
}
}

/// Conversion between the in-memory AWS Glue Iceberg catalog option and its
/// protobuf representation.
impl FromToProto for mt::IcebergGlueCatalogOption {
    type PB = pb::IcebergGlueCatalogOption;

    /// Returns the version recorded in the protobuf message.
    fn get_pb_ver(p: &Self::PB) -> u64 {
        p.ver
    }

    /// Builds the in-memory option from a decoded protobuf message.
    ///
    /// NOTE(review): `p.ver` / `p.min_reader_ver` are not validated here —
    /// confirm this matches what the sibling catalog option impls do.
    fn from_pb(p: Self::PB) -> Result<Self, Incompatible>
    where Self: Sized {
        Ok(Self {
            warehouse: p.warehouse,
            // `p` is owned, so move the entries out instead of re-allocating
            // every key and value through `to_string()`.
            props: p.props.into_iter().collect(),
        })
    }

    /// Serializes the option into a protobuf message stamped with the current
    /// `VER` and `MIN_READER_VER`.
    fn to_pb(&self) -> Result<Self::PB, Incompatible> {
        Ok(pb::IcebergGlueCatalogOption {
            ver: VER,
            min_reader_ver: MIN_READER_VER,
            warehouse: self.warehouse.clone(),
            // `self` is only borrowed, so the entries have to be cloned here.
            props: self
                .props
                .iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .collect(),
        })
    }
}

impl FromToProto for mt::HiveCatalogOption {
type PB = pb::HiveCatalogOption;

Expand Down
1 change: 1 addition & 0 deletions src/meta/proto-conv/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[
(108, "2024-08-29: Add: procedure.proto: ProcedureMeta and ProcedureIdentity"),
(109, "2024-08-29: Refactor: ProcedureMeta add arg_names"),
(110, "2024-09-18: Add: database.proto: DatabaseMeta.gc_in_progress"),
(111, "2024-11-13: Add: Enable AWS Glue as an Apache Iceberg type when creating a catalog."),
// Dear developer:
// If you're gonna add a new metadata version, you'll have to add a test for it.
// You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::TimeZone;
use chrono::Utc;
use databend_common_meta_app::schema::CatalogOption;
use databend_common_meta_app::schema::IcebergCatalogOption;
use databend_common_meta_app::schema::IcebergGlueCatalogOption;
use fastrace::func_name;
use std::collections::HashMap;

use crate::common;

// These bytes are built when a new version is introduced,
// and are kept for backward compatibility test.
//
// *************************************************************
// * These messages should never be updated, *
// * only be added when a new version is added, *
// * or be removed when an old version is no longer supported. *
// *************************************************************
//
// The message bytes are built from the output of `test_pb_from_to()`
/// Checks that a `CatalogMeta` holding an AWS Glue Iceberg catalog option
/// round-trips through protobuf and that a message written at version 111
/// can still be loaded.
#[test]
fn test_v111_add_glue_as_iceberg_catalog_option() -> anyhow::Result<()> {
    // Wire encoding of the `want()` value below at ver=111, min_reader_ver=24.
    // NOTE(review): this fixture was encoded by hand from catalog.proto; confirm it
    // against the bytes printed by `test_pb_from_to()` before relying on it.
    let catalog_meta_v111 = vec![
        // CatalogMeta.option (field 2), 169 bytes
        18, 169, 1,
        // CatalogOption.iceberg (field 3), 166 bytes
        26, 166, 1,
        // IcebergCatalogOption.glue_catalog (field 4), 157 bytes
        34, 157, 1,
        // IcebergGlueCatalogOption.warehouse = "s3://my_bucket"
        10, 14, 115, 51, 58, 47, 47, 109, 121, 95, 98, 117, 99, 107, 101, 116,
        // props entry: "AWS_KEY_ID" => "super secure access key"
        18, 37,
        10, 10, 65, 87, 83, 95, 75, 69, 89, 95, 73, 68,
        18, 23, 115, 117, 112, 101, 114, 32, 115, 101, 99, 117, 114, 101, 32, 97, 99, 99, 101,
        115, 115, 32, 107, 101, 121,
        // props entry: "AWS_SECRET_KEY" => "even more secure secret key"
        18, 45,
        10, 14, 65, 87, 83, 95, 83, 69, 67, 82, 69, 84, 95, 75, 69, 89,
        18, 27, 101, 118, 101, 110, 32, 109, 111, 114, 101, 32, 115, 101, 99, 117, 114, 101, 32,
        115, 101, 99, 114, 101, 116, 32, 107, 101, 121,
        // props entry: "REGION" => "us-east-1 aka anti-multi-availability"
        18, 47,
        10, 6, 82, 69, 71, 73, 79, 78,
        18, 37, 117, 115, 45, 101, 97, 115, 116, 45, 49, 32, 97, 107, 97, 32, 97, 110, 116, 105,
        45, 109, 117, 108, 116, 105, 45, 97, 118, 97, 105, 108, 97, 98, 105, 108, 105, 116, 121,
        // IcebergGlueCatalogOption.ver = 111, min_reader_ver = 24
        160, 6, 111, 168, 6, 24,
        // IcebergCatalogOption.ver = 111, min_reader_ver = 24
        160, 6, 111, 168, 6, 24,
        // CatalogMeta.created_on (field 20) = "2014-11-28 12:00:09 UTC"
        162, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57,
        32, 85, 84, 67,
        // CatalogMeta.ver = 111, min_reader_ver = 24
        160, 6, 111, 168, 6, 24,
    ];

    let mut props = HashMap::new();
    props.insert("AWS_KEY_ID".to_string(), "super secure access key".to_string());
    props.insert("AWS_SECRET_KEY".to_string(), "even more secure secret key".to_string());
    props.insert("REGION".to_string(), "us-east-1 aka anti-multi-availability".to_string());

    // `want` is called twice, so it must not move `props` out of its environment.
    let want = || databend_common_meta_app::schema::CatalogMeta {
        catalog_option: CatalogOption::Iceberg(IcebergCatalogOption::Glue(
            IcebergGlueCatalogOption {
                warehouse: "s3://my_bucket".to_string(),
                props: props.clone(),
            },
        )),
        created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(),
    };

    common::test_pb_from_to(func_name!(), want())?;
    common::test_load_old(func_name!(), catalog_meta_v111.as_slice(), 111, want())?;

    Ok(())
}
9 changes: 9 additions & 0 deletions src/meta/protos/proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message IcebergCatalogOption {
oneof iceberg_catalog_option {
IcebergRestCatalogOption rest_catalog = 2;
IcebergHmsCatalogOption hms_catalog = 3;
IcebergGlueCatalogOption glue_catalog = 4;
}
}

Expand All @@ -78,6 +79,14 @@ message IcebergHmsCatalogOption {
map<string, string> props = 3;
}

// Options for an Iceberg catalog backed by AWS Glue.
message IcebergGlueCatalogOption {
// Version stamped on the message by the writer.
uint64 ver = 100;
// NOTE(review): presumably the minimum reader version able to load this
// message — confirm against the version-compatibility rules.
uint64 min_reader_ver = 101;

// Warehouse location of the catalog.
string warehouse = 1;
// Additional key/value connection properties.
map<string, string> props = 2;
}

message ShareCatalogOption {
uint64 ver = 100;
uint64 min_reader_ver = 101;
Expand Down
1 change: 0 additions & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ sqlx = { workspace = true }
strength_reduce = { workspace = true }
sysinfo = { workspace = true }
tempfile = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
toml = { workspace = true, default-features = false }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ impl Interpreter for ShowCreateCatalogInterpreter {
IcebergCatalogOption::Hms(cfg) => {
format!("ADDRESS\n{}\nWAREHOUSE\n{}", cfg.address, cfg.warehouse)
}
IcebergCatalogOption::Glue(cfg) => {
format!("WAREHOUSE\n{}", cfg.warehouse)
}
}),
};

Expand Down
1 change: 0 additions & 1 deletion src/query/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ roaring = { workspace = true }
serde = { workspace = true }
sha2 = { workspace = true }
simsearch = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }

Expand Down
5 changes: 5 additions & 0 deletions src/query/sql/src/planner/binder/ddl/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use databend_common_meta_app::schema::CatalogOption;
use databend_common_meta_app::schema::CatalogType;
use databend_common_meta_app::schema::HiveCatalogOption;
use databend_common_meta_app::schema::IcebergCatalogOption;
use databend_common_meta_app::schema::IcebergGlueCatalogOption;
use databend_common_meta_app::schema::IcebergHmsCatalogOption;
use databend_common_meta_app::schema::IcebergRestCatalogOption;
use databend_common_meta_app::storage::StorageParams;
Expand Down Expand Up @@ -234,6 +235,10 @@ fn parse_iceberg_rest_catalog(
warehouse,
props: HashMap::from_iter(options),
}),
"glue" => IcebergCatalogOption::Glue(IcebergGlueCatalogOption {
warehouse,
props: HashMap::from_iter(options),
}),
v => {
return Err(ErrorCode::InvalidArgument(format!(
"iceberg catalog with type {v} is not supported"
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/common/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ doctest = false
test = true

[dependencies]
arrow = { workspace = true }
databend-common-arrow = { workspace = true }
databend-common-base = { workspace = true }
databend-common-cache = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ databend-storages-common-table-meta = { workspace = true }
fastrace = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-glue = { workspace = true }
iceberg-catalog-hms = { workspace = true }
iceberg-catalog-rest = { workspace = true }
match-template = { workspace = true }
Expand Down
24 changes: 24 additions & 0 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_store::MetaStore;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaId;
use iceberg_catalog_glue::GlueCatalog;
use iceberg_catalog_glue::GlueCatalogConfig;
use iceberg_catalog_hms::HmsCatalog;
use iceberg_catalog_hms::HmsCatalogConfig;
use iceberg_catalog_hms::HmsThriftTransport;
Expand Down Expand Up @@ -196,6 +198,28 @@ impl IcebergCatalog {
let ctl = RestCatalog::new(cfg);
Arc::new(ctl)
}
IcebergCatalogOption::Glue(glue) => {
let cfg = GlueCatalogConfig::builder()
.warehouse(glue.warehouse.clone())
.props(
glue.props
.clone()
.into_iter()
.map(|(k, v)| (k.trim_matches('"').to_string(), v))
.collect(),
)
.build();

// `GlueCatalog::new` is asynchronous, so block on it here; this keeps the enclosing function synchronous.
let ctl = databend_common_base::runtime::block_on(GlueCatalog::new(cfg)).map_err(
|err| {
ErrorCode::BadArguments(format!(
"There was an error building the AWS Glue catalog: {err:?}"
))
},
)?;
Arc::new(ctl)
}
};

Ok(Self { info, ctl })
Expand Down
9 changes: 9 additions & 0 deletions tests/sqllogictests/suites/glue/queries.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Create an Iceberg catalog that uses AWS Glue as its connection type.
statement ok
CREATE CATALOG ctl
TYPE = ICEBERG
CONNECTION = (
TYPE = 'glue',
WAREHOUSE = 's3://bucket'
);

# Every record needs a directive; only check that the statement succeeds.
statement ok
SHOW CREATE CATALOG ctl;
Loading