From e6d61dd9e56cc9f908d6e766604acb2bc5c56757 Mon Sep 17 00:00:00 2001
From: Simon Lin
Date: Fri, 19 Jul 2024 14:51:22 +1000
Subject: [PATCH] c

---
 crates/polars-io/Cargo.toml                    |  12 +-
 .../polars-io/src/cloud/object_store_setup.rs  |   9 +-
 crates/polars-io/src/cloud/options.rs          | 141 +++++++++++++-----
 .../polars-io/src/path_utils/hugging_face.rs   |  48 +++---
 crates/polars-io/src/path_utils/mod.rs         |   2 +-
 py-polars/polars/io/csv/functions.py           |   8 +-
 py-polars/polars/io/ipc/functions.py           |   6 +-
 py-polars/polars/io/parquet/functions.py       |  23 +--
 py-polars/src/lazyframe/mod.rs                 |   6 +-
 9 files changed, 178 insertions(+), 77 deletions(-)

diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml
index c8f7e268022b..c99cc6e407f9 100644
--- a/crates/polars-io/Cargo.toml
+++ b/crates/polars-io/Cargo.toml
@@ -111,7 +111,17 @@ async = [
   "polars-error/regex",
   "polars-parquet?/async",
 ]
-cloud = ["object_store", "async", "polars-error/object_store", "url", "serde_json", "serde", "file_cache", "reqwest"]
+cloud = [
+  "object_store",
+  "async",
+  "polars-error/object_store",
+  "url",
+  "serde_json",
+  "serde",
+  "file_cache",
+  "reqwest",
+  "http",
+]
 file_cache = ["async", "dep:blake3", "dep:fs4"]
 aws = ["object_store/aws", "cloud", "reqwest"]
 azure = ["object_store/azure", "cloud"]
diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
index d70abb421005..705e48a77f14 100644
--- a/crates/polars-io/src/cloud/object_store_setup.rs
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -67,7 +67,6 @@ pub async fn build_object_store(
         }
     }
 
-    #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
     let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default();
     let cloud_type = CloudType::from_url(&parsed)?;
 
@@ -111,16 +110,14 @@ pub async fn build_object_store(
                 allow_cache = false;
                 #[cfg(feature = "http")]
                 {
-                    let store = object_store::http::HttpBuilder::new()
-                        .with_url(url)
-                        .with_client_options(super::get_client_options())
-                        .build()?;
-                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+                    let store = options.build_http(url)?;
+                    PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
                 }
             }
             #[cfg(not(feature = "http"))]
             return err_missing_feature("http", &cloud_location.scheme);
         },
+        CloudType::Hf => panic!("impl error: unresolved hf:// path"),
     }?;
     if allow_cache {
         let mut cache = OBJECT_STORE_CACHE.write().await;
diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs
index b5d970a81562..c9958f4b745e 100644
--- a/crates/polars-io/src/cloud/options.rs
+++ b/crates/polars-io/src/cloud/options.rs
@@ -27,6 +27,8 @@ use polars_error::*;
 use polars_utils::cache::FastFixedCache;
 #[cfg(feature = "aws")]
 use regex::Regex;
+#[cfg(feature = "http")]
+use reqwest::header::HeaderMap;
 #[cfg(feature = "serde")]
 use serde::{Deserialize, Serialize};
 #[cfg(feature = "aws")]
@@ -54,6 +56,19 @@ static BUCKET_REGION: Lazy<std::sync::Mutex<FastFixedCache<SmartString, SmartString>>> =
 type Configs<T> = Vec<(T, String)>;
 
+#[derive(Clone, Debug, PartialEq, Hash, Eq)]
+#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
+pub(crate) enum CloudConfig {
+    #[cfg(feature = "aws")]
+    Aws(Configs<AmazonS3ConfigKey>),
+    #[cfg(feature = "azure")]
+    Azure(Configs<AzureConfigKey>),
+    #[cfg(feature = "gcp")]
+    Gcp(Configs<GoogleConfigKey>),
+    #[cfg(feature = "http")]
+    Http { headers: Vec<(String, String)> },
+}
+
 #[derive(Clone, Debug, PartialEq, Hash, Eq)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 /// Options to connect to various cloud providers.
@@ -61,12 +76,7 @@ pub struct CloudOptions {
     pub max_retries: usize,
     #[cfg(feature = "file_cache")]
    pub file_cache_ttl: u64,
-    #[cfg(feature = "aws")]
-    aws: Option<Configs<AmazonS3ConfigKey>>,
-    #[cfg(feature = "azure")]
-    azure: Option<Configs<AzureConfigKey>>,
-    #[cfg(feature = "gcp")]
-    gcp: Option<Configs<GoogleConfigKey>>,
+    pub(crate) config: Option<CloudConfig>,
 }
 
 impl Default for CloudOptions {
@@ -75,16 +85,29 @@
             max_retries: 2,
             #[cfg(feature = "file_cache")]
             file_cache_ttl: get_env_file_cache_ttl(),
-            #[cfg(feature = "aws")]
-            aws: Default::default(),
-            #[cfg(feature = "azure")]
-            azure: Default::default(),
-            #[cfg(feature = "gcp")]
-            gcp: Default::default(),
+            config: None,
         }
     }
 }
 
+#[cfg(feature = "http")]
+pub(crate) fn try_build_http_header_map_from_items_slice<S: AsRef<str>>(
+    headers: &[(S, S)],
+) -> PolarsResult<HeaderMap> {
+    use reqwest::header::{HeaderName, HeaderValue};
+
+    let mut map = HeaderMap::with_capacity(headers.len());
+    for (k, v) in headers {
+        let (k, v) = (k.as_ref(), v.as_ref());
+        map.insert(
+            HeaderName::from_str(k).map_err(to_compute_err)?,
+            HeaderValue::from_str(v).map_err(to_compute_err)?,
+        );
+    }
+
+    Ok(map)
+}
+
 #[allow(dead_code)]
 /// Parse an untype configuration hashmap to a typed configuration for the given configuration key type.
 fn parsed_untyped_config<T, I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>>(
@@ -112,6 +135,7 @@ pub enum CloudType {
     File,
     Gcp,
     Http,
+    Hf,
 }
 
 impl CloudType {
@@ -123,6 +147,7 @@
             "gs" | "gcp" | "gcs" => Self::Gcp,
             "file" => Self::File,
             "http" | "https" => Self::Http,
+            "hf" => Self::Hf,
             _ => polars_bail!(ComputeError: "unknown url scheme"),
         })
     }
@@ -225,21 +250,20 @@
         mut self,
         configs: I,
     ) -> Self {
-        self.aws = Some(
-            configs
-                .into_iter()
-                .map(|(k, v)| (k, v.into()))
-                .collect::<Vec<_>>(),
-        );
+        self.config = Some(CloudConfig::Aws(
+            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
+        ));
         self
     }
 
     /// Build the [`object_store::ObjectStore`] implementation for AWS.
     #[cfg(feature = "aws")]
     pub async fn build_aws(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
-        let options = self.aws.as_ref();
         let mut builder = AmazonS3Builder::from_env().with_url(url);
-        if let Some(options) = options {
+        if let Some(options) = &self.config {
+            let CloudConfig::Aws(options) = options else {
+                panic!("impl error: cloud type mismatch")
+            };
             for (key, value) in options.iter() {
                 builder = builder.with_config(*key, value);
             }
@@ -328,21 +352,20 @@
         mut self,
         configs: I,
     ) -> Self {
-        self.azure = Some(
-            configs
-                .into_iter()
-                .map(|(k, v)| (k, v.into()))
-                .collect::<Vec<_>>(),
-        );
+        self.config = Some(CloudConfig::Azure(
+            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
+        ));
         self
     }
 
     /// Build the [`object_store::ObjectStore`] implementation for Azure.
     #[cfg(feature = "azure")]
     pub fn build_azure(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
-        let options = self.azure.as_ref();
         let mut builder = MicrosoftAzureBuilder::from_env();
-        if let Some(options) = options {
+        if let Some(options) = &self.config {
+            let CloudConfig::Azure(options) = options else {
+                panic!("impl error: cloud type mismatch")
+            };
             for (key, value) in options.iter() {
                 builder = builder.with_config(*key, value);
             }
@@ -362,21 +385,20 @@
         mut self,
         configs: I,
     ) -> Self {
-        self.gcp = Some(
-            configs
-                .into_iter()
-                .map(|(k, v)| (k, v.into()))
-                .collect::<Vec<_>>(),
-        );
+        self.config = Some(CloudConfig::Gcp(
+            configs.into_iter().map(|(k, v)| (k, v.into())).collect(),
+        ));
         self
     }
 
     /// Build the [`object_store::ObjectStore`] implementation for GCP.
#[cfg(feature = "gcp")] pub fn build_gcp(&self, url: &str) -> PolarsResult { - let options = self.gcp.as_ref(); let mut builder = GoogleCloudStorageBuilder::from_env(); - if let Some(options) = options { + if let Some(options) = &self.config { + let CloudConfig::Gcp(options) = options else { + panic!("impl error: cloud type mismatch") + }; for (key, value) in options.iter() { builder = builder.with_config(*key, value); } @@ -390,6 +412,23 @@ impl CloudOptions { .map_err(to_compute_err) } + #[cfg(feature = "http")] + pub fn build_http(&self, url: &str) -> PolarsResult { + object_store::http::HttpBuilder::new() + .with_url(url) + .with_client_options({ + let mut opts = super::get_client_options(); + if let Some(CloudConfig::Http { headers }) = &self.config { + opts = opts.with_default_headers(try_build_http_header_map_from_items_slice( + headers.as_slice(), + )?); + } + opts + }) + .build() + .map_err(to_compute_err) + } + /// Parse a configuration from a Hashmap. This is the interface from Python. #[allow(unused_variables)] pub fn from_untyped_config, impl Into)>>( @@ -432,6 +471,36 @@ impl CloudOptions { polars_bail!(ComputeError: "'gcp' feature is not enabled"); } }, + CloudType::Hf => { + #[cfg(feature = "http")] + { + let mut this = Self::default(); + + if let Ok(v) = std::env::var("HF_TOKEN") { + this.config = Some(CloudConfig::Http { + headers: vec![("Authorization".into(), format!("Bearer {}", v))], + }) + } + + for (i, (k, v)) in config.into_iter().enumerate() { + let (k, v) = (k.as_ref(), v.into()); + + if i == 0 && k == "token" { + this.config = Some(CloudConfig::Http { + headers: vec![("Authorization".into(), format!("Bearer {}", v))], + }) + } else { + polars_bail!(ComputeError: "unknown configuration key: {}", k) + } + } + + Ok(this) + } + #[cfg(not(feature = "http"))] + { + polars_bail!(ComputeError: "'http' feature is not enabled"); + } + }, } } } diff --git a/crates/polars-io/src/path_utils/hugging_face.rs b/crates/polars-io/src/path_utils/hugging_face.rs index f8f7854f2498..4900233e4b1e 100644 --- a/crates/polars-io/src/path_utils/hugging_face.rs +++ b/crates/polars-io/src/path_utils/hugging_face.rs @@ -3,9 +3,12 @@ use std::collections::VecDeque; use std::path::PathBuf; -use polars_error::{polars_bail, to_compute_err, PolarsResult}; +use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult}; -use crate::cloud::{extract_prefix_expansion, Matcher}; +use crate::cloud::{ + extract_prefix_expansion, try_build_http_header_map_from_items_slice, CloudConfig, + CloudOptions, Matcher, +}; use crate::path_utils::HiveIdxTracker; use crate::pl_async::with_concurrency_budget; @@ -198,14 +201,25 @@ impl<'a> GetPages<'a> { pub(super) async fn expand_paths_hf( paths: &[PathBuf], check_directory_level: bool, + cloud_options: Option<&CloudOptions>, ) -> PolarsResult<(usize, Vec)> { assert!(!paths.is_empty()); - let client = &reqwest::ClientBuilder::new() - .http1_only() - .https_only(true) - .build() - .unwrap(); + let client = reqwest::ClientBuilder::new().http1_only().https_only(true); + + let client = if let Some(CloudOptions { + config: Some(CloudConfig::Http { headers }), + .. + }) = cloud_options + { + client.default_headers(try_build_http_header_map_from_items_slice( + headers.as_slice(), + )?) 
+    } else {
+        client
+    };
+
+    let client = &client.build().unwrap();
 
     let mut out_paths = vec![];
     let mut stack = VecDeque::new();
@@ -263,26 +277,26 @@ pub(super) async fn expand_paths_hf(
             client,
         };
 
+        fn try_parse_api_response(bytes: &[u8]) -> PolarsResult<Vec<HFAPIResponse>> {
+            serde_json::from_slice::<Vec<HFAPIResponse>>(bytes).map_err(
+                |e| polars_err!(ComputeError: "failed to parse API response as JSON: error: {}, value: {}", e, std::str::from_utf8(bytes).unwrap()),
+            )
+        }
+
         if let Some(matcher) = expansion_matcher {
             while let Some(bytes) = gp.next().await {
                 let bytes = bytes?;
                 let bytes = bytes.as_ref();
-                entries.extend(
-                    serde_json::from_slice::<Vec<HFAPIResponse>>(bytes)
-                        .map_err(to_compute_err)?
-                        .into_iter()
-                        .filter(|x| {
-                            matcher.is_matching(x.path.as_str()) && (!x.is_file() || x.size > 0)
-                        }),
-                );
+                entries.extend(try_parse_api_response(bytes)?.into_iter().filter(|x| {
+                    matcher.is_matching(x.path.as_str()) && (!x.is_file() || x.size > 0)
+                }));
             }
         } else {
             while let Some(bytes) = gp.next().await {
                 let bytes = bytes?;
                 let bytes = bytes.as_ref();
                 entries.extend(
-                    serde_json::from_slice::<Vec<HFAPIResponse>>(bytes)
-                        .map_err(to_compute_err)?
+                    try_parse_api_response(bytes)?
                         .into_iter()
                         .filter(|x| !x.is_file() || x.size > 0),
                 );
diff --git a/crates/polars-io/src/path_utils/mod.rs b/crates/polars-io/src/path_utils/mod.rs
index 54ea10d27f26..3fc484a06104 100644
--- a/crates/polars-io/src/path_utils/mod.rs
+++ b/crates/polars-io/src/path_utils/mod.rs
@@ -151,7 +151,7 @@ pub fn expand_paths_hive(
         if first_path.starts_with("hf://") {
             let (expand_start_idx, paths) =
                 crate::pl_async::get_runtime().block_on_potential_spawn(
-                    hugging_face::expand_paths_hf(paths, check_directory_level),
+                    hugging_face::expand_paths_hf(paths, check_directory_level, cloud_options),
                 )?;
 
             return Ok((Arc::from(paths), expand_start_idx));
diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py
index 34b603174d61..3b0fa4cf808a 100644
--- a/py-polars/polars/io/csv/functions.py
+++ b/py-polars/polars/io/csv/functions.py
@@ -962,7 +962,9 @@ def scan_csv(
     Parameters
     ----------
     source
-        Path to a file.
+        Path(s) to a file or directory.
+        When needing to authenticate for scanning cloud locations, see the
+        `storage_options` parameter.
     has_header
         Indicate if the first row of the dataset is a header or not. If set to False,
         column names will be autogenerated in the following format: `column_x`, with
@@ -1054,8 +1056,6 @@ def scan_csv(
         Expand path given via globbing rules.
     storage_options
         Options that indicate how to connect to a cloud provider.
-        If the cloud provider is not supported by Polars, the storage options
-        are passed to `fsspec.open()`.
 
         The cloud providers currently supported are AWS, GCP, and Azure. See supported keys here:
 
@@ -1063,6 +1063,8 @@ def scan_csv(
         * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
         * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
         * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
 
         If `storage_options` is not provided, Polars will try to infer the information
         from environment variables.
diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py
index e8765c015f32..c07e32790a7d 100644
--- a/py-polars/polars/io/ipc/functions.py
+++ b/py-polars/polars/io/ipc/functions.py
@@ -333,7 +333,9 @@ def scan_ipc(
     Parameters
     ----------
     source
-        Path to a IPC file.
+        Path(s) to a file or directory.
+        When needing to authenticate for scanning cloud locations, see the
+        `storage_options` parameter.
     n_rows
         Stop reading from IPC file after reading `n_rows`.
     cache
@@ -354,6 +356,8 @@ def scan_ipc(
         * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
         * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
         * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
 
         If `storage_options` is not provided, Polars will try to infer the information
         from environment variables.
diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py
index f8c46092cf94..99084afa38d0 100644
--- a/py-polars/polars/io/parquet/functions.py
+++ b/py-polars/polars/io/parquet/functions.py
@@ -59,12 +59,14 @@ def read_parquet(
     Parameters
     ----------
     source
-        Path to a file or a file-like object (by "file-like object" we refer to objects
+        Path(s) to a file or directory.
+        When needing to authenticate for scanning cloud locations, see the
+        `storage_options` parameter.
+
+        File-like objects are supported (by "file-like object" we refer to objects
         that have a `read()` method, such as a file handler like the builtin `open`
-        function, or a `BytesIO` instance). If the path is a directory, files in that
-        directory will all be read.
-        For file-like objects,
-        stream position may not be updated accordingly after reading.
+        function, or a `BytesIO` instance). For file-like objects, stream position
+        may not be updated accordingly after reading.
     columns
         Columns to select. Accepts a list of column indices (starting at zero) or a list
         of column names.
@@ -106,8 +108,6 @@ def read_parquet(
         Reduce memory pressure at the expense of performance.
     storage_options
         Options that indicate how to connect to a cloud provider.
-        If the cloud provider is not supported by Polars, the storage options
-        are passed to `fsspec.open()`.
 
         The cloud providers currently supported are AWS, GCP, and Azure. See supported keys here:
 
@@ -115,6 +115,8 @@ def read_parquet(
         * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
        * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
         * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
 
         If `storage_options` is not provided, Polars will try to infer the information
         from environment variables.
@@ -320,8 +322,9 @@ def scan_parquet(
     Parameters
     ----------
     source
-        Path(s) to a file
-        If a single path is given, it can be a globbing pattern.
+        Path(s) to a file or directory.
+        When needing to authenticate for scanning cloud locations, see the
+        `storage_options` parameter.
     n_rows
         Stop reading from parquet file after reading `n_rows`.
     row_index_name
@@ -365,6 +368,8 @@ def scan_parquet(
         * `aws <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html>`_
         * `gcp <https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html>`_
         * `azure <https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html>`_
+        * Hugging Face (`hf://`): Accepts an API key under the `token` parameter: \
+          `{'token': '...'}`, or by setting the `HF_TOKEN` environment variable.
 
         If `storage_options` is not provided, Polars will try to infer the information
         from environment variables.
diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs
index e2f1708689f7..1dc86db3905a 100644
--- a/py-polars/src/lazyframe/mod.rs
+++ b/py-polars/src/lazyframe/mod.rs
@@ -158,7 +158,7 @@ impl PyLazyFrame {
         let mut cloud_options = if let Some(opts) = cloud_options {
             parse_cloud_options(&first_path_url, opts)?
         } else {
-            Default::default()
+            parse_cloud_options(&first_path_url, vec![])?
         };
 
         cloud_options = cloud_options.with_max_retries(retries);
@@ -271,7 +271,7 @@ impl PyLazyFrame {
         let mut cloud_options = if let Some(opts) = cloud_options {
             parse_cloud_options(&first_path_url, opts)?
         } else {
-            Default::default()
+            parse_cloud_options(&first_path_url, vec![])?
         };
 
         cloud_options = cloud_options.with_max_retries(retries);
@@ -352,7 +352,7 @@ impl PyLazyFrame {
         let mut cloud_options = if let Some(opts) = cloud_options {
             parse_cloud_options(&first_path_url, opts)?
         } else {
-            Default::default()
+            parse_cloud_options(&first_path_url, vec![])?
         };
 
         cloud_options = cloud_options.with_max_retries(retries);
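
Usage note (not part of the patch): a minimal sketch of how the hf:// authentication added above is expected to be used from Python. The dataset path below is a placeholder, and the only configuration key this patch recognizes for hf:// is `token`, with the `HF_TOKEN` environment variable as a fallback.

    import os
    import polars as pl

    # Pass the Hugging Face API token explicitly; the patch forwards it as an
    # "Authorization: Bearer <token>" header via the new CloudConfig::Http config.
    lf = pl.scan_parquet(
        "hf://datasets/example-org/example-dataset/*.parquet",  # placeholder path
        storage_options={"token": "hf_xxx"},
    )

    # Alternatively, rely on the HF_TOKEN environment variable, which
    # CloudOptions::from_untyped_config reads when no token is supplied.
    os.environ["HF_TOKEN"] = "hf_xxx"
    lf = pl.scan_parquet("hf://datasets/example-org/example-dataset/*.parquet")
    df = lf.collect()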