Add support for reading remote storage systems #811

Closed
wants to merge 17 commits
5 changes: 3 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -157,7 +157,7 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
LogicalPlanBuilder::scan_parquet_with_name(
&scan.path,
projection,
24,
ExecutionContext::with_concurrency(24),
&scan.table_name,
)? //TODO concurrency
.build()
@@ -1114,10 +1114,11 @@ impl TryInto<Field> for &protobuf::Field {
}
}

use datafusion::physical_plan::datetime_expressions::to_timestamp;
use datafusion::physical_plan::{aggregates, windows};
use datafusion::prelude::{
array, date_part, date_trunc, length, lower, ltrim, md5, rtrim, sha224, sha256,
sha384, sha512, trim, upper,
sha384, sha512, trim, upper, ExecutionContext,
};
use std::convert::TryFrom;

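For reference, a minimal sketch (not part of the diff) of the new call shape: the concurrency hint moves out of a bare integer argument and into an ExecutionContext. The path and table name below are hypothetical; the signatures follow what this hunk shows.

use datafusion::error::Result;
use datafusion::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::ExecutionContext;

fn build_parquet_scan() -> Result<LogicalPlan> {
    // ExecutionContext::with_concurrency is the constructor used in this PR;
    // it replaces passing a raw concurrency integer (the `24` above) directly.
    let ctx = ExecutionContext::with_concurrency(24);
    LogicalPlanBuilder::scan_parquet_with_name(
        "/tmp/data.parquet", // hypothetical path
        None,                // no projection: keep all columns
        ctx,
        "my_table",          // hypothetical table name
    )?
    .build()
}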
16 changes: 10 additions & 6 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -34,6 +34,7 @@ use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::execution::context::{
ExecutionConfig, ExecutionContextState, ExecutionProps,
};
@@ -70,7 +71,7 @@ use datafusion::physical_plan::{
Partitioning,
};
use datafusion::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use datafusion::prelude::CsvReadOptions;
use datafusion::prelude::{CsvReadOptions, ExecutionContext};
use log::debug;
use protobuf::physical_expr_node::ExprType;
use protobuf::physical_plan_node::PhysicalPlanType;
@@ -130,14 +131,13 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
PhysicalPlanType::ParquetScan(scan) => {
let projection = scan.projection.iter().map(|i| *i as usize).collect();
let filenames: Vec<&str> =
scan.filename.iter().map(|s| s.as_str()).collect();
Ok(Arc::new(ParquetExec::try_from_files(
&filenames,
let path: &str = scan.filename[0].as_str();
Ok(Arc::new(ParquetExec::try_from_path(
path,
Some(projection),
None,
scan.batch_size as usize,
scan.num_partitions as usize,
ExecutionContext::with_concurrency(scan.num_partitions as usize),
None,
)?))
}
@@ -621,13 +621,17 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc<dyn PhysicalExpr> {

let catalog_list =
Arc::new(MemoryCatalogList::new()) as Arc<dyn CatalogList>;

let object_store_registry = Arc::new(ObjectStoreRegistry::new());

let ctx_state = ExecutionContextState {
catalog_list,
scalar_functions: Default::default(),
var_provider: Default::default(),
aggregate_functions: Default::default(),
config: ExecutionConfig::new(),
execution_props: ExecutionProps::new(),
object_store_registry,
};

let fun_expr = functions::create_physical_fun(
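A sketch of what the new field enables (not from this diff): ExecutionContextState now carries an ObjectStoreRegistry, so remote stores can be resolved during physical planning. The register_store name and the scheme-keyed lookup below are assumptions; the hunk only shows ObjectStoreRegistry::new() being threaded into the state.

use std::sync::Arc;
use datafusion::datasource::object_store::{ObjectStore, ObjectStoreRegistry};

// Hypothetical helper: attach a remote store (e.g. an S3 implementation of the
// ObjectStore trait) to a registry so "s3://..." paths can resolve to it.
fn with_remote_store(store: Arc<dyn ObjectStore>) -> Arc<ObjectStoreRegistry> {
    let registry = Arc::new(ObjectStoreRegistry::new());
    // Assumed API, keyed by URI scheme; not shown in this diff.
    registry.register_store("s3".to_string(), store);
    registry
}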
2 changes: 1 addition & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -271,7 +271,7 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
let filenames = exec
.partitions()
.iter()
.flat_map(|part| part.filenames().to_owned())
.flat_map(|part| part.filenames())
.collect();
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
27 changes: 11 additions & 16 deletions ballista/rust/scheduler/src/lib.rs
@@ -82,7 +82,7 @@ use self::state::{ConfigBackendClient, SchedulerState};
use ballista_core::config::BallistaConfig;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::datasource::parquet::ParquetRootDesc;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

@@ -282,24 +282,19 @@ impl SchedulerGrpc for SchedulerServer {

match file_type {
FileType::Parquet => {
let parquet_exec =
ParquetExec::try_from_path(&path, None, None, 1024, 1, None)
.map_err(|e| {
let msg = format!("Error opening parquet files: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
})?;
let ctx = ExecutionContext::with_concurrency(1);
let parquet_desc = ParquetRootDesc::new(&path, ctx).map_err(|e| {
let msg = format!("Error opening parquet files: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
})?;

//TODO include statistics and any other info needed to reconstruct ParquetExec
Ok(Response::new(GetFileMetadataResult {
schema: Some(parquet_exec.schema().as_ref().into()),
partitions: parquet_exec
.partitions()
.iter()
.map(|part| FilePartitionMetadata {
filename: part.filenames().to_vec(),
})
.collect(),
schema: Some(parquet_desc.schema().as_ref().into()),
partitions: vec![FilePartitionMetadata {
filename: vec![path],
Member:
We are always returning a single path for the partitions field? That changes the behavior, doesn't it?

Member Author:
The behavior is unchanged, actually: the original filenames all come from the root_path, and here I just use the root_path instead, to avoid touching too much code in the ballista proto definition as well as its serde (to_proto and from_proto).

}],
}))
}
//TODO implement for CSV
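For clarity, a condensed sketch of the new metadata path used above, assuming ParquetRootDesc::new(path, ctx) and .schema() behave as the hunk suggests (a root-path descriptor replacing per-file ParquetExec partitions):

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::parquet::ParquetRootDesc;
use datafusion::error::Result;
use datafusion::prelude::ExecutionContext;

fn parquet_schema_at(path: &str) -> Result<SchemaRef> {
    // Concurrency of 1 mirrors the scheduler code above; metadata discovery
    // does not need parallel execution.
    let ctx = ExecutionContext::with_concurrency(1);
    let desc = ParquetRootDesc::new(path, ctx)?;
    Ok(desc.schema())
}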
12 changes: 6 additions & 6 deletions ballista/rust/scheduler/src/planner.rs
@@ -269,8 +269,8 @@ mod test {
};
}

#[test]
fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
#[tokio::test]
Contributor:
In case anyone else is interested, this is what happens if you don't have #[tokio::test]:

failures:

---- planner::test::distributed_hash_aggregate_plan stdout ----
thread 'planner::test::distributed_hash_aggregate_plan' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:33
stack backtrace:
   0: rust_begin_unwind
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/std/src/panicking.rs:515:5
   1: core::panicking::panic_fmt
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/panicking.rs:92:14
   2: core::option::expect_failed
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:1243:5
   3: core::option::Option<T>::expect
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:351:21
   4: tokio::runtime::blocking::pool::spawn_blocking
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:14
   5: tokio::fs::asyncify::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/fs/mod.rs:119:11
   6: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
   7: tokio::fs::metadata::metadata::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/fs/metadata.rs:46:5
   8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
   9: datafusion::datasource::object_store::local::list_all_async::{{closure}}
             at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:148:8
  10: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
  11: datafusion::datasource::object_store::local::list_all::{{closure}}
             at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:111:15

async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;

// simplified form of TPC-H query 1
@@ -352,8 +352,8 @@ mod test {
Ok(())
}

#[test]
fn distributed_join_plan() -> Result<(), BallistaError> {
#[tokio::test]
async fn distributed_join_plan() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;

// simplified form of TPC-H query 12
@@ -523,8 +523,8 @@ order by
Ok(())
}

#[test]
fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
#[tokio::test]
async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;

// simplified form of TPC-H query 1
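The backtrace in the review thread above boils down to a simple rule: the new object-store code calls tokio::fs, which needs a live reactor. A minimal illustration (hypothetical test, not from the PR):

// Under plain #[test] the tokio::fs call below panics with
// "there is no reactor running"; #[tokio::test] wraps the body in a runtime.
#[tokio::test]
async fn fs_calls_need_a_reactor() {
    let _meta = tokio::fs::metadata(".").await.expect("cwd should exist");
}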
2 changes: 1 addition & 1 deletion benchmarks/src/bin/tpch.rs
@@ -478,8 +478,8 @@ fn get_table(
let schema = get_schema(table);
Ok(Arc::new(ParquetTable::try_new_with_schema(
&path,
ExecutionContext::with_concurrency(max_concurrency),
schema,
max_concurrency,
false,
)?))
}
6 changes: 5 additions & 1 deletion datafusion-examples/examples/flight_server.rs
@@ -65,7 +65,11 @@ impl FlightService for FlightServiceImpl {
) -> Result<Response<SchemaResult>, Status> {
let request = request.into_inner();

let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
let table = ParquetTable::try_new(
&request.path[0],
ExecutionContext::with_concurrency(num_cpus::get()),
)
.unwrap();

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
4 changes: 2 additions & 2 deletions datafusion/Cargo.toml
@@ -56,9 +56,9 @@ paste = "^1.0"
num_cpus = "1.13.0"
chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
futures = { version = "0.3", features = ["executor"] }
Contributor:
Since we already have tokio (which has a full-on executor), I don't think we also need the futures executor, so I would like to avoid this new dependency.

I tried removing this change locally and it seems to work

Contributor:
I think you can use tokio::runtime::Handle::block_on rather than futures::executor::block_on as a way to play as nicely as possible with the tokio executor: https://docs.rs/tokio/1.10.0/tokio/runtime/struct.Handle.html#method.block_on

So something like

Handle::current()
  .block_on(async { .... });

Member Author:
While using tokio::runtime::Handle::block_on, I'm running into:

"Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks."

Since block_on is try_entering an already entered runtime, I changed to futures::executor's block_on to avoid the panic in the first place. But as I noted before, futures::executor::block_on is also flawed here:

However, this approach is flawed because block_on may block the only thread in tokio, so the future inside never gets a chance to run, hanging forever if the tokio runtime is not a multi-threaded one. (I temporarily changed the related test to use #[tokio::test(flavor = "multi_thread", worker_threads = 2)] to avoid hanging.)

Do you have any suggestions on this?

Contributor:
I think the only real suggestion is to plumb async all the way through to planning (i.e., remove the non-async API).

Member:
The alternate compromise, which you have partly implemented in this PR, is to implement both async and non-async versions. This is similar to the approach in the C/C++ filesystem API (props to @nealrichardson for the pointer), which offers both synchronous and asynchronous APIs.

How about this alternative to reduce the scope of this PR: implement both sync and async, but use only the sync API to migrate existing code to the new IO abstraction, then work on async propagation as a fast follow-up.

Contributor:
The other thing I was thinking about: what about adding the ObjectStore interfaces in one PR, and then hooking them up to the rest of the system / rewriting the existing data sources (Parquet, etc.) in separate PRs?

I think @yjshen has done a great job with this PR showing how everything would hook together, but I do feel like this PR is slightly beyond my ability to comprehend given its size and scope.

Member:
I am onboard with further reducing the scope by focusing only on the ObjectStore interface :)

pin-project-lite= "^0.2.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs"] }
tokio-stream = "0.1"
log = "^0.4"
md-5 = { version = "^0.9.1", optional = true }
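A minimal sketch of the sync-over-async problem discussed in the thread above (illustrative only, not code from this PR): Handle::block_on panics when invoked on a thread that is already driving the runtime, and futures::executor::block_on can deadlock a single-threaded runtime. tokio's block_in_place is one escape hatch, though it also requires a multi-threaded runtime.

use tokio::runtime::Handle;

// Assumed shape of a sync wrapper over an async object-store call.
// Requires the "rt-multi-thread" tokio feature (already enabled above).
fn read_len_sync() -> usize {
    // Calling Handle::current().block_on directly here would panic with
    // "Cannot start a runtime from within a runtime" on a runtime thread.
    tokio::task::block_in_place(|| {
        Handle::current().block_on(async {
            // Stand-in for an async read from an ObjectStore.
            let bytes: Vec<u8> = vec![0; 1024];
            bytes.len()
        })
    })
}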
33 changes: 22 additions & 11 deletions datafusion/src/dataframe.rs
@@ -41,7 +41,8 @@ use async_trait::async_trait;
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?
@@ -59,7 +60,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.select_columns(&["a", "b"])?;
@@ -73,7 +75,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.select(vec![col("a") * col("b"), col("c")])?;
@@ -87,7 +90,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.filter(col("a").lt_eq(col("b")))?;
@@ -101,7 +105,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
///
@@ -124,7 +129,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.limit(100)?;
@@ -138,7 +144,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.union(df.clone())?;
@@ -153,7 +160,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
@@ -196,7 +204,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
@@ -275,7 +284,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let schema = df.schema();
@@ -309,7 +319,8 @@ pub trait DataFrame: Send + Sync {
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let f = df.registry();
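Taken together, the doc-example changes above converge on one pattern; a consolidated sketch (file path taken from the originals, collect() assumed async as in DataFusion's DataFrame API):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
    // Planning stays synchronous; execution is awaited.
    let df = df.filter(col("a").lt_eq(col("b")))?.limit(100)?;
    let batches = df.collect().await?;
    println!("got {} record batches", batches.len());
    Ok(())
}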
12 changes: 9 additions & 3 deletions datafusion/src/datasource/csv.rs
@@ -24,13 +24,16 @@
//! ```
//! use datafusion::datasource::TableProvider;
//! use datafusion::datasource::csv::{CsvFile, CsvReadOptions};
//! #[tokio::main]
//! # async fn main() {
//!
//! let testdata = datafusion::test_util::arrow_test_data();
//! let csvdata = CsvFile::try_new(
//! &format!("{}/csv/aggregate_test_100.csv", testdata),
//! CsvReadOptions::new().delimiter(b'|'),
//! ).unwrap();
//! let schema = csvdata.schema();
//! # }
//! ```

use arrow::datatypes::SchemaRef;
@@ -40,14 +43,16 @@ use std::string::String;
use std::sync::{Arc, Mutex};

use crate::datasource::datasource::Statistics;
use crate::datasource::object_store::local::LocalFileSystem;
use crate::datasource::object_store::ObjectStore;
use crate::datasource::{Source, TableProvider};
use crate::error::{DataFusionError, Result};
use crate::logical_plan::Expr;
use crate::physical_plan::csv::CsvExec;
pub use crate::physical_plan::csv::CsvReadOptions;
use crate::physical_plan::{common, ExecutionPlan};
use crate::physical_plan::ExecutionPlan;

/// Represents a CSV file with a provided schema
pub struct CsvFile {
source: Source,
schema: SchemaRef,
@@ -64,7 +69,8 @@ impl CsvFile {
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
None => {
let filenames = common::build_file_list(&path, options.file_extension)?;
let filenames =
LocalFileSystem.list(path.as_str(), options.file_extension)?;
if filenames.is_empty() {
return Err(DataFusionError::Plan(format!(
"No files found at {path} with file extension {file_extension}",
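A sketch of the new listing call that replaces common::build_file_list, matching the call shape in this hunk; the Vec<String> return type is inferred from the is_empty() check and is an assumption:

use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::ObjectStore;
use datafusion::error::{DataFusionError, Result};

fn csv_files_under(path: &str) -> Result<Vec<String>> {
    // LocalFileSystem implements the new ObjectStore trait; remote stores can
    // supply the same `list` behavior for their own schemes.
    let filenames = LocalFileSystem.list(path, ".csv")?;
    if filenames.is_empty() {
        return Err(DataFusionError::Plan(format!(
            "No files found at {} with file extension .csv",
            path
        )));
    }
    Ok(filenames)
}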
5 changes: 3 additions & 2 deletions datafusion/src/datasource/json.rs
@@ -30,14 +30,15 @@ use crate::{
datasource::{Source, TableProvider},
error::{DataFusionError, Result},
physical_plan::{
common,
json::{NdJsonExec, NdJsonReadOptions},
ExecutionPlan,
},
};
use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable};

use super::datasource::Statistics;
use crate::datasource::object_store::local::LocalFileSystem;
use crate::datasource::object_store::ObjectStore;

trait SeekRead: Read + Seek {}

@@ -57,7 +58,7 @@ impl NdJsonFile {
let schema = if let Some(schema) = options.schema {
schema
} else {
let filenames = common::build_file_list(path, options.file_extension)?;
let filenames = LocalFileSystem.list(path, options.file_extension)?;
if filenames.is_empty() {
return Err(DataFusionError::Plan(format!(
"No files found at {path} with file extension {file_extension}",