-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for reading remote storage systems #811
Conversation
Thank you @yjshen , this is huge! I will help review it tomorrow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like there is some duplication on file content reading and listing responsibilities between DataSource
, SourceDescBuilder
and ProtocolHandler
traits. Would be good to give more thoughts on how these abstractions would interact with each other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking quite cool @yjshen -- thank you! I left some comments of things that might be worth considering.
Also I wonder if you have thought about trying to make this interface async somehow? I realize the underlying parquet reader isn't async (yet) but I think that is the direction things are heading in Rust I/O land
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this interesting proposition @yjshen!
I agree with @alamb that async should be taken into account, especially for fetching the file list and metadata which are typically high latency but with little processing. But here you cannot use async because the file list and statistics are materialized at the ParquetTable
creation level which is too early. This early materialization will also be problematic with buckets that have thousands of files:
- getting the metadata from all parquet files will be too long
- it can be interesting to leave the listing to the last moment, so that in case you implement some partition pruning later on, you can list the files only in the partitions you are interested in.
Overall I would prefer (but this is just my opinion) a higher level abstraction in which we can also plug catalogs such as Delta or Iceberg
@houqp @alamb I've done with the original implementation by abstracting file listing/reading logic into /// Objct Reader for one file in a object store
pub trait ObjectReader {
/// Get reader for a part [start, start + length] in the file
fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;
/// Get lenght for the file
fn length(&self) -> u64;
}
/// A ObjectStore abstracts access to an underlying file/object storage.
/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
pub trait ObjectStore: Sync + Send + Debug {
/// Returns the object store as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Returns all the files with filename extension `ext` in path `prefix`
fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;
/// Get object reader for one file
fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>>;
} Currently, there are several things remaining (I suppose that are not blockers for this PR, please correct me if get something wrong):
|
Regarding the async part, should I just make |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again @yjshen !
My high level feeling is still that we lack an abstraction for the list of files (catalog).
datafusion/src/datasource/parquet.rs
Outdated
.unwrap() | ||
.object_store_registry | ||
.store_for_path(root_path); | ||
let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in my experience, it is too strict to expect parquet files to have the parquet suffix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, it was restricted to parquet suffix in the original implementation, so I moved it here. Probably we could make it as an argument and ask from the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or just list all files that not start with _
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @rdettai , I think we can also address this as a quick follow up PR since this is also the old behavior.
datafusion/src/datasource/csv.rs
Outdated
@@ -64,7 +66,8 @@ impl CsvFile { | |||
let schema = Arc::new(match options.schema { | |||
Some(s) => s.clone(), | |||
None => { | |||
let filenames = common::build_file_list(&path, options.file_extension)?; | |||
let filenames = LocalFileSystem |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not also allow csv/json files to be fetched using the object_store_registry ? this would make the behavior more consistent, but can definitively be added later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I leave out csv/json for now for simplicity, since their reading logic are quite different from parquet, I prefer to do these as follow-ups.
@alamb @andygrove @Dandandan @jorgecarleitao @rdettai On making the remote storage system object listing & data reading API async, a design choice occurs. This might be quite important, and I'd love to have your suggestions: To which level should I propagate async?This was because once we have async dir listing -> we can have async logical plans & async table provider -> we can have async DataFrame / context API Two available alternatives are:
Currently, This PR took the first approach by constructing all APIs in Does approach 1 make sense to you? If I take approach 1, how should the sync version function be constructed?This PR tries to make a wrapper over the async counterparts and keep single logic for each functionality. therefore relies on However, this approach is flawed for |
Thanks a lot for taking a good look at this and for the proposal.
Could you describe which APIs would be affected by this? For example, creating a logical plan would become I agree with making the planing I agree that this would be a major change. :) |
Mainly API change:
Other pub function / trait touched:
Upstream dependencies need to change:
|
I am starting to check this out carefully |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all, thank you again @yjshen and everyone else who has helped out on this PR.
This PR is pretty massive and I think we should begin breaking it down to merge it in -- the longer it stays open the more potential conflicts it hits as well as the longer before others can start playing with / helping out it.
TLDR; I would be in favor of making the DataFusion planning API async. async
planning is inevitable, in my opinion, if we want DataFusion to operate on remote catalogs that have not been locally cached in memory and must be accessed via async
I/O.
From my perspective, there are actually several major changes to this PR:
- An API to read data (during
async ExecutionPlan::execute
) from a remote file system - An API to read the metadata from a remote filesystem (e.g. what files exist, read parquet statistics, etc)
- Partial rewrite of NDJson, CSV and parquet readers to use the new ObjectStore API
The first is sufficient to do partial reads from S3 / other filesources if you already know what files exist there.
The second is needed to drive DataFusion entirely from a remote data source without having to read/cache a catalog locally.
To which level should I propagate async?
Since execution (calling ExecutionPlan::execute
and then collect
on the result) in DataFusion is async
, I think adding the async read (change 1 above) is a relatively small change and no async propagation is needed.
However, since as of today planning (everything up to calling ExecutionPlan::execute
) in Datafusion is not async
if we want to support async
catalog/metadata access (usecase 2 above) then I think we have no choice but to propagate async
all the way into planing.
To be clear, given the direction of database systems in general towards distributed systems I am in favor of plumbing async as far up to planning as needed to allow use of DataFusion with non-local catalogs. However, as you have noted, this is a much larger code change.
The alternate compromise, which you have partly implemented in this PR, is to implement both async
and non async
versions. This is similar to the approach in the C/C++ filesystem api (props to @nealrichardson for the pointer), which has both having synchronous and asynchronous APIs.
I also posted an announcement to the arrow mailing list about this change for broader visibility.
@@ -269,8 +269,8 @@ mod test { | |||
}; | |||
} | |||
|
|||
#[test] | |||
fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> { | |||
#[tokio::test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case anyone else is interested, this is what happens if you don't have tokio::test
:
failures:
---- planner::test::distributed_hash_aggregate_plan stdout ----
thread 'planner::test::distributed_hash_aggregate_plan' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/alamb/.cargo/registry/src/git.luolix.top-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:33
stack backtrace:
0: rust_begin_unwind
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/std/src/panicking.rs:515:5
1: core::panicking::panic_fmt
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/panicking.rs:92:14
2: core::option::expect_failed
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:1243:5
3: core::option::Option<T>::expect
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:351:21
4: tokio::runtime::blocking::pool::spawn_blocking
at /Users/alamb/.cargo/registry/src/git.luolix.top-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:14
5: tokio::fs::asyncify::{{closure}}
at /Users/alamb/.cargo/registry/src/git.luolix.top-1ecc6299db9ec823/tokio-1.10.0/src/fs/mod.rs:119:11
6: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
7: tokio::fs::metadata::metadata::{{closure}}
at /Users/alamb/.cargo/registry/src/git.luolix.top-1ecc6299db9ec823/tokio-1.10.0/src/fs/metadata.rs:46:5
8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
9: datafusion::datasource::object_store::local::list_all_async::{{closure}}
at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:148:8
10: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
11: datafusion::datasource::object_store::local::list_all::{{closure}}
at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:111:15
@@ -56,9 +56,9 @@ paste = "^1.0" | |||
num_cpus = "1.13.0" | |||
chrono = "0.4" | |||
async-trait = "0.1.41" | |||
futures = "0.3" | |||
futures = { version = "0.3", features = ["executor"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we already have tokio
(which has full on executor) I don't think we also need the futures executor so I would like to avoid this new dependency.
I tried removing this change locally and it seems to work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can use tokio::runtime::Handle::block_on
rather than futures::executor::block_on
as a way to play as nicely as possible with the tokio executor: https://docs.rs/tokio/1.10.0/tokio/runtime/struct.Handle.html#method.block_on
So something like
Handle::current()
.block_on(async { .... });
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While using tokio::runtime::Handle::block_on
, I'm facing with:
’Cannot start a runtime from within a runtime. This happens because a function (like
block_on
) attempted to block the current thread while the thread is being used to drive asynchronous tasks.
Since block_on
is try_enter
ing an already entered runtime, therefore I changed to future::executor
's to avoid panic in the first place. But as I noted before, future::executor::block_on
is also flawed here:
However, this approach is flawed for block_on may block the only thread in tokio, and the future inside won't get a chance to run, therefore hanging forever if the tokio runtime is not a multi-threaded one. (I temporarily change the related test to use #[tokio::test(flavor = "multi_thread", worker_threads = 2)] to avoid hanging).
Do you have any suggestions on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the only real suggestion is to plumb async
all the way through to planning (aka remove the non async API)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The alternate compromise, which you have partly implemented in this PR, is to implement both async and non async versions. This is similar to the approach in the C/C++ filesystem api (props to @nealrichardson for the pointer), which has both having synchronous and asynchronous APIs.
How about this alternative to reduce the scope of this PR? i.e. implement both sync and async, but only use sync API to migrate existing code to the new IO abstraction, then work on async propagation as a fast follow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other thing I was thinking about was what about adding in the ObjectStore interfaces in one PR and then start hooking that up into the rest of the system / rewrite the existing data sources (like Parquet, etc) as separate PRs.
I think @yjshen has done a great job with this PR showing how everything would hook together, but I do feel like this PR is slightly beyond my ability to comprehend given its size and scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am onboard with further reducing the scope by focusing only on the ObjectStore interface :)
collect_statistics: bool, | ||
) -> Result<SourceRootDescriptor> { | ||
let mut results: Vec<Result<PartitionedFile>> = Vec::new(); | ||
futures::executor::block_on(async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above I think you can use tokio::runtime::Handle::current().block_on
results.into_iter().collect(); | ||
let partition_files = partition_results?; | ||
|
||
// build a list of Parquet partitions with statistics and gather all unique schemas |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is strange to me that the collating of partitions doesn't happen in get_source_desc_async
-- it seems like get_source_desc
would just be doing the adapting of async
--> sync code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the get_source_desc
is used to adapting async
to sync
to stop propagating async to API.
If I understand you correctly, do you mean I should tell sync and async implementation apart, with two different logics? Instead of the current wrapper way (sync function wrap over async logic.)? |
I have a question on the ThreadSafeRead trait...is there anything prebuilt (or recommendation) to wrap say a tokio AsyncRead or bytes.Buf to easily implement the get_reader_async function? I see the example for the local filesystem using the FileSource2 to wrap the File, but I'm assuming most remote implementations will approach this function implementation with some kind of in-memory buffer or stream. I had some issues figuring this one out trying to implement this for S3 (I'm still a bit new to rust and lifetimes, etc). |
I think this would need to be handled case by case for different remote store client. It would be helpful to share exactly what client API signatures you are trying to use within |
Could it make sense to write a design doc like @houqp wrote some time ago for the qualified names? Does it feel a sufficiently impactful change to design this a bit before commiting? |
I think a design doc is a great idea @jorgecarleitao -- it would let us make sure some of the larger points are clear and there is consensus (especially around adding I am personally very interested in getting the ideas in this PR into DataFusion -- I think it is an important architectural step forward and since I think it will directly help IOx (the project I am working on) I can spend non trivial amounts of time working on it |
Thank you @houqp @alamb @jorgecarleitao for your great help! This PR initially contains several functions related to reading data, including the core object store abstraction, a more general scan partitioning abstraction, and some refactoring of parquet scan. At the same time, as I think more and get more valuable input, the scope becomes more extensive. Although I try to maintain PR lean as possible, leaving out some functionality such as JSON/CVS scan, it grows inevitably huge and is hard to review. I agree we could make the current PR a proof of concept, and I'm happy to break it down into several parts to get the work finally merged. As for the design doc for the object store API, I can write up a draft proposal first this weekend. Please help to review and revise it when it's available. Thanks again @alamb for offering the help on the doc part :) |
I've drafted a design doc here: https://docs.google.com/document/d/1ZEZqvdohrot0ewtTNeaBtqczOIJ1Q0OnX9PqMMxpOF8/edit#. Please help to review it. Thanks! |
Thanks @yjshen -- the plan you lay out sounds great |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yjshen one way to avoid the async planning code, might be to have an async version of a ParquetTable
TableProvider
. Would the following structure work for delta-rs?
(Create a RemoteParquetTable provider outside of Datafusion) <-- async
(Create Execution Context)
(Register Table Providers)
(Create LogicalPlan)
(Create ExecutionPlan)
(Call ExecutionPlan::execute) <-- async
(Stream results back from stream) <-- async
where RemoteParquetTable
is something that knows how to interact with the ObjectStore
and fetch the appropriate metadata for statistics and schema.
Planning these
Table Providers During
┌───────────────────────┐ results in same execution
│ (non async) │ ExecutionPlan reads data via
│ ParquetTable │──────┐ ObjectStore
│ │ │ ┌───────────────────────┐
└───────────────────────┘ │ │ (async) │ ┌───────────────┐
├─────▶│ existing │─────▶│ ObjectStore │
┌───────────────────────┐ │ │ ParquetExec │ └───────────────┘
│ async │ │ └───────────────────────┘
│ RemoteParquetTable │ │
│ │──────┘
│ fetches metadata on │
│RemoteParquetTable::new│
└───────────────────────┘
Planning Time Execution Time
This idea is not as nice as the unified framework you have here but it might allow DF to get to the more unified design incrementally
Today, in my mind the general flow of querying goes like this, and trying to add async
to the creation of LogicalPlan
or ExecutionPlan
is a large change, as you have pointed out
(Create Execution Context)
(Register Table Providers)
(Create LogicalPlan)
(Create ExecutionPlan)
(Call ExecutionPlan::execute) <-- async
(Stream results back from stream) <-- async
@alamb I might be wrong on this: is it possible to not provide a By doing this, we may pass async table building logic from planning API to users' hands, during they construct I think of this from the perspective of ballista, even though I'm not quite familiar with the code there, it seems ballista could only serialize/deserialize known typed TableProviders, therefore |
cc @houqp since I'm not familiar with delta-rs. |
That is a really neat idea @yjshen - I hadn't thought of that but it sounds very good
Not in my opinion.
That is correct -- Rust doesn't have built in runtime reflection support -- that type of behavior needs to be added in the application logic |
I also think constructing With regards to ballista table provider protobuf ser/de limitation, I think it's something we need address in the long term, otherwise, it would impossible to support custom table sources in ballista. |
As a result of previous discussions on this PR as well as in the design doc (updated according to latest reviews as well). I break down this PR into one dedicated API adding PR #950 and a |
I'm closing this PR since most of the functionalities in this one come true or will soon get in. I am excited about the changes taking place. |
Thank you again for all your work in this area @yjshen -- the improvements to DataFusion are amazing! |
Which issue does this PR close?
Closes #616
Rationale for this change
Currently, we can only read files from LocalFS since we use std::fs in ParquetExec. It would be nice to add support to read files that reside on storage sources such as HDFS, Amazon S3, etc.
What changes are included in this PR?
Introduce
ObjectStore
API as an abstraction of the underlying storage systems, such as local filesystem, HDFS, S3, etc. And make theObjectStore
implementation pluggable through theObjectStoreRegistery
in Datafusion'sExecutionContext
.Are there any user-facing changes?
Users can provide implementations for the
ObjectStore
trait, register it intoExecutionContext
's registry, and run queries against data that resides in remote storage systems as well as local fs (the only default implementation ofObjectStore
).