Move from_unixtime, now, current_date, current_time functions to datafusion-functions #9537

Merged
merged 27 commits into from
Mar 13, 2024
Commits
bcd2bd1
Move date_part, date_trunc, date_bin functions to datafusion-functions
Omega359 Mar 3, 2024
761d873
Merge remote-tracking branch 'upstream/main' into feature/9421
Omega359 Mar 3, 2024
2a41960
I do not understand why the logical plan changed but updating the exp…
Omega359 Mar 3, 2024
35f8e36
Merge remote-tracking branch 'upstream/main' into feature/9421
Omega359 Mar 3, 2024
618cc40
Fix fmt
Omega359 Mar 3, 2024
7ea0527
Merge remote-tracking branch 'upstream/main' into feature/9421
Omega359 Mar 5, 2024
98d5ff7
Improvements to remove datafusion-functions dependency from sq and ph…
Omega359 Mar 5, 2024
42039da
Merge remote-tracking branch 'upstream/main' into feature/9421
Omega359 Mar 5, 2024
4c84f08
WIP
Omega359 Mar 5, 2024
9675dcd
Fix function arguments for date_bin, date_trunc and date_part.
Omega359 Mar 6, 2024
444337c
Merge remote-tracking branch 'upstream/main' into feature/9421
Omega359 Mar 6, 2024
b72bd55
Merge remote-tracking branch 'upstream/main' into feature/9466
Omega359 Mar 6, 2024
8840d50
WIP
Omega359 Mar 6, 2024
c0ce362
Fix projection change. Add new test date_bin monotonicity
mustafasrepo Mar 6, 2024
3ef85ad
Merge remote-tracking branch 'origin/feature/9421' into feature/9466
Omega359 Mar 6, 2024
e574abf
Move now, current_date and current_time functions to datafusion-funct…
Omega359 Mar 6, 2024
f9a7717
Merge remote-tracking branch 'upstream/main' into feature/9466
Omega359 Mar 9, 2024
ea89f71
Force exact version of chrono
Omega359 Mar 9, 2024
8abb99a
Merge updates.
Omega359 Mar 9, 2024
15c50eb
Updates for chrono changes
Omega359 Mar 9, 2024
b4e54cc
Merge remote-tracking branch 'upstream/main' into feature/9466
Omega359 Mar 10, 2024
a58a851
Merge fixes
Omega359 Mar 10, 2024
b63feb3
Removed make_now from incorrect merge.
Omega359 Mar 10, 2024
3986baa
fmt fix.
Omega359 Mar 10, 2024
c2e8b69
Merge remote-tracking branch 'upstream/main' into feature/9466
Omega359 Mar 13, 2024
92a1370
Updates after correcting merge conflicts.
Omega359 Mar 13, 2024
189190c
Only move the tests using now() function from optimizer_integration.r…
Omega359 Mar 13, 2024
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

198 changes: 198 additions & 0 deletions datafusion/core/tests/optimizer_integration.rs
@@ -0,0 +1,198 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{plan_err, Result};
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::TableReference;

use chrono::DateTime;
use datafusion_functions::datetime;

#[cfg(test)]
#[ctor::ctor]
fn init() {
// enable logging so RUST_LOG works
let _ = env_logger::try_init();
}

#[test]
fn timestamp_nano_ts_none_predicates() -> Result<()> {
let sql = "SELECT col_int32
FROM test
WHERE col_ts_nano_none < (now() - interval '1 hour')";
let plan = test_sql(sql)?;
// a scan should have the now()... predicate folded to a single
// constant and compared to the column without a cast so it can be
// pushed down / pruned
let expected =
"Projection: test.col_int32\
\n  Filter: test.col_ts_nano_none < TimestampNanosecond(1666612093000000000, None)\
\n    TableScan: test projection=[col_int32, col_ts_nano_none]";
assert_eq!(expected, format!("{plan:?}"));
Ok(())
}

#[test]
fn timestamp_nano_ts_utc_predicates() {
let sql = "SELECT col_int32
FROM test
WHERE col_ts_nano_utc < (now() - interval '1 hour')";
let plan = test_sql(sql).unwrap();
// a scan should have the now()... predicate folded to a single
// constant and compared to the column without a cast so it can be
// pushed down / pruned
let expected =
"Projection: test.col_int32\n  Filter: test.col_ts_nano_utc < TimestampNanosecond(1666612093000000000, Some(\"+00:00\"))\
\n    TableScan: test projection=[col_int32, col_ts_nano_utc]";
assert_eq!(expected, format!("{plan:?}"));
}

fn test_sql(sql: &str) -> Result<LogicalPlan> {
// parse the SQL
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...
let ast: Vec<Statement> = Parser::parse_sql(&dialect, sql).unwrap();
let statement = &ast[0];

// create a logical query plan
let now_udf = datetime::functions()
Contributor:

How much of the optimizer integration test do you think needs UDFs? I am wondering whether we could just port the test cases for now() into core/tests/sql_integration or something similar and leave the rest of the optimizer_integration test in the optimizer crate.

The reasons it might be good to leave the optimizer tests in the optimizer crate are:

  1. Easier to run cargo test -p datafusion_optimizer and run all the relevant tests
  2. Ensure that we could still use datafusion_optimizer without the function definitions

Contributor Author:

I'll take a look. 6/20 tests use functions in the test (concat, sum, concat_ws, now(), avg). I can move just those over for sure. I wish this test dependency issue wasn't such a problem in Rust - most of the problems I've seen migrating the functions have been test related.

Contributor Author:

Thinking about this more - the issue is not test related, because we can use dev dependencies. The issue is publishing to crates.io, because cargo publishes with dev-dependencies and has done so for many years according to the bug report. Would it be possible to just fix that by using --no-dev-deps or something similar during publishing?

🤔

Contributor:

I think moving the tests to the core (what this PR does) is probably the best solution for now, as it correctly reflects the dependencies (the optimizer tests are testing behavior of functions that are not available to optimizer)

Contributor Author:

I moved all but the tests that required now() back to the datafusion/optimizer/tests folder.

.iter()
.find(|f| f.name() == "now")
.unwrap()
.to_owned();
let context_provider = MyContextProvider::default().with_udf(now_udf);
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// hard code the return value of now()
let now_time = DateTime::from_timestamp(1666615693, 0).unwrap();
let config = OptimizerContext::new()
.with_skip_failing_rules(false)
.with_query_execution_start_time(now_time);
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(&plan, config.options(), |_, _| {})?;
optimizer.optimize(&plan, &config, |_, _| {})
}

#[derive(Default)]
struct MyContextProvider {
options: ConfigOptions,
udfs: HashMap<String, Arc<ScalarUDF>>,
}

impl MyContextProvider {
fn with_udf(mut self, udf: Arc<ScalarUDF>) -> Self {
self.udfs.insert(udf.name().to_string(), udf);
self
}
}

impl ContextProvider for MyContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let table_name = name.table();
if table_name.starts_with("test") {
let schema = Schema::new_with_metadata(
vec![
Field::new("col_int32", DataType::Int32, true),
Field::new("col_uint32", DataType::UInt32, true),
Field::new("col_utf8", DataType::Utf8, true),
Field::new("col_date32", DataType::Date32, true),
Field::new("col_date64", DataType::Date64, true),
// timestamp with no timezone
Field::new(
"col_ts_nano_none",
DataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
// timestamp with UTC timezone
Field::new(
"col_ts_nano_utc",
DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
true,
),
],
HashMap::new(),
);

Ok(Arc::new(MyTableSource {
schema: Arc::new(schema),
}))
} else {
plan_err!("table does not exist")
}
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.udfs.get(name).cloned()
}

fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
None
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}

fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}

fn options(&self) -> &ConfigOptions {
&self.options
}

fn udfs_names(&self) -> Vec<String> {
Vec::new()
}

fn udafs_names(&self) -> Vec<String> {
Vec::new()
}

fn udwfs_names(&self) -> Vec<String> {
Vec::new()
}
}

struct MyTableSource {
schema: SchemaRef,
}

impl TableSource for MyTableSource {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
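
The constant 1666612093000000000 in the expected plans above follows from the start time that test_sql hard-codes: now() is pinned to 1666615693 seconds since the Unix epoch, the predicate subtracts one hour, and the column type is nanosecond precision. A minimal, std-only sketch of that arithmetic is shown here; the helper name `folded_now_minus_interval` is hypothetical and is not part of DataFusion or chrono.

```rust
/// Number of nanoseconds in one second.
const NANOS_PER_SEC: i64 = 1_000_000_000;

/// Hypothetical helper (not a DataFusion API): computes the nanosecond
/// literal that `now() - interval` folds to when `now()` is pinned to
/// `start_secs` seconds since the Unix epoch.
/// Returns `None` if the arithmetic would overflow an `i64`.
fn folded_now_minus_interval(start_secs: i64, interval_secs: i64) -> Option<i64> {
    start_secs
        .checked_sub(interval_secs)?
        .checked_mul(NANOS_PER_SEC)
}
```

Calling `folded_now_minus_interval(1_666_615_693, 3_600)` reproduces the literal compared against the timestamp columns in both tests.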
34 changes: 26 additions & 8 deletions datafusion/core/tests/simplification.rs
@@ -185,10 +185,6 @@ fn make_udf_add(volatility: Volatility) -> Arc<ScalarUDF> {
))
}

-fn now_expr() -> Expr {
-call_fn("now", vec![]).unwrap()
-}
-
fn cast_to_int64_expr(expr: Expr) -> Expr {
Expr::Cast(Cast::new(expr.into(), DataType::Int64))
}
@@ -255,7 +251,7 @@ fn now_less_than_timestamp() -> Result<()> {
// cast(now() as int) < cast(to_timestamp(...) as int) + 50000_i64
let plan = LogicalPlanBuilder::from(table_scan)
.filter(
-cast_to_int64_expr(now_expr())
+cast_to_int64_expr(now())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000_i64)),
)?
.build()?;
@@ -368,14 +364,14 @@ fn test_const_evaluator_now() {
let time = chrono::Utc.timestamp_nanos(ts_nanos);
let ts_string = "2020-09-08T12:05:00+00:00";
// now() --> ts
-test_evaluate_with_start_time(now_expr(), lit_timestamp_nano(ts_nanos), &time);
+test_evaluate_with_start_time(now(), lit_timestamp_nano(ts_nanos), &time);

// CAST(now() as int64) + 100_i64 --> ts + 100_i64
-let expr = cast_to_int64_expr(now_expr()) + lit(100_i64);
+let expr = cast_to_int64_expr(now()) + lit(100_i64);
test_evaluate_with_start_time(expr, lit(ts_nanos + 100), &time);

// CAST(now() as int64) < cast(to_timestamp(...) as int64) + 50000_i64 ---> true
-let expr = cast_to_int64_expr(now_expr())
+let expr = cast_to_int64_expr(now())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000i64));
test_evaluate_with_start_time(expr, lit(true), &time);
}
@@ -413,3 +409,25 @@ fn test_evaluator_udfs() {
));
test_evaluate(expr, expected_expr);
}
+
+#[test]
+fn multiple_now() -> Result<()> {
+let table_scan = test_table_scan();
+let time = Utc::now();
+let proj = vec![now(), now().alias("t2")];
+let plan = LogicalPlanBuilder::from(table_scan)
+.project(proj)?
+.build()?;
+
+// expect the same timestamp appears in both exprs
+let actual = get_optimized_plan_formatted(&plan, &time);
+let expected = format!(
+"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
+\n  TableScan: test",
+time.timestamp_nanos_opt().unwrap(),
+time.timestamp_nanos_opt().unwrap()
+);
+
+assert_eq!(expected, actual);
+Ok(())
+}
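
The multiple_now test relies on every now() call in one statement being replaced by the same literal. As a rough illustration of that idea, here is a sketch over a toy expression type; `ToyExpr`, `fold_now`, and `fold_projection` are hypothetical names invented for this example and are not DataFusion's `Expr` or its simplifier.

```rust
/// Toy expression type; a deliberately tiny stand-in, not DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum ToyExpr {
    /// A call to `now()` that has not been evaluated yet.
    Now,
    /// A timestamp literal in nanoseconds since the Unix epoch.
    TimestampNanos(i64),
    /// An expression with an explicit output name.
    Alias(Box<ToyExpr>, String),
}

/// Replace every `Now` node with the single start time captured for the query.
fn fold_now(expr: ToyExpr, query_start_nanos: i64) -> ToyExpr {
    match expr {
        ToyExpr::Now => ToyExpr::TimestampNanos(query_start_nanos),
        ToyExpr::Alias(inner, name) => {
            ToyExpr::Alias(Box::new(fold_now(*inner, query_start_nanos)), name)
        }
        // Literals are already constant and pass through unchanged.
        literal @ ToyExpr::TimestampNanos(_) => literal,
    }
}

/// Fold a whole projection list against one captured start time, so that
/// every `now()` in the list ends up with the same value.
fn fold_projection(exprs: Vec<ToyExpr>, query_start_nanos: i64) -> Vec<ToyExpr> {
    exprs
        .into_iter()
        .map(|e| fold_now(e, query_start_nanos))
        .collect()
}
```

Because the start time is passed in once rather than read from the clock per node, a projection such as `[Now, Alias(Now, "t2")]` folds to two literals carrying an identical timestamp.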
33 changes: 0 additions & 33 deletions datafusion/expr/src/built_in_function.rs
@@ -204,14 +204,6 @@ pub enum BuiltinScalarFunction {
Substr,
/// to_hex
ToHex,
-/// from_unixtime
-FromUnixtime,
-///now
-Now,
-///current_date
-CurrentDate,
-/// current_time
-CurrentTime,
/// make_date
MakeDate,
/// translate
@@ -369,17 +361,11 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Translate => Volatility::Immutable,
BuiltinScalarFunction::Trim => Volatility::Immutable,
BuiltinScalarFunction::Upper => Volatility::Immutable,
-BuiltinScalarFunction::FromUnixtime => Volatility::Immutable,
BuiltinScalarFunction::OverLay => Volatility::Immutable,
BuiltinScalarFunction::Levenshtein => Volatility::Immutable,
BuiltinScalarFunction::SubstrIndex => Volatility::Immutable,
BuiltinScalarFunction::FindInSet => Volatility::Immutable,

-// Stable builtin functions
-BuiltinScalarFunction::Now => Volatility::Stable,
-BuiltinScalarFunction::CurrentDate => Volatility::Stable,
-BuiltinScalarFunction::CurrentTime => Volatility::Stable,
-
// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
BuiltinScalarFunction::Uuid => Volatility::Volatile,
@@ -396,7 +382,6 @@ impl BuiltinScalarFunction {
/// 2. Deduce the output `DataType` based on the provided `input_expr_types`.
pub fn return_type(self, input_expr_types: &[DataType]) -> Result<DataType> {
use DataType::*;
-use TimeUnit::*;

// Note that this function *must* return the same type that the respective physical expression returns
// or the execution panics.
@@ -544,12 +529,6 @@ impl BuiltinScalarFunction {
utf8_to_int_type(&input_expr_types[0], "find_in_set")
}
BuiltinScalarFunction::ToChar => Ok(Utf8),
-BuiltinScalarFunction::FromUnixtime => Ok(Timestamp(Second, None)),
-BuiltinScalarFunction::Now => {
-Ok(Timestamp(Nanosecond, Some("+00:00".into())))
-}
-BuiltinScalarFunction::CurrentDate => Ok(Date32),
-BuiltinScalarFunction::CurrentTime => Ok(Time64(Nanosecond)),
BuiltinScalarFunction::MakeDate => Ok(Date32),
BuiltinScalarFunction::Translate => {
utf8_to_str_type(&input_expr_types[0], "translate")
@@ -757,9 +736,6 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
-BuiltinScalarFunction::FromUnixtime => {
-Signature::uniform(1, vec![Int64], self.volatility())
-}
BuiltinScalarFunction::Digest => Signature::one_of(
vec![
Exact(vec![Utf8, Utf8]),
@@ -904,11 +880,6 @@ impl BuiltinScalarFunction {
// will be as good as the number of digits in the number
Signature::uniform(1, vec![Float64, Float32], self.volatility())
}
-BuiltinScalarFunction::Now
-| BuiltinScalarFunction::CurrentDate
-| BuiltinScalarFunction::CurrentTime => {
-Signature::uniform(0, vec![], self.volatility())
-}
BuiltinScalarFunction::MakeDate => Signature::uniform(
3,
vec![Int32, Int64, UInt32, UInt64, Utf8],
@@ -1032,12 +1003,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::FindInSet => &["find_in_set"],

// time/date functions
-BuiltinScalarFunction::Now => &["now"],
-BuiltinScalarFunction::CurrentDate => &["current_date", "today"],
-BuiltinScalarFunction::CurrentTime => &["current_time"],
BuiltinScalarFunction::MakeDate => &["make_date"],
BuiltinScalarFunction::ToChar => &["to_char", "date_format"],
-BuiltinScalarFunction::FromUnixtime => &["from_unixtime"],

// hashing functions
BuiltinScalarFunction::Digest => &["digest"],
11 changes: 0 additions & 11 deletions datafusion/expr/src/expr_fn.rs
@@ -788,15 +788,6 @@ scalar_expr!(
datetime format,
"converts a date, time, timestamp or duration to a string based on the provided format"
);
-scalar_expr!(
-FromUnixtime,
-from_unixtime,
-unixtime,
-"returns the unix time in format"
-);
-scalar_expr!(CurrentDate, current_date, ,"returns current UTC date as a [`DataType::Date32`] value");
-scalar_expr!(Now, now, ,"returns current timestamp in nanoseconds, using the same value for all instances of now() in same statement");
-scalar_expr!(CurrentTime, current_time, , "returns current UTC time as a [`DataType::Time64`] value");
scalar_expr!(MakeDate, make_date, year month day, "make a date from year, month and day component parts");
scalar_expr!(Nanvl, nanvl, x y, "returns x if x is not NaN otherwise returns y");
scalar_expr!(
@@ -1258,8 +1249,6 @@ mod test {
test_scalar_expr!(Trim, trim, string);
test_scalar_expr!(Upper, upper, string);

-test_scalar_expr!(FromUnixtime, from_unixtime, unixtime);
-
test_scalar_expr!(ArrayPopFront, array_pop_front, array);
test_scalar_expr!(ArrayPopBack, array_pop_back, array);
test_scalar_expr!(ArrayPosition, array_position, array, element, index);
2 changes: 1 addition & 1 deletion datafusion/expr/src/signature.rs
@@ -44,7 +44,7 @@ pub enum Volatility {
Immutable,
/// A stable function may return different values given the same input across different
/// queries but must return the same value for a given input within a query. An example of
-/// this is [super::BuiltinScalarFunction::Now]. DataFusion
+/// this is the `Now` function. DataFusion
/// will attempt to inline `Stable` functions during planning, when possible.
/// For query `select col1, now() from t1`, it might take a while to execute but
/// `now()` column will be the same for each output row, which is evaluated
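
The volatility classes touched by this diff can be summarised with a small sketch. It mirrors the classifications visible in the removed match arms (from_unixtime is immutable; now, current_date and current_time are stable; random and uuid are volatile). `ToyVolatility`, `volatility_of`, and `can_fold_at_plan_time` are hypothetical names for illustration only, and the folding rule encodes the assumption, taken from the doc comment above, that immutable and stable calls may be inlined during planning while volatile calls may not.

```rust
/// Local mirror of the three volatility classes; not DataFusion's `Volatility`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ToyVolatility {
    /// Same output for the same input, always.
    Immutable,
    /// Same output for the same input within one query.
    Stable,
    /// Output may change on every call.
    Volatile,
}

/// Classify a few function names the way the removed match arms did.
/// Returns `None` for names this sketch does not know about.
fn volatility_of(name: &str) -> Option<ToyVolatility> {
    match name {
        "from_unixtime" => Some(ToyVolatility::Immutable),
        "now" | "current_date" | "current_time" => Some(ToyVolatility::Stable),
        "random" | "uuid" => Some(ToyVolatility::Volatile),
        _ => None,
    }
}

/// Assumption for this sketch: a call with constant arguments may be
/// evaluated once during planning unless the function is volatile.
fn can_fold_at_plan_time(volatility: ToyVolatility) -> bool {
    match volatility {
        ToyVolatility::Immutable | ToyVolatility::Stable => true,
        ToyVolatility::Volatile => false,
    }
}
```

Under this rule `now()` is foldable, which is what lets the optimizer tests compare a timestamp column against a plain literal, while `random()` must stay a function call.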