Skip to content

Commit

Permalink
refactor(rust): Use ObjectStore instead of AsyncRead in parquet get metadata (#15069)

Browse files Browse the repository at this point in the history
  • Loading branch information
mickvangelderen authored Mar 15, 2024
1 parent ec04150 commit 4933040
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 302 deletions.
144 changes: 12 additions & 132 deletions crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,134 +2,17 @@
//! This is used, for example, by the [parquet2] crate.
//!
//! [parquet2]: https://crates.io/crates/parquet2
use std::io::{self};
use std::pin::Pin;
use std::task::Poll;

use bytes::Bytes;
use futures::executor::block_on;
use futures::future::BoxFuture;
use futures::{AsyncRead, AsyncSeek, Future, TryFutureExt};
use std::sync::Arc;

use object_store::path::Path;
use object_store::MultipartId;
use polars_error::to_compute_err;
use object_store::{MultipartId, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::*;
use super::CloudOptions;
use crate::pl_async::get_runtime;

type OptionalFuture = Option<BoxFuture<'static, std::io::Result<Bytes>>>;

/// Adaptor to translate from AsyncSeek and AsyncRead to the object_store get_range API.
pub struct CloudReader {
    // The current position in the stream; it is set by seeking and updated by reading bytes.
    pos: u64,
    // The total size of the object; it is required when seeking from the end of the file.
    length: Option<u64>,
    // Hold a reference to the store in a thread-safe way.
    object_store: Arc<dyn ObjectStore>,
    // The path in the object_store of the current object being read.
    path: Path,
    // If a read is pending then `active` holds its future; cleared on seek.
    active: OptionalFuture,
}

impl CloudReader {
    /// Creates a new reader for the object at `path` in `object_store`.
    ///
    /// `length` is the total object size, if known; it is only needed to
    /// support seeking relative to the end (`SeekFrom::End`).
    pub fn new(length: Option<u64>, object_store: Arc<dyn ObjectStore>, path: Path) -> Self {
        Self {
            pos: 0,
            length,
            object_store,
            path,
            active: None,
        }
    }

    /// For each read request we create a new future.
    ///
    /// Issues a `get_range(pos..pos + length)` request against the store and
    /// stores the future in `self.active` so subsequent polls resume the same
    /// request instead of re-issuing it.
    async fn read_operation(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        length: usize,
    ) -> std::task::Poll<std::io::Result<Bytes>> {
        let start = self.pos as usize;

        // If we already have a future just poll it.
        if let Some(fut) = self.active.as_mut() {
            return Future::poll(fut.as_mut(), cx);
        }

        // Create the future.
        let future = {
            let path = self.path.clone();
            let object_store = self.object_store.clone();
            // Use an async move block to get our owned objects.
            async move {
                object_store
                    .get_range(&path, start..start + length)
                    .map_err(|e| {
                        // Object-store errors are surfaced as generic io::Errors
                        // so this type can implement the std/futures IO traits.
                        std::io::Error::new(
                            std::io::ErrorKind::Other,
                            format!("object store error {e:?}"),
                        )
                    })
                    .await
            }
        };
        // Prepare for next read.
        // NOTE(review): `pos` is advanced by the *requested* length before the
        // request completes; if `get_range` ever returns fewer bytes than
        // requested (e.g. a range extending past the end of the object), `pos`
        // would be out of sync with the bytes actually delivered — confirm the
        // callers never request past EOF.
        self.pos += length as u64;

        let mut future = Box::pin(future);

        // Need to poll it once to get the pump going.
        let polled = Future::poll(future.as_mut(), cx);

        // Save for next time.
        self.active = Some(future);
        polled
    }
}

impl AsyncRead for CloudReader {
    /// Fills `buf` with bytes fetched from the object store at the current
    /// position, returning the number of bytes read.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut [u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        // Use block_on in order to get the future result in this thread and copy the data in the output buffer.
        // With this approach we keep ownership of the buffer and we don't have to pass it to the future runtime.
        match block_on(self.read_operation(cx, buf.len())) {
            Poll::Ready(Ok(bytes)) => {
                // NOTE(review): `copy_from_slice` panics when the two slices
                // differ in length, so a response shorter than `buf.len()`
                // (e.g. a range past the end of the object) would panic here
                // rather than return a partial read — confirm callers size
                // their requests within the object length.
                buf.copy_from_slice(bytes.as_ref());
                Poll::Ready(Ok(bytes.len()))
            },
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl AsyncSeek for CloudReader {
    /// Updates the read position without performing any I/O.
    ///
    /// Any in-flight read future is discarded because it was issued for the
    /// previous position. Seeking from the end requires `length` to be known.
    ///
    /// # Errors
    ///
    /// Returns an error when seeking from the end while the object length is
    /// unknown, or when the resulting position would be negative or would
    /// overflow (the original casts silently wrapped such positions into
    /// huge `u64` values).
    fn poll_seek(
        mut self: Pin<&mut Self>,
        _: &mut std::task::Context<'_>,
        pos: io::SeekFrom,
    ) -> std::task::Poll<std::io::Result<u64>> {
        // Combine base and offset with checked arithmetic so an out-of-range
        // seek is reported instead of wrapping around.
        let new_pos = match pos {
            io::SeekFrom::Start(pos) => Some(pos),
            io::SeekFrom::End(offset) => {
                let length = self.length.ok_or::<io::Error>(io::Error::new(
                    std::io::ErrorKind::Other,
                    "Cannot seek from end of stream when length is unknown.",
                ))?;
                (length as i64)
                    .checked_add(offset)
                    .and_then(|p| u64::try_from(p).ok())
            },
            io::SeekFrom::Current(offset) => (self.pos as i64)
                .checked_add(offset)
                .and_then(|p| u64::try_from(p).ok()),
        };
        self.pos = new_pos.ok_or::<io::Error>(io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "Cannot seek to a negative or overflowing position.",
        ))?;
        // The pending read (if any) targeted the old position; drop it.
        self.active = None;
        std::task::Poll::Ready(Ok(self.pos))
    }
}

/// Adaptor which wraps the asynchronous interface of [ObjectStore::put_multipart](https://docs.rs/object_store/latest/object_store/trait.ObjectStore.html#tymethod.put_multipart)
/// exposing a synchronous interface which implements `std::io::Write`.
///
Expand All @@ -156,16 +39,13 @@ impl CloudWriter {
object_store: Arc<dyn ObjectStore>,
path: Path,
) -> PolarsResult<Self> {
let build_result = Self::build_writer(&object_store, &path).await;
match build_result {
Err(error) => Err(PolarsError::from(error)),
Ok((multipart_id, writer)) => Ok(CloudWriter {
object_store,
path,
multipart_id,
writer,
}),
}
let (multipart_id, writer) = Self::build_writer(&object_store, &path).await?;
Ok(CloudWriter {
object_store,
path,
multipart_id,
writer,
})
}

/// Constructs a new CloudWriter from a path and an optional set of CloudOptions.
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-io/src/cloud/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use polars_core::error::to_compute_err;
use polars_core::prelude::{polars_ensure, polars_err};
use polars_error::{PolarsError, PolarsResult};
use regex::Regex;
use url::Url;

use super::*;
use super::CloudOptions;

const DELIMITER: char = '/';

Expand Down
21 changes: 8 additions & 13 deletions crates/polars-io/src/cloud/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,24 @@
//! Interface with cloud storage through the object_store crate.

#[cfg(feature = "cloud")]
use std::borrow::Cow;
mod adaptors;
#[cfg(feature = "cloud")]
use std::sync::Arc;
pub use adaptors::*;

#[cfg(feature = "cloud")]
use object_store::local::LocalFileSystem;
mod polars_object_store;
#[cfg(feature = "cloud")]
use object_store::ObjectStore;
#[cfg(feature = "cloud")]
use polars_core::prelude::{polars_bail, PolarsError, PolarsResult};
pub use polars_object_store::*;

#[cfg(feature = "cloud")]
mod adaptors;
#[cfg(feature = "cloud")]
mod glob;
#[cfg(feature = "cloud")]
mod object_store_setup;
pub mod options;
pub use glob::*;

#[cfg(feature = "cloud")]
pub use adaptors::*;
#[cfg(feature = "cloud")]
pub use glob::*;
mod object_store_setup;
#[cfg(feature = "cloud")]
pub use object_store_setup::*;

pub mod options;
pub use options::*;
25 changes: 17 additions & 8 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::sync::Arc;

use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use once_cell::sync::Lazy;
pub use options::*;
use polars_error::to_compute_err;
use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
use polars_utils::aliases::PlHashMap;
use tokio::sync::RwLock;
use url::Url;

use super::*;
use super::{parse_url, CloudLocation, CloudOptions, CloudType};

/// Object stores must be cached. Every object-store will do DNS lookups and
/// get rate limited when querying the DNS (can take up to 5s).
Expand Down Expand Up @@ -35,7 +38,14 @@ fn url_to_key(url: &Url) -> String {
}

/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
pub async fn build_object_store(
url: &str,
#[cfg_attr(
not(any(feature = "aws", feature = "gcp", feature = "azure")),
allow(unused_variables)
)]
options: Option<&CloudOptions>,
) -> BuildResult {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed)?;

Expand All @@ -48,9 +58,8 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu
}
}

let options = options
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Default::default()));
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default();

let cloud_type = CloudType::from_url(&parsed)?;
let store = match cloud_type {
Expand Down Expand Up @@ -93,7 +102,7 @@ pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> Bu
{
let store = object_store::http::HttpBuilder::new()
.with_url(url)
.with_client_options(get_client_options())
.with_client_options(super::get_client_options())
.build()?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
Expand Down
14 changes: 6 additions & 8 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ use object_store::gcp::GoogleCloudStorageBuilder;
pub use object_store::gcp::GoogleConfigKey;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure", feature = "http"))]
use object_store::ClientOptions;
#[cfg(feature = "cloud")]
use object_store::ObjectStore;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
use object_store::{BackoffConfig, RetryConfig};
#[cfg(feature = "aws")]
Expand Down Expand Up @@ -226,9 +224,9 @@ impl CloudOptions {
self
}

/// Build the [`ObjectStore`] implementation for AWS.
/// Build the [`object_store::ObjectStore`] implementation for AWS.
#[cfg(feature = "aws")]
pub async fn build_aws(&self, url: &str) -> PolarsResult<impl ObjectStore> {
pub async fn build_aws(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
let options = self.aws.as_ref();
let mut builder = AmazonS3Builder::from_env().with_url(url);
if let Some(options) = options {
Expand Down Expand Up @@ -329,9 +327,9 @@ impl CloudOptions {
self
}

/// Build the [`ObjectStore`] implementation for Azure.
/// Build the [`object_store::ObjectStore`] implementation for Azure.
#[cfg(feature = "azure")]
pub fn build_azure(&self, url: &str) -> PolarsResult<impl ObjectStore> {
pub fn build_azure(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
let options = self.azure.as_ref();
let mut builder = MicrosoftAzureBuilder::from_env();
if let Some(options) = options {
Expand Down Expand Up @@ -363,9 +361,9 @@ impl CloudOptions {
self
}

/// Build the [`ObjectStore`] implementation for GCP.
/// Build the [`object_store::ObjectStore`] implementation for GCP.
#[cfg(feature = "gcp")]
pub fn build_gcp(&self, url: &str) -> PolarsResult<impl ObjectStore> {
pub fn build_gcp(&self, url: &str) -> PolarsResult<impl object_store::ObjectStore> {
let options = self.gcp.as_ref();
let mut builder = GoogleCloudStorageBuilder::from_env();
if let Some(options) = options {
Expand Down
61 changes: 61 additions & 0 deletions crates/polars-io/src/cloud/polars_object_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use polars_error::{to_compute_err, PolarsResult};

use crate::pl_async::{
tune_with_concurrency_budget, with_concurrency_budget, MAX_BUDGET_PER_REQUEST,
};

/// Polars specific wrapper for `Arc<dyn ObjectStore>` that limits the number of
/// concurrent requests for the entire application.
///
/// Cloning is cheap: it only bumps the inner `Arc`'s reference count.
#[derive(Debug, Clone)]
pub struct PolarsObjectStore(Arc<dyn ObjectStore>);

impl PolarsObjectStore {
    /// Wraps an [`ObjectStore`] so that all requests made through it claim
    /// from the application-wide concurrency budget.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self(store)
    }

    /// Download the entire object at `path`.
    pub async fn get(&self, path: &Path) -> PolarsResult<Bytes> {
        tune_with_concurrency_budget(1, || async {
            self.0
                .get(path)
                .await
                .map_err(to_compute_err)?
                .bytes()
                .await
                .map_err(to_compute_err)
        })
        .await
    }

    /// Download the given byte `range` of the object at `path`.
    pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Bytes> {
        tune_with_concurrency_budget(1, || self.0.get_range(path, range))
            .await
            .map_err(to_compute_err)
    }

    /// Download multiple byte `ranges` of the object at `path` in one call.
    ///
    /// The claimed budget scales with the number of ranges but is capped at
    /// `MAX_BUDGET_PER_REQUEST` so a single large request cannot starve the
    /// rest of the application.
    pub async fn get_ranges(
        &self,
        path: &Path,
        ranges: &[Range<usize>],
    ) -> PolarsResult<Vec<Bytes>> {
        // `ranges.len() as u32` is already non-negative, so only the upper
        // bound needs enforcing; `min` replaces the redundant `clamp(0, ..)`.
        tune_with_concurrency_budget(
            (ranges.len() as u32).min(MAX_BUDGET_PER_REQUEST as u32),
            || self.0.get_ranges(path, ranges),
        )
        .await
        .map_err(to_compute_err)
    }

    /// Fetch the metadata of the parquet file, do not memoize it.
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || self.0.head(path))
            .await
            .map_err(to_compute_err)
    }
}
Loading

0 comments on commit 4933040

Please sign in to comment.