Skip to content

Commit

Permalink
refactor: Upgrade hive_metastore to 0.1 (#409)
Browse files Browse the repository at this point in the history
* refactor: Upgrade hive_metastore to 0.1

Signed-off-by: Xuanwo <github@xuanwo.io>

* format toml

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix typo

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored Jun 19, 2024
1 parent def6114 commit fa7fee9
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 45 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ once_cell = "1"
opendal = "0.47"
ordered-float = "4.0.0"
parquet = "52"
pilota = "0.11.0"
pilota = "0.11.2"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.12", features = ["json"] }
Expand All @@ -86,6 +86,6 @@ typed-builder = "^0.18"
url = "2"
urlencoding = "2"
uuid = "1.6.1"
volo-thrift = "0.9.2"
hive_metastore = "0.0.2"
volo-thrift = "0.10"
hive_metastore = "0.1.0"
tera = "1"
57 changes: 30 additions & 27 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::error::from_io_error;
use crate::error::from_thrift_error;
use crate::error::{from_io_error, from_thrift_exception};

use super::utils::*;
use anyhow::anyhow;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
Expand All @@ -36,7 +37,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;
use volo_thrift::MaybeException;

/// Which variant of the thrift transport to communicate with HMS
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
Expand Down Expand Up @@ -137,7 +138,8 @@ impl Catalog for HmsCatalog {
.0
.get_all_databases()
.await
.map_err(from_thrift_error)?
.map(from_thrift_exception)
.map_err(from_thrift_error)??
};

Ok(dbs
Expand Down Expand Up @@ -195,7 +197,8 @@ impl Catalog for HmsCatalog {
.0
.get_database(name.into())
.await
.map_err(from_thrift_error)?;
.map(from_thrift_exception)
.map_err(from_thrift_error)??;

let ns = convert_to_namespace(&db)?;

Expand All @@ -220,17 +223,16 @@ impl Catalog for HmsCatalog {
let resp = self.client.0.get_database(name.into()).await;

match resp {
Ok(_) => Ok(true),
Err(err) => {
if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
_,
)) = &err
{
Ok(false)
} else {
Err(from_thrift_error(err))
}
Ok(MaybeException::Ok(_)) => Ok(true),
Ok(MaybeException::Exception(ThriftHiveMetastoreGetDatabaseException::O1(_))) => {
Ok(false)
}
Ok(MaybeException::Exception(exception)) => Err(Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting thrift error".to_string(),
)
.with_source(anyhow!("thrift error: {:?}", exception))),
Err(err) => Err(from_thrift_error(err)),
}
}

Expand Down Expand Up @@ -306,7 +308,8 @@ impl Catalog for HmsCatalog {
.0
.get_all_tables(name.into())
.await
.map_err(from_thrift_error)?;
.map(from_thrift_exception)
.map_err(from_thrift_error)??;

let tables = tables
.iter()
Expand Down Expand Up @@ -397,7 +400,8 @@ impl Catalog for HmsCatalog {
.0
.get_table(db_name.clone().into(), table.name.clone().into())
.await
.map_err(from_thrift_error)?;
.map(from_thrift_exception)
.map_err(from_thrift_error)??;

let metadata_location = get_metadata_location(&hive_table.parameters)?;

Expand Down Expand Up @@ -457,16 +461,14 @@ impl Catalog for HmsCatalog {
.await;

match resp {
Ok(_) => Ok(true),
Err(err) => {
if let ResponseError::UserException(ThriftHiveMetastoreGetTableException::O2(_)) =
&err
{
Ok(false)
} else {
Err(from_thrift_error(err))
}
}
Ok(MaybeException::Ok(_)) => Ok(true),
Ok(MaybeException::Exception(ThriftHiveMetastoreGetTableException::O2(_))) => Ok(false),
Ok(MaybeException::Exception(exception)) => Err(Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting thrift error".to_string(),
)
.with_source(anyhow!("thrift error: {:?}", exception))),
Err(err) => Err(from_thrift_error(err)),
}
}

Expand All @@ -488,7 +490,8 @@ impl Catalog for HmsCatalog {
.0
.get_table(src_dbname.clone().into(), src_tbl_name.clone().into())
.await
.map_err(from_thrift_error)?;
.map(from_thrift_exception)
.map_err(from_thrift_error)??;

tbl.db_name = Some(dest_dbname.into());
tbl.table_name = Some(dest_tbl_name.into());
Expand Down
20 changes: 16 additions & 4 deletions crates/catalog/hms/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,31 @@ use anyhow::anyhow;
use iceberg::{Error, ErrorKind};
use std::fmt::Debug;
use std::io;
use volo_thrift::MaybeException;

/// Format a thrift error into iceberg error.
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
where
T: Debug,
{
///
/// Please only throw this error when you are sure that the error is caused by thrift.
pub fn from_thrift_error(error: impl std::error::Error) -> Error {
Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting thrift error".to_string(),
)
.with_source(anyhow!("thrift error: {:?}", error))
}

/// Format a thrift exception into iceberg error.
pub fn from_thrift_exception<T, E: Debug>(value: MaybeException<T, E>) -> Result<T, Error> {
match value {
MaybeException::Ok(v) => Ok(v),
MaybeException::Exception(err) => Err(Error::new(
ErrorKind::Unexpected,
"Operation failed for hitting thrift error".to_string(),
)
.with_source(anyhow!("thrift error: {:?}", err))),
}
}

/// Format an io error into iceberg error.
pub fn from_io_error(error: io::Error) -> Error {
Error::new(
Expand Down
26 changes: 15 additions & 11 deletions crates/catalog/hms/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ pub(crate) fn convert_to_namespace(database: &Database) -> Result<Namespace> {
properties.insert(HMS_DB_OWNER.to_string(), owner.to_string());
};

if let Some(owner_type) = &database.owner_type {
let value = match owner_type {
PrincipalType::User => "User",
PrincipalType::Group => "Group",
PrincipalType::Role => "Role",
if let Some(owner_type) = database.owner_type {
let value = if owner_type == PrincipalType::USER {
"User"
} else if owner_type == PrincipalType::GROUP {
"Group"
} else if owner_type == PrincipalType::ROLE {
"Role"
} else {
unreachable!("Invalid owner type")
};

properties.insert(HMS_DB_OWNER_TYPE.to_string(), value.to_string());
Expand Down Expand Up @@ -117,9 +121,9 @@ pub(crate) fn convert_to_database(
HMS_DB_OWNER => db.owner_name = Some(v.clone().into()),
HMS_DB_OWNER_TYPE => {
let owner_type = match v.to_lowercase().as_str() {
"user" => PrincipalType::User,
"group" => PrincipalType::Group,
"role" => PrincipalType::Role,
"user" => PrincipalType::USER,
"group" => PrincipalType::GROUP,
"role" => PrincipalType::ROLE,
_ => {
return Err(Error::new(
ErrorKind::DataInvalid,
Expand All @@ -144,7 +148,7 @@ pub(crate) fn convert_to_database(
// https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveHadoopUtil.java#L44
if db.owner_name.is_none() {
db.owner_name = Some(HMS_DEFAULT_DB_OWNER.into());
db.owner_type = Some(PrincipalType::User);
db.owner_type = Some(PrincipalType::USER);
}

Ok(db)
Expand Down Expand Up @@ -504,7 +508,7 @@ mod tests {
assert_eq!(db.name, Some(FastStr::from("my_namespace")));
assert_eq!(db.description, Some(FastStr::from("my_description")));
assert_eq!(db.owner_name, Some(FastStr::from("apache")));
assert_eq!(db.owner_type, Some(PrincipalType::User));
assert_eq!(db.owner_type, Some(PrincipalType::USER));

if let Some(params) = db.parameters {
assert_eq!(params.get("key1"), Some(&FastStr::from("value1")));
Expand All @@ -522,7 +526,7 @@ mod tests {

assert_eq!(db.name, Some(FastStr::from("my_namespace")));
assert_eq!(db.owner_name, Some(FastStr::from(HMS_DEFAULT_DB_OWNER)));
assert_eq!(db.owner_type, Some(PrincipalType::User));
assert_eq!(db.owner_type, Some(PrincipalType::USER));

Ok(())
}
Expand Down

0 comments on commit fa7fee9

Please sign in to comment.