Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49995][SQL] Add named argument support to more TVFs #48503

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ object FunctionRegistry {
expressionGeneratorBuilderOuter("explode_outer", ExplodeExpressionBuilder),
expression[Greatest]("greatest"),
expression[If]("if"),
expression[Inline]("inline"),
expressionGeneratorOuter[Inline]("inline_outer"),
expressionBuilder("inline", InlineExpressionBuilder),
expressionGeneratorBuilderOuter("inline_outer", InlineExpressionBuilder),
expression[IsNaN]("isnan"),
expression[Nvl]("ifnull", setAlias = true),
expression[IsNull]("isnull"),
Expand All @@ -379,8 +379,8 @@ object FunctionRegistry {
expression[NullIfZero]("nullifzero"),
expression[Nvl]("nvl"),
expression[Nvl2]("nvl2"),
expression[PosExplode]("posexplode"),
expressionGeneratorOuter[PosExplode]("posexplode_outer"),
expressionBuilder("posexplode", PosExplodeExpressionBuilder),
expressionGeneratorBuilderOuter("posexplode_outer", PosExplodeExpressionBuilder),
expression[Rand]("rand"),
expression[Rand]("random", true, Some("3.0.0")),
expression[Randn]("randn"),
Expand Down Expand Up @@ -1154,16 +1154,16 @@ object TableFunctionRegistry {
logicalPlan[Range]("range"),
generatorBuilder("explode", ExplodeGeneratorBuilder),
generatorBuilder("explode_outer", ExplodeOuterGeneratorBuilder),
generator[Inline]("inline"),
generator[Inline]("inline_outer", outer = true),
generatorBuilder("inline", InlineGeneratorBuilder),
generatorBuilder("inline_outer", InlineOuterGeneratorBuilder),
generator[JsonTuple]("json_tuple"),
generator[PosExplode]("posexplode"),
generator[PosExplode]("posexplode_outer", outer = true),
generatorBuilder("posexplode", PosExplodeGeneratorBuilder),
generatorBuilder("posexplode_outer", PosExplodeOuterGeneratorBuilder),
generator[Stack]("stack"),
generator[Collations]("collations"),
generator[SQLKeywords]("sql_keywords"),
generator[VariantExplode]("variant_explode"),
generator[VariantExplode]("variant_explode_outer", outer = true)
generatorBuilder("variant_explode", VariantExplodeGeneratorBuilder),
generatorBuilder("variant_explode_outer", VariantExplodeOuterGeneratorBuilder)
)

val builtin: SimpleTableFunctionRegistry = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,9 @@ object ExplodeGeneratorBuilder extends ExplodeGeneratorBuilderBase {
> SELECT _FUNC_(collection => array(10, 20));
10
20
> SELECT * FROM _FUNC_(collection => array(10, 20));
10
20
""",
since = "1.0.0",
group = "generator_funcs")
Expand All @@ -501,7 +504,6 @@ object ExplodeOuterGeneratorBuilder extends ExplodeGeneratorBuilderBase {
override def isOuter: Boolean = true
}


/**
* Given an input array produces a sequence of rows for each position and value in the array.
*
Expand All @@ -511,6 +513,21 @@ object ExplodeOuterGeneratorBuilder extends ExplodeGeneratorBuilderBase {
* 1 20
* }}}
*/
case class PosExplode(child: Expression) extends ExplodeBase {
  // Explicit `Boolean` annotation, matching how `Inline` declares its own `position` override.
  // This variant sets the flag to true; how `ExplodeBase` consumes it is not visible here --
  // presumably it turns on the position column in the generated rows (TODO confirm).
  override val position: Boolean = true

  // Rebuilds this node around a replacement child using the case-class `copy`.
  override protected def withNewChildInternal(newChild: Expression): PosExplode =
    copy(child = newChild)
}

/**
 * Shared `GeneratorBuilder` logic for the builders backed by [[PosExplode]]: declares a single
 * named parameter, `collection`, and wraps the one supplied argument in a `PosExplode`.
 */
trait PosExplodeGeneratorBuilderBase extends GeneratorBuilder {
  /** Signature consisting of exactly one input parameter, named `collection`. */
  override def functionSignature: Option[FunctionSignature] = {
    val collectionParam = InputParameter("collection")
    Some(FunctionSignature(Seq(collectionParam)))
  }

  /**
   * Builds the generator from the argument list.
   *
   * @param funcName    name the function was invoked under; not used by this builder
   * @param expressions argument expressions; asserted to contain exactly one element
   * @return a [[PosExplode]] over that single argument
   */
  override def buildGenerator(funcName: String, expressions: Seq[Expression]): Generator = {
    assert(expressions.size == 1)
    val collection = expressions(0)
    PosExplode(collection)
  }
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows with positions, or the elements of map `expr` into multiple rows and columns with positions. Unless specified otherwise, uses the column name `pos` for position, `col` for elements of the array or `key` and `value` for elements of the map.",
Expand All @@ -519,34 +536,71 @@ object ExplodeOuterGeneratorBuilder extends ExplodeGeneratorBuilderBase {
> SELECT _FUNC_(array(10,20));
0 10
1 20
> SELECT * FROM _FUNC_(array(10,20));
> SELECT _FUNC_(collection => array(10,20));
0 10
1 20
> SELECT * FROM _FUNC_(collection => array(10,20));
0 10
1 20
""",
since = "2.0.0",
group = "generator_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class PosExplode(child: Expression) extends ExplodeBase {
override val position = true
override protected def withNewChildInternal(newChild: Expression): PosExplode =
copy(child = newChild)
// Builds `PosExplode` for the `posexplode` / `posexplode_outer` expression registrations,
// exposing its single argument under the parameter name `collection`.
object PosExplodeExpressionBuilder extends ExpressionBuilder {
  /** Signature consisting of exactly one input parameter, named `collection`. */
  override def functionSignature: Option[FunctionSignature] =
    Some(FunctionSignature(Seq(InputParameter("collection"))))

  /**
   * Wraps the single argument in a [[PosExplode]].
   *
   * @param funcName    name the function was invoked under; not used by this builder
   * @param expressions argument expressions; must contain exactly one element
   * @return a `PosExplode` over that argument
   */
  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
    // Same arity guard as PosExplodeGeneratorBuilderBase.buildGenerator, so a wrong argument
    // count fails loudly here instead of extra arguments being silently dropped.
    assert(expressions.size == 1)
    PosExplode(expressions(0))
  }
}

/**
* Explodes an array of structs into a table.
*/
// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - Explodes an array of structs into a table. Uses column names col1, col2, etc. by default unless specified otherwise.",
usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows with positions, or the elements of map `expr` into multiple rows and columns with positions. Unless specified otherwise, uses the column name `pos` for position, `col` for elements of the array or `key` and `value` for elements of the map.",
examples = """
Examples:
> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT _FUNC_(array(10,20));
0 10
1 20
> SELECT _FUNC_(collection => array(10,20));
0 10
1 20
> SELECT * FROM _FUNC_(collection => array(10,20));
0 10
1 20
""",
since = "2.0.0",
group = "generator_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Non-outer flavor of the `PosExplode` generator builder; this change registers it under the
// name `posexplode` through `generatorBuilder(...)`.
object PosExplodeGeneratorBuilder extends PosExplodeGeneratorBuilderBase {
// Marks this builder as the non-outer variant. How the consumer of `isOuter` uses the flag is
// not visible in this file section.
override def isOuter: Boolean = false
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - Separates the elements of array `expr` into multiple rows with positions, or the elements of map `expr` into multiple rows and columns with positions. Unless specified otherwise, uses the column name `pos` for position, `col` for elements of the array or `key` and `value` for elements of the map.",
examples = """
Examples:
> SELECT _FUNC_(array(10,20));
0 10
1 20
> SELECT _FUNC_(collection => array(10,20));
0 10
1 20
> SELECT * FROM _FUNC_(collection => array(10,20));
0 10
1 20
""",
since = "2.0.0",
group = "generator_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Outer flavor of the `PosExplode` generator builder; this change registers it under the
// name `posexplode_outer` through `generatorBuilder(...)`.
object PosExplodeOuterGeneratorBuilder extends PosExplodeGeneratorBuilderBase {
// Marks this builder as the outer variant. How the consumer of `isOuter` uses the flag is
// not visible in this file section.
override def isOuter: Boolean = true
}

/**
* Explodes an array of structs into a table.
*/
case class Inline(child: Expression) extends UnaryExpression with CollectionGenerator {
override val inline: Boolean = true
override val position: Boolean = false
Expand Down Expand Up @@ -595,6 +649,85 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene
override protected def withNewChildInternal(newChild: Expression): Inline = copy(child = newChild)
}

/**
 * Shared `GeneratorBuilder` logic for the builders backed by [[Inline]]: declares a single
 * named parameter, `input`, and wraps the one supplied argument in an `Inline`.
 */
trait InlineGeneratorBuilderBase extends GeneratorBuilder {
  /** Signature consisting of exactly one input parameter, named `input`. */
  override def functionSignature: Option[FunctionSignature] = {
    val inputParam = InputParameter("input")
    Some(FunctionSignature(Seq(inputParam)))
  }

  /**
   * Builds the generator from the argument list.
   *
   * @param funcName    name the function was invoked under; not used by this builder
   * @param expressions argument expressions; asserted to contain exactly one element
   * @return an [[Inline]] over that single argument
   */
  override def buildGenerator(funcName: String, expressions: Seq[Expression]): Generator = {
    assert(expressions.size == 1)
    val input = expressions(0)
    Inline(input)
  }
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - Explodes an array of structs into a table. Uses column names col1, col2, etc. by default unless specified otherwise.",
examples = """
Examples:
> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT _FUNC_(input => array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT * FROM _FUNC_(input => array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
""",
since = "2.0.0",
group = "generator_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Builds `Inline` for the `inline` / `inline_outer` expression registrations, exposing its
// single argument under the parameter name `input`.
object InlineExpressionBuilder extends ExpressionBuilder {
  /** Signature consisting of exactly one input parameter, named `input`. */
  override def functionSignature: Option[FunctionSignature] =
    Some(FunctionSignature(Seq(InputParameter("input"))))

  /**
   * Wraps the single argument in an [[Inline]].
   *
   * @param funcName    name the function was invoked under; not used by this builder
   * @param expressions argument expressions; must contain exactly one element
   * @return an `Inline` over that argument
   */
  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
    // Same arity guard as InlineGeneratorBuilderBase.buildGenerator, so a wrong argument
    // count fails loudly here instead of extra arguments being silently dropped.
    assert(expressions.size == 1)
    Inline(expressions(0))
  }
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - Explodes an array of structs into a table. Uses column names col1, col2, etc. by default unless specified otherwise.",
examples = """
Examples:
> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT _FUNC_(input => array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT * FROM _FUNC_(input => array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
""",
since = "2.0.0",
group = "generator_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Non-outer flavor of the `Inline` generator builder; this change registers it under the
// name `inline` through `generatorBuilder(...)`.
object InlineGeneratorBuilder extends InlineGeneratorBuilderBase {
// Marks this builder as the non-outer variant. How the consumer of `isOuter` uses the flag is
// not visible in this file section.
override def isOuter: Boolean = false
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - Explodes an array of structs into a table. Uses column names col1, col2, etc. by default unless specified otherwise.",
examples = """
Examples:
> SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT _FUNC_(input => array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
> SELECT * FROM _FUNC_(input => array(struct(1, 'a'), struct(2, 'b')));
1 a
2 b
""",
since = "2.0.0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't think these have been usable as table functions since 2.0. `SELECT * FROM inline(...)` was only added recently (in Spark 3.4 or 3.5).

Copy link
Member Author

@ueshin ueshin Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. Now that we have separate definitions, we can maybe mention each of them in each definition.
Also, named-argument support must have been added later than the TVFs themselves, I guess? Do we have a way to mention such updates?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should be fine. Users will only see the updated usage for the function in the latest version.

group = "generator_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Outer flavor of the `Inline` generator builder; this change registers it under the
// name `inline_outer` through `generatorBuilder(...)`.
object InlineOuterGeneratorBuilder extends InlineGeneratorBuilderBase {
// Marks this builder as the outer variant. How the consumer of `isOuter` uses the flag is
// not visible in this file section.
override def isOuter: Boolean = true
}

@ExpressionDescription(
usage = """_FUNC_() - Get Spark SQL keywords""",
examples = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ import scala.util.parsing.combinator.RegexParsers

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.{ExpressionBuilder, GeneratorBuilder, TypeCheckResult}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.json.JsonInferSchema
import org.apache.spark.sql.catalyst.plans.logical.{FunctionSignature, InputParameter}
import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, VARIANT_GET}
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, QuotingUtils}
Expand Down Expand Up @@ -617,21 +617,6 @@ object VariantGetExpressionBuilder extends VariantGetExpressionBuilderBase(true)
// scalastyle:on line.size.limit
object TryVariantGetExpressionBuilder extends VariantGetExpressionBuilderBase(false)

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - It separates a variant object/array into multiple rows containing its fields/elements. Its result schema is `struct<pos int, key string, value variant>`. `pos` is the position of the field/element in its parent object/array, and `value` is the field/element value. `key` is the field name when exploding a variant object, or is NULL when exploding a variant array. It ignores any input that is not a variant array/object, including SQL NULL, variant null, and any other variant values.",
examples = """
Examples:
> SELECT * from _FUNC_(parse_json('["hello", "world"]'));
0 NULL "hello"
1 NULL "world"
> SELECT * from _FUNC_(parse_json('{"a": true, "b": 3.14}'));
0 a true
1 b 3.14
""",
since = "4.0.0",
group = "variant_funcs")
// scalastyle:on line.size.limit line.contains.tab
case class VariantExplode(child: Expression) extends UnaryExpression with Generator
with ExpectsInputTypes {
override def inputTypes: Seq[AbstractDataType] = Seq(VariantType)
Expand Down Expand Up @@ -665,6 +650,53 @@ case class VariantExplode(child: Expression) extends UnaryExpression with Genera
}
}

trait VariantExplodeGeneratorBuilderBase extends GeneratorBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

override def functionSignature: Option[FunctionSignature] =
Some(FunctionSignature(Seq(InputParameter("input"))))
override def buildGenerator(funcName: String, expressions: Seq[Expression]): Generator = {
assert(expressions.size == 1)
VariantExplode(expressions(0))
}
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - It separates a variant object/array into multiple rows containing its fields/elements. Its result schema is `struct<pos int, key string, value variant>`. `pos` is the position of the field/element in its parent object/array, and `value` is the field/element value. `key` is the field name when exploding a variant object, or is NULL when exploding a variant array. It ignores any input that is not a variant array/object, including SQL NULL, variant null, and any other variant values.",
examples = """
Examples:
> SELECT * from _FUNC_(parse_json('["hello", "world"]'));
0 NULL "hello"
1 NULL "world"
> SELECT * from _FUNC_(input => parse_json('{"a": true, "b": 3.14}'));
0 a true
1 b 3.14
""",
since = "4.0.0",
group = "variant_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Non-outer flavor of the `VariantExplode` generator builder; this change registers it under
// the name `variant_explode` through `generatorBuilder(...)`.
object VariantExplodeGeneratorBuilder extends VariantExplodeGeneratorBuilderBase {
// Marks this builder as the non-outer variant. How the consumer of `isOuter` uses the flag is
// not visible in this file section.
override def isOuter: Boolean = false
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(expr) - It separates a variant object/array into multiple rows containing its fields/elements. Its result schema is `struct<pos int, key string, value variant>`. `pos` is the position of the field/element in its parent object/array, and `value` is the field/element value. `key` is the field name when exploding a variant object, or is NULL when exploding a variant array. It ignores any input that is not a variant array/object, including SQL NULL, variant null, and any other variant values.",
examples = """
Examples:
> SELECT * from _FUNC_(parse_json('["hello", "world"]'));
0 NULL "hello"
1 NULL "world"
> SELECT * from _FUNC_(input => parse_json('{"a": true, "b": 3.14}'));
0 a true
1 b 3.14
""",
since = "4.0.0",
group = "variant_funcs")
// scalastyle:on line.size.limit line.contains.tab
// Outer flavor of the `VariantExplode` generator builder; this change registers it under
// the name `variant_explode_outer` through `generatorBuilder(...)`.
object VariantExplodeOuterGeneratorBuilder extends VariantExplodeGeneratorBuilderBase {
// Marks this builder as the outer variant. How the consumer of `isOuter` uses the flag is
// not visible in this file section.
override def isOuter: Boolean = true
}

object VariantExplode {
/**
* The actual implementation of the `VariantExplode` expression. We check `isNull` separately
Expand Down
Loading