Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 22, 2024
1 parent 7440c1c commit 473ceff
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 11 deletions.
7 changes: 4 additions & 3 deletions datafusion-examples/examples/thread_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
//! due to congestion control and increased latencies for processing network
//! messages.
use arrow::util::pretty::pretty_format_batches;
use datafusion::common::runtime::dedicated_executor;
use datafusion::error::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::{dedicated_executor, DedicatedExecutor};
use datafusion::physical_plan::DedicatedExecutor;
use datafusion::prelude::*;
use futures::stream::StreamExt;
use object_store::http::HttpBuilder;
Expand Down Expand Up @@ -167,13 +168,13 @@ async fn different_runtime_advanced() -> Result<()> {
// ctx.register_object_store(&base_url, http_store);
// A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.

let http_store = DedicatedExecutor::wrap_object_store(http_store);
let http_store = dedicated_executor.wrap_object_store(http_store);

// Tell datafusion about processing http:// urls with this wrapped object store
ctx.register_object_store(&base_url, http_store);

// Plan (and execute) the query on the dedicated runtime
let mut stream = dedicated_executor
let stream = dedicated_executor
.spawn(async move {
// Plan / execute the query
let url = "https://github.com/apache/arrow-testing/raw/master/data/csv/aggregate_test_100.csv";
Expand Down
10 changes: 8 additions & 2 deletions datafusion/physical-plan/src/dedicated_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use crate::io_object_store::IoObjectStore;
use crate::stream::RecordBatchStreamAdapter;
use crate::SendableRecordBatchStream;
use datafusion_common::DataFusionError;
use futures::{future::{BoxFuture, Shared}, Future, FutureExt, Stream, TryFutureExt};
use futures::{
future::{BoxFuture, Shared},
Future, FutureExt, Stream, TryFutureExt,
};
use log::{info, warn};
use object_store::ObjectStore;
use parking_lot::RwLock;
Expand Down Expand Up @@ -221,7 +224,10 @@ impl DedicatedExecutor {
/// Note that this object store will only work correctly if run on this
/// dedicated executor. If you try and use it on another executor, it will
/// panic with "no IO runtime registered" type error.
pub fn wrap_object_store(&self, object_store: Arc<dyn ObjectStore>) -> Arc<IoObjectStore> {
pub fn wrap_object_store(
&self,
object_store: Arc<dyn ObjectStore>,
) -> Arc<IoObjectStore> {
Arc::new(IoObjectStore::new(self.clone(), object_store))
}

Expand Down
9 changes: 3 additions & 6 deletions datafusion/physical-plan/src/io_object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

use std::sync::Arc;

use crate::dedicated_executor::JobError;
use crate::DedicatedExecutor;
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
};
use crate::dedicated_executor::JobError;

/// 'ObjectStore' that wraps an inner `ObjectStore` and wraps all the underlying
/// methods with [`DedicatedExecutor::spawn_io`] so that they are run on the Tokio Runtime
Expand Down Expand Up @@ -57,7 +56,7 @@ impl std::fmt::Display for IoObjectStore {
fn convert_error(e: JobError) -> object_store::Error {
object_store::Error::Generic {
store: "IoObjectStore",
source: Box::new(e)
source: Box::new(e),
}
}

Expand Down Expand Up @@ -98,7 +97,7 @@ impl ObjectStore for IoObjectStore {

inner_stream
//self.executor.run_stream(inner_stream, convert_error)
// .boxed()
// .boxed()
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand Down Expand Up @@ -136,6 +135,4 @@ impl ObjectStore for IoObjectStore {
})
.await
}


}

0 comments on commit 473ceff

Please sign in to comment.