diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index 870e61ec6..a0a0d76e7 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -36,6 +36,10 @@ pub(crate) enum Storage { /// s3 storage could have `s3://` and `s3a://`. /// Storing the scheme string here to return the correct path. scheme_str: String, + /// uses the same client for one FileIO Storage. + /// + /// TODO: allow users to configure this client. + client: reqwest::Client, config: Arc, }, } @@ -54,6 +58,7 @@ impl Storage { #[cfg(feature = "storage-s3")] Scheme::S3 => Ok(Self::S3 { scheme_str, + client: reqwest::Client::new(), config: super::s3_config_parse(props)?.into(), }), _ => Err(Error::new( @@ -102,8 +107,12 @@ impl Storage { } } #[cfg(feature = "storage-s3")] - Storage::S3 { scheme_str, config } => { - let op = super::s3_config_build(config, path)?; + Storage::S3 { + scheme_str, + client, + config, + } => { + let op = super::s3_config_build(client, config, path)?; let op_info = op.info(); // Check prefix of s3 path. diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 9abb32103..4374846f9 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -17,8 +17,9 @@ use std::collections::HashMap; +use opendal::raw::HttpClient; use opendal::services::S3Config; -use opendal::Operator; +use opendal::{Configurator, Operator}; use url::Url; use crate::{Error, ErrorKind, Result}; @@ -106,7 +107,11 @@ pub(crate) fn s3_config_parse(mut m: HashMap) -> Result Result { +pub(crate) fn s3_config_build( + client: &reqwest::Client, + cfg: &S3Config, + path: &str, +) -> Result { let url = Url::parse(path)?; let bucket = url.host_str().ok_or_else(|| { Error::new( @@ -115,7 +120,13 @@ pub(crate) fn s3_config_build(cfg: &S3Config, path: &str) -> Result { ) })?; - let mut cfg = cfg.clone(); - cfg.bucket = bucket.to_string(); - Ok(Operator::from_config(cfg)?.finish()) + let builder = cfg + .clone() + .into_builder() + // Set bucket name. + .bucket(bucket) + // Set http client we want to use. + .http_client(HttpClient::with(client.clone())); + + Ok(Operator::new(builder)?.finish()) }