Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate RuntimeConfig, update code to use new builder style #13635

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::memory_pool::{human_readable_size, units};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
Expand Down Expand Up @@ -195,10 +196,15 @@ impl ExternalAggrConfig {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
let config = self.common.config();
let runtime_config = RuntimeConfig::new()
let runtime_env = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(mem_limit as usize)))
.build_arc()?;
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
let state = SessionStateBuilder::new()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this follows the new builder style model more fully

.with_config(config)
.with_runtime_env(runtime_env)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

// register tables
self.register_tables(&ctx).await?;
Expand Down
10 changes: 6 additions & 4 deletions benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
Expand Down Expand Up @@ -188,9 +188,11 @@ impl RunOpt {
/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config();

let runtime_config = RuntimeConfig::new().build_arc()?;
let ctx = SessionContext::new_with_config_rt(config, runtime_config);
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
let ctx = SessionContext::from(state);

// register tables
self.register_tables(&ctx).await?;
Expand Down
37 changes: 14 additions & 23 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::sync::{Arc, OnceLock};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool, MemoryPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
Expand Down Expand Up @@ -156,27 +156,22 @@ async fn main_inner() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};

let rt_config = RuntimeConfig::new();
let rt_config =
// set memory pool size
if let Some(memory_limit) = args.memory_limit {
// set memory pool type
match args.mem_pool_type {
PoolType::Fair => rt_config
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
PoolType::Greedy => rt_config
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
}
} else {
rt_config
let mut rt_builder = RuntimeEnvBuilder::new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code is now clearer -- previously `rt_config` was actually a builder and was used as such, but that was somewhat confusing given the code style

// set memory pool size
if let Some(memory_limit) = args.memory_limit {
// set memory pool type
let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),
PoolType::Greedy => Arc::new(GreedyMemoryPool::new(memory_limit)),
};
rt_builder = rt_builder.with_memory_pool(pool)
}

let runtime_env = create_runtime_env(rt_config.clone())?;
let runtime_env = rt_builder.build_arc()?;

// enable dynamic file query
let ctx =
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env))
.enable_url_table();
let ctx = SessionContext::new_with_config_rt(session_config, runtime_env)
.enable_url_table();
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that can register required object stores
ctx.register_catalog_list(Arc::new(DynamicObjectStoreCatalog::new(
Expand Down Expand Up @@ -231,10 +226,6 @@ async fn main_inner() -> Result<()> {
Ok(())
}

fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
RuntimeEnv::try_new(rn_config)
}

fn parse_valid_file(dir: &str) -> Result<String, String> {
if Path::new(dir).is_file() {
Ok(dir.to_string())
Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,6 @@ mod tests {
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit};

use chrono::DateTime;
Expand Down Expand Up @@ -984,12 +983,10 @@ mod tests {
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This simply creates a default RuntimeEnv which the SessionStateBuilder already does, so there is no need to do it again

let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionStateBuilder::new()
.with_config(cfg)
.with_runtime_env(runtime)
.with_default_features()
.build();
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
Expand Down
3 changes: 0 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,6 @@ mod tests {
use super::{super::options::CsvReadOptions, *};
use crate::assert_batches_eq;
use crate::execution::memory_pool::MemoryConsumer;
use crate::execution::runtime_env::RuntimeEnvBuilder;
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};
use arrow_schema::{DataType, TimeUnit};
Expand Down Expand Up @@ -1932,14 +1931,12 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let runtime = RuntimeEnvBuilder::new().build_arc()?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise here, this is the default runtime env, it isn't needed

let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
.set_str("datafusion.catalog.has_header", "true");
let session_state = SessionStateBuilder::new()
.with_config(cfg)
.with_runtime_env(runtime)
.with_default_features()
.build();
let ctx = SessionContext::new_with_state(session_state);
Expand Down
3 changes: 1 addition & 2 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! Manages files generated during query execution, files are
//! hashed among the directories listed in RuntimeConfig::local_dirs.
//! [`DiskManager`]: Manages files generated during query execution

use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct GreedyMemoryPool {
}

impl GreedyMemoryPool {
/// Allocate up to `limit` bytes
/// Create a new pool that can allocate up to `pool_size` bytes
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
Expand Down
72 changes: 44 additions & 28 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,32 @@ use url::Url;
/// Execution runtime environment that manages system resources such
/// as memory, disk, cache and storage.
///
/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
/// A [`RuntimeEnv`] can be created using [`RuntimeEnvBuilder`] and has the
/// following resource management functionality:
///
/// * [`MemoryPool`]: Manage memory
/// * [`DiskManager`]: Manage temporary files on local disk
/// * [`CacheManager`]: Manage temporary cache data during the session lifetime
/// * [`ObjectStoreRegistry`]: Manage mapping URLs to object store instances
///
/// # Example: Create default `RuntimeEnv`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new doc examples

/// ```
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// let runtime_env = RuntimeEnv::default();
/// ```
///
/// # Example: Create a `RuntimeEnv` from [`RuntimeEnvBuilder`] with a new memory pool
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::GreedyMemoryPool;
/// # use datafusion_execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
/// // restrict to using at most 100MB of memory
/// let pool_size = 100 * 1024 * 1024;
/// let runtime_env = RuntimeEnvBuilder::new()
/// .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
/// .build()
/// .unwrap();
/// ```
pub struct RuntimeEnv {
/// Runtime memory management
pub memory_pool: Arc<dyn MemoryPool>,
Expand All @@ -66,28 +85,16 @@ impl Debug for RuntimeEnv {
}

impl RuntimeEnv {
#[deprecated(since = "43.0.0", note = "please use `try_new` instead")]
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
#[allow(deprecated)]
pub fn new(config: RuntimeConfig) -> Result<Self> {
Self::try_new(config)
}
/// Create env based on configuration
#[deprecated(since = "44.0.0", note = "please use `RuntimeEnvBuilder` instead")]
#[allow(deprecated)]
pub fn try_new(config: RuntimeConfig) -> Result<Self> {
let RuntimeConfig {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is literally the same as RuntimeConfig::build so just call that directly

memory_pool,
disk_manager,
cache_manager,
object_store_registry,
} = config;

let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(Self {
memory_pool,
disk_manager: DiskManager::try_new(disk_manager)?,
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
})
config.build()
}

/// Registers a custom `ObjectStore` to be used with a specific url.
Expand All @@ -104,7 +111,7 @@ impl RuntimeEnv {
/// # use std::sync::Arc;
/// # use url::Url;
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
/// # let runtime_env = RuntimeEnv::default();
/// let url = Url::try_from("file://").unwrap();
/// let object_store = object_store::local::LocalFileSystem::new();
/// // register the object store with the runtime environment
Expand All @@ -119,11 +126,12 @@ impl RuntimeEnv {
/// # use std::sync::Arc;
/// # use url::Url;
/// # use datafusion_execution::runtime_env::RuntimeEnv;
/// # let runtime_env = RuntimeEnv::try_new(Default::default()).unwrap();
/// # let runtime_env = RuntimeEnv::default();
/// # // use local store for example as http feature is not enabled
/// # let http_store = object_store::local::LocalFileSystem::new();
/// // create a new object store via object_store::http::HttpBuilder;
/// let base_url = Url::parse("https://github.com").unwrap();
/// // (note this example can't depend on the http feature)
/// // let http_store = HttpBuilder::new()
/// // .with_url(base_url.clone())
/// // .build()
Expand Down Expand Up @@ -155,12 +163,15 @@ impl Default for RuntimeEnv {
}
}

/// Please see: <https://github.com/apache/datafusion/issues/12156>
/// Please see: <https://github.com/apache/datafusion/issues/12156>
comphead marked this conversation as resolved.
Show resolved Hide resolved
/// This is a type alias for backwards compatibility.
#[deprecated(since = "43.0.0", note = "please use `RuntimeEnvBuilder` instead")]
pub type RuntimeConfig = RuntimeEnvBuilder;

#[derive(Clone)]
/// Execution runtime configuration
/// Execution runtime configuration builder.
///
/// See example on [`RuntimeEnv`]
pub struct RuntimeEnvBuilder {
/// DiskManager to manage temporary disk file usage
pub disk_manager: DiskManagerConfig,
Expand Down Expand Up @@ -239,15 +250,20 @@ impl RuntimeEnvBuilder {

/// Build a RuntimeEnv
pub fn build(self) -> Result<RuntimeEnv> {
let memory_pool = self
.memory_pool
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
let Self {
disk_manager,
memory_pool,
cache_manager,
object_store_registry,
} = self;
let memory_pool =
memory_pool.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(RuntimeEnv {
memory_pool,
disk_manager: DiskManager::try_new(self.disk_manager)?,
cache_manager: CacheManager::try_new(&self.cache_manager)?,
object_store_registry: self.object_store_registry,
disk_manager: DiskManager::try_new(disk_manager)?,
cache_manager: CacheManager::try_new(&cache_manager)?,
object_store_registry,
})
}

Expand Down
10 changes: 3 additions & 7 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
// under the License.

use crate::{
config::SessionConfig,
memory_pool::MemoryPool,
registry::FunctionRegistry,
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
config::SessionConfig, memory_pool::MemoryPool, registry::FunctionRegistry,
runtime_env::RuntimeEnv,
};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::planner::ExprPlanner;
Expand Down Expand Up @@ -54,9 +52,7 @@ pub struct TaskContext {

impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnvBuilder::new()
.build_arc()
.expect("default runtime created successfully");
let runtime = Arc::new(RuntimeEnv::default());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this does the same thing, just makes the intent clearer


// Create a default task context, mostly useful for testing
Self {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@ mod tests {

fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = RuntimeEnvBuilder::default()
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build_arc()
.unwrap();
Expand Down Expand Up @@ -1914,7 +1914,7 @@ mod tests {
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
let input_schema = input.schema();

let runtime = RuntimeEnvBuilder::default()
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(1, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
|| matches!(tz_info, TimezoneInfo::WithTimeZone)
{
// Timestamp With Time Zone
// INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone
// INPUT : [SQLDataType] TimestampTz + [Config] Time Zone
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
self.context_provider.options().execution.time_zone.clone()
} else {
Expand Down
Loading