Skip to content

Commit

Permalink
Add array_compact function support
Browse files Browse the repository at this point in the history
  • Loading branch information
erenavsarogullari committed Feb 5, 2025
1 parent ea788c7 commit e6894c9
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 0 deletions.
37 changes: 37 additions & 0 deletions datafusion/core/tests/sql/sql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,43 @@ use datafusion::prelude::*;

use tempfile::TempDir;

/// Checks the output schema of a DML statement and smoke-tests
/// `array_compact` through the SQL API.
///
/// The first half asserts that an `INSERT` produces a single non-nullable
/// `count: UInt64` column. The second half runs `array_compact` over an array
/// literal containing NULLs and fails the test if planning, execution, or
/// printing returns an error.
#[tokio::test]
async fn dml_output_schema2() {
    use arrow::datatypes::Schema;
    use arrow::datatypes::{DataType, Field};

    let ctx = SessionContext::new();
    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
    let sql = "INSERT INTO test VALUES (1)";
    let df = ctx.sql(sql).await.unwrap();
    let count_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]);
    assert_eq!(Schema::from(df.schema()), count_schema);

    let sql2 = "Select array_compact(make_array(5, NULL, 3, NULL, 4, 6))";
    let df2 = ctx.sql(sql2).await.unwrap();
    // `show` returns a `Result`; unwrap it so an execution error fails the
    // test instead of being silently dropped.
    df2.show().await.unwrap();
}


#[tokio::test]
async fn unsupported_ddl_returns_error() {
// Verify SessionContext::with_sql_options errors appropriately
Expand Down
201 changes: 201 additions & 0 deletions datafusion/functions-nested/src/array_compact.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ScalarUDFImpl`] definitions for array_compact function.
use crate::utils::make_scalar_function;
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{
    Array, ArrayRef, BooleanArray, GenericListArray, Int32Array, ListArray,
    OffsetSizeTrait,
};
use arrow_schema::DataType::{FixedSizeList, LargeList, List, Null};
use arrow_schema::{DataType, Field};
use datafusion_common::arrow::buffer::{NullBuffer, OffsetBuffer};
use datafusion_common::arrow::compute;
use datafusion_common::cast::as_list_array;
use datafusion_common::exec_err;
use datafusion_common::tree_node::TreeNodeIterator;
use datafusion_doc::Documentation;
use datafusion_expr::{ColumnarValue, Expr, ScalarUDFImpl, Signature, Volatility};
use datafusion_macros::user_doc;
use itertools::Itertools;
use std::any::Any;
use std::sync::Arc;

// Declares the UDF plumbing for `ArrayCompact`.
// NOTE(review): the macro definition is not visible here; it presumably
// generates the `array_compact` expression helper (taking one `array`
// argument) and the `array_compact_udf()` accessor that `lib.rs` registers —
// confirm against `macros.rs`.
make_udf_expr_and_func!(
ArrayCompact,
array_compact,
array,
"returns an array of the same type as the input argument where all NULL values have been removed.",
array_compact_udf
);

#[user_doc(
    doc_section(label = "Array Functions"),
    description = "Returns an array of the same type as the input argument where all NULL values have been removed.",
    syntax_example = "array_compact(array)",
    sql_example = r#"```sql
> select array_compact([3,1,NULL,4,NULL,2]);
+--------------------------------+
| array_compact(List([3,1,4,2])) |
+--------------------------------+
| [3, 1, 4, 2]                   |
+--------------------------------+
```"#,
    argument(
        name = "array",
        description = "Array expression. Can be a constant, column, or function, and any combination of array operators."
    )
)]
/// Scalar UDF implementing `array_compact(array)`: removes every NULL element
/// from each array of the input.
#[derive(Debug)]
pub struct ArrayCompact {
    /// Accepted argument types; built with `Signature::array` in `new`.
    signature: Signature,
    /// Alternative SQL names the function is reachable under.
    aliases: Vec<String>,
}

impl Default for ArrayCompact {
fn default() -> Self {
Self::new()
}
}

impl ArrayCompact {
    /// Builds the UDF with an immutable single-array signature and the
    /// `list_compact` alias.
    pub fn new() -> Self {
        let signature = Signature::array(Volatility::Immutable);
        let aliases = vec![String::from("list_compact")];
        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for ArrayCompact {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Canonical SQL name of the function.
    fn name(&self) -> &str {
        "array_compact"
    }

    /// Display name of a call, rendered as `array_compact(<arg>)`.
    ///
    /// The function name is part of the result so the output column cannot be
    /// confused with (or collide with) the argument expression itself.
    ///
    /// # Errors
    /// Returns an execution error unless exactly one argument is supplied.
    fn display_name(&self, args: &[Expr]) -> datafusion_common::Result<String> {
        let [arg] = args else {
            return exec_err!("expects 1 arg, got {}", args.len());
        };

        Ok(format!("{}({})", self.name(), arg))
    }

    /// Schema (column) name of a call, rendered as `array_compact(<arg>)`
    /// using the argument's own schema name.
    ///
    /// # Errors
    /// Returns an execution error unless exactly one argument is supplied.
    fn schema_name(&self, args: &[Expr]) -> datafusion_common::Result<String> {
        let [arg] = args else {
            return exec_err!("expects 1 arg, got {}", args.len());
        };

        Ok(format!("{}({})", self.name(), arg.schema_name()))
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Result type for the given argument types.
    ///
    /// `List` and `FixedSizeList` inputs produce a `List`, `LargeList` inputs
    /// produce a `LargeList`; in both cases the element type is kept and the
    /// element field is marked nullable.
    ///
    /// # Errors
    /// Returns an execution error when the first argument type is missing or
    /// is not one of the list types above.
    fn return_type(&self, arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
        // `first()` instead of `[0]` so an empty slice is an error, not a panic.
        match arg_types.first() {
            Some(List(field) | FixedSizeList(field, _)) => Ok(List(Arc::new(
                Field::new_list_field(field.data_type().clone(), true),
            ))),
            Some(LargeList(field)) => Ok(LargeList(Arc::new(Field::new_list_field(
                field.data_type().clone(),
                true,
            )))),
            _ => exec_err!(
                "Not reachable, data_type should be List, LargeList or FixedSizeList"
            ),
        }
    }

    /// Evaluates the function by delegating to `array_compact_inner` through
    /// `make_scalar_function`.
    fn invoke_batch(
        &self,
        args: &[ColumnarValue],
        _number_rows: usize,
    ) -> datafusion_common::Result<ColumnarValue> {
        make_scalar_function(array_compact_inner)(args)
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn documentation(&self) -> Option<&Documentation> {
        self.doc()
    }
}

/// array_compact SQL function
///
/// There is one argument for array_compact as the array.
/// `array_compact(array)`
///
/// For example:
/// > array_compact(\[3, NULL, 1, NULL, 2]) -> 3,1,2
pub fn array_compact_inner(args: &[ArrayRef]) -> datafusion_common::Result<ArrayRef> {
if args.len() != 1 {
return exec_err!("array_compact needs one argument");
}

match &args[0].data_type() {
List(_) | LargeList(_) | FixedSizeList(_, _) => array_compact_internal(&args),
_ => exec_err!("array_compact does not support type: {:?}", args[0].data_type()),
}
}

fn array_compact_internal(args: &[ArrayRef]) -> datafusion_common::Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
println!("list_array: {:?}", list_array);
println!("list_array Results => {:?}", list_array.iter().filter(|x| {
println!("x: {:?}", x.iter().filter(|x1| {
let dt = x1.as_ref().data_type();
println!("dt: {:?}", dt);
dt.is_null()
}).collect_vec());
let t = x.as_ref().unwrap().is_null(1);//data_type();
println!("t Results => {:?}", t);
!t
}).collect_vec());



// let res = Int32Array::new(list_array).iter().collect_vec();//.filter(|x| !x.as_ref().unwrap().is_empty()).collect_vec();
// println!("non-null_array Results => {:?}", res);
// println!("null_array Results => {:?}", list_array.as_primitive::<_>().iter().filter(|x: &Option<_>| !x.as_ref().unwrap().is_nullable())).collect_vec();;
// let results = list_array.nuas_list().iter().filter(|x|
// !x.as_ref().unwrap().is_empty()
// ).collect_vec();
// println!("Filtered Results => {:?}", results);
// let temp_arr = results.;
// Ok(temp_arr)
// let arr = as_list_array(results);
// let v = ListArray::from_iter_primitive::<Int32Type, _, _>(results);
Ok(Arc::new(Int32Array::from(vec![5,5,5])))//

// let sorted_array = array_sort_inner(args)?;
// let result_array = as_list_array(&sorted_array)?.value(0);
// if result_array.is_empty() {
// return exec_err!("array_min needs one argument as non-empty array");
// }
// let min_result = result_array.slice(0, 1);
// Ok(min_result)
}
2 changes: 2 additions & 0 deletions datafusion/functions-nested/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod macros;

pub mod array_has;
pub mod cardinality;
pub mod array_compact;
pub mod concat;
pub mod dimension;
pub mod distance;
Expand Down Expand Up @@ -132,6 +133,7 @@ pub fn all_default_nested_functions() -> Vec<Arc<ScalarUDF>> {
extract::array_slice_udf(),
extract::array_any_value_udf(),
make_array::make_array_udf(),
array_compact::array_compact_udf(),
array_has::array_has_udf(),
array_has::array_has_all_udf(),
array_has::array_has_any_udf(),
Expand Down

0 comments on commit e6894c9

Please sign in to comment.