Skip to content

Commit

Permalink
feat: Implement basic version of RLIKE (apache#734)
Browse files Browse the repository at this point in the history
(cherry picked from commit e33d560)
  • Loading branch information
andygrove authored and huaxingao committed Aug 7, 2024
1 parent f9ed671 commit 997400c
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 28 deletions.
7 changes: 7 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,13 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.regexp.allowIncompatible")
.doc("Comet is not currently fully compatible with Spark for all regular expressions. " +
"Set this config to true to allow them anyway using Rust's regular expression engine. " +
"See compatibility guide for more information.")
.booleanConf
.createWithDefault(false)
}

object ConfigHelpers {
Expand Down
6 changes: 6 additions & 0 deletions docs/source/user-guide/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.

## Cast

Cast operations in Comet fall into three levels of support:
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Comet provides the following configuration settings.
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.rowToColumnar.supportedOperatorList | A comma-separated list of row-based operators that will be converted to columnar format when 'spark.comet.rowToColumnar.enabled' is true | Range,InMemoryTableScan |
| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
Expand Down
6 changes: 6 additions & 0 deletions docs/templates/compatibility-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ be used in production.

There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support.

## Regular Expressions

Comet uses the Rust regexp crate for evaluating regular expressions, and this has different behavior from Java's
regular expression engine. Comet will fall back to Spark for patterns that are known to produce different results, but
this can be overridden by setting `spark.comet.regexp.allowIncompatible=true`.

## Cast

Cast operations in Comet fall into three levels of support:
Expand Down
2 changes: 0 additions & 2 deletions native/core/src/execution/datafusion/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,6 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn);

make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);

// make_predicate_function!(RLike, rlike_dyn, rlike_utf8_scalar_dyn);

#[derive(Debug, Hash)]
pub struct SubstringExpr {
pub child: Arc<dyn PhysicalExpr>,
Expand Down
15 changes: 14 additions & 1 deletion native/core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use datafusion_expr::expr::find_df_window_func;
use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_expr_common::expressions::Literal;
use itertools::Itertools;
use jni::objects::GlobalRef;
use num::{BigInt, ToPrimitive};
Expand Down Expand Up @@ -108,7 +109,7 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, SecondExpr, TimestampTruncExpr,
Cast, DateTruncExpr, HourExpr, IfExpr, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr,
};

// For clippy error on type_complexity.
Expand Down Expand Up @@ -447,6 +448,18 @@ impl PhysicalPlanner {

Ok(Arc::new(Like::new(left, right)))
}
ExprStruct::Rlike(expr) => {
let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?;
let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?;
match right.as_any().downcast_ref::<Literal>().unwrap().value() {
ScalarValue::Utf8(Some(pattern)) => {
Ok(Arc::new(RLike::try_new(left, pattern)?))
}
_ => Err(ExecutionError::GeneralError(
"RLike only supports scalar patterns".to_string(),
)),
}
}
ExprStruct::CheckOverflow(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap());
Expand Down
10 changes: 5 additions & 5 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ message Expr {
StartsWith startsWith = 27;
EndsWith endsWith = 28;
Contains contains = 29;
// RLike rlike = 30;
RLike rlike = 30;
ScalarFunc scalarFunc = 31;
EqualNullSafe eqNullSafe = 32;
NotEqualNullSafe neqNullSafe = 33;
Expand Down Expand Up @@ -368,10 +368,10 @@ message Like {
Expr right = 2;
}

// message RLike {
// Expr left = 1;
// Expr right = 2;
// }
message RLike {
Expr left = 1;
Expr right = 2;
}

message StartsWith {
Expr left = 1;
Expand Down
2 changes: 2 additions & 0 deletions native/spark-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod error;
mod if_expr;

mod kernels;
mod regexp;
pub mod scalar_funcs;
pub mod spark_hash;
mod temporal;
Expand All @@ -30,6 +31,7 @@ mod xxhash64;
pub use cast::{spark_cast, Cast};
pub use error::{SparkError, SparkResult};
pub use if_expr::IfExpr;
pub use regexp::RLike;
pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr};

/// Spark supports three evaluation modes when evaluating expressions, which affect
Expand Down
170 changes: 170 additions & 0 deletions native/spark-expr/src/regexp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::utils::down_cast_any_ref;
use crate::SparkError;
use arrow::compute::take;
use arrow_array::builder::BooleanBuilder;
use arrow_array::types::Int32Type;
use arrow_array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::{internal_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use regex::Regex;
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Implementation of RLIKE operator.
///
/// Note that this implementation is not yet Spark-compatible and simply delegates to
/// the Rust regexp crate. It will match Spark behavior for some simple cases but has
/// differences in whitespace handling and does not support all the features of Java's
/// regular expression engine, which are documented at:
///
/// https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html
#[derive(Debug)]
pub struct RLike {
child: Arc<dyn PhysicalExpr>,
// Only scalar patterns are supported
pattern_str: String,
pattern: Regex,
}

impl Hash for RLike {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(self.pattern_str.as_bytes());
}
}

impl RLike {
pub fn try_new(child: Arc<dyn PhysicalExpr>, pattern: &str) -> Result<Self> {
Ok(Self {
child,
pattern_str: pattern.to_string(),
pattern: Regex::new(pattern).map_err(|e| {
SparkError::Internal(format!("Failed to compile pattern {}: {}", pattern, e))
})?,
})
}

fn is_match(&self, inputs: &StringArray) -> BooleanArray {
let mut builder = BooleanBuilder::with_capacity(inputs.len());
if inputs.is_nullable() {
for i in 0..inputs.len() {
if inputs.is_null(i) {
builder.append_null();
} else {
builder.append_value(self.pattern.is_match(inputs.value(i)));
}
}
} else {
for i in 0..inputs.len() {
builder.append_value(self.pattern.is_match(inputs.value(i)));
}
}
builder.finish()
}
}

impl Display for RLike {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RLike [child: {}, pattern: {}] ",
self.child, self.pattern_str
)
}
}

impl PartialEq<dyn Any> for RLike {
fn eq(&self, other: &dyn Any) -> bool {
down_cast_any_ref(other)
.downcast_ref::<Self>()
.map(|x| self.child.eq(&x.child) && self.pattern_str.eq(&x.pattern_str))
.unwrap_or(false)
}
}

impl PhysicalExpr for RLike {
fn as_any(&self) -> &dyn Any {
self
}

fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
Ok(DataType::Boolean)
}

fn nullable(&self, input_schema: &Schema) -> Result<bool> {
self.child.nullable(input_schema)
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
match self.child.evaluate(batch)? {
ColumnarValue::Array(array) if array.as_any().is::<DictionaryArray<Int32Type>>() => {
let dict_array = array
.as_any()
.downcast_ref::<DictionaryArray<Int32Type>>()
.expect("dict array");
let dict_values = dict_array
.values()
.as_any()
.downcast_ref::<StringArray>()
.expect("strings");
// evaluate the regexp pattern against the dictionary values
let new_values = self.is_match(dict_values);
// convert to conventional (not dictionary-encoded) array
let result = take(&new_values, dict_array.keys(), None)?;
Ok(ColumnarValue::Array(result))
}
ColumnarValue::Array(array) => {
let inputs = array
.as_any()
.downcast_ref::<StringArray>()
.expect("string array");
let array = self.is_match(inputs);
Ok(ColumnarValue::Array(Arc::new(array)))
}
ColumnarValue::Scalar(_) => {
internal_err!("non scalar regexp patterns are not supported")
}
}
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.child]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
assert!(children.len() == 1);
Ok(Arc::new(RLike::try_new(
children[0].clone(),
&self.pattern_str,
)?))
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
use std::hash::Hash;
let mut s = state;
self.hash(&mut s);
}
}
24 changes: 24 additions & 0 deletions spark/benchmarks/CometAggregateBenchmark-jdk11-results.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
================================================================================================
Grouped Aggregate (single group key + single aggregate SUM)
================================================================================================

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate SUM: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (SUM) 2663 2744 115 3.9 254.0 1.0X
SQL Parquet - Comet (Scan, Exec) (SUM) 1067 1084 24 9.8 101.8 2.5X


================================================================================================
Grouped Aggregate (single group key + single aggregate COUNT)
================================================================================================

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-41-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT) 2532 2552 28 4.1 241.5 1.0X
SQL Parquet - Comet (Scan, Exec) (COUNT) 4590 4592 4 2.3 437.7 0.6X


32 changes: 32 additions & 0 deletions spark/src/main/scala/org/apache/comet/expressions/RegExp.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.expressions

object RegExp {

/** Determine whether the regexp pattern is supported natively and compatible with Spark */
def isSupportedPattern(pattern: String): Boolean = {
// this is a placeholder for implementing logic to determine if the pattern
// is known to be compatible with Spark, so that we can enable regexp automatically
// for common cases and fallback to Spark for more complex cases
false
}

}
Loading

0 comments on commit 997400c

Please sign in to comment.