Skip to content

Commit

Permalink
[SPARK-42191][SQL] Support udf 'luhn_check'
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR adds a built-in function, `luhn_check`, that checks whether a given number string is a valid Luhn number. It returns true if the number string is a valid Luhn number, and false otherwise.

### Why are the changes needed?
This checksum function is widely applied to credit card numbers and government identification numbers to distinguish valid numbers from mistyped or otherwise incorrect numbers.
Ref : [Trino](https://trino.io/docs/current/functions/string.html)
         [Postgresql](https://wiki.postgresql.org/wiki/Luhn_algorithm)

### Does this PR introduce _any_ user-facing change?
Yes, it adds a new built-in SQL function, `luhn_check`.

### How was this patch tested?
Added test cases

Closes apache#39747 from vinodkc/br_udf_luhn_check.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
vinodkc authored and cloud-fan committed Feb 1, 2023
1 parent 4d37e78 commit b0ac061
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,35 @@ public class ExpressionImplUtils {
private static final int GCM_IV_LEN = 12;
private static final int GCM_TAG_LEN = 128;

/**
 * Validates a number string against the Luhn checksum algorithm.
 *
 * Characters are visited from right to left. Every second character (counting from the
 * right, starting with the one next to the last) has its numeric value doubled, and the
 * decimal digits of each resulting value are added to a running total. The string is
 * accepted when that total is a multiple of 10.
 *
 * NOTE(review): the digit test relies on {@link Character#isDigit(char)}, which is not
 * restricted to ASCII '0'-'9'; confirm whether non-ASCII digit characters are meant to
 * be accepted.
 *
 * @param numberString
 *   the number string to check
 * @return
 *   true if the number string is a valid Luhn number; false if it is empty, contains a
 *   character that is not a digit, or fails the checksum.
 */
public static boolean isLuhnNumber(UTF8String numberString) {
  final String text = numberString.toString();
  final int length = text.length();
  // An empty string never qualifies as a Luhn number.
  if (length == 0) {
    return false;
  }

  int total = 0;
  // offset counts characters from the right-hand end: 0 is the last character.
  for (int offset = 0; offset < length; offset++) {
    final char current = text.charAt(length - 1 - offset);
    if (!Character.isDigit(current)) {
      return false;
    }

    int contribution = Character.getNumericValue(current);
    // Odd offsets are the 2nd, 4th, ... characters from the right: those get doubled.
    if ((offset & 1) == 1) {
      contribution *= 2;
    }
    // A doubled digit is at most 18, so summing its tens and units digits folds it
    // back into a single-digit contribution.
    total += contribution / 10 + contribution % 10;
  }

  // Valid only when the accumulated sum is divisible by 10.
  return total % 10 == 0;
}

/**
 * Entry point for AES encryption: converts the mode and padding arguments to
 * {@code String} and delegates to {@code aesInternal} with {@code Cipher.ENCRYPT_MODE}.
 *
 * @param input the bytes handed to {@code aesInternal} as its input
 * @param key the key bytes handed to {@code aesInternal}
 * @param mode the cipher mode name
 * @param padding the padding scheme name
 * @return whatever {@code aesInternal} returns for the encrypt operation
 */
public static byte[] aesEncrypt(byte[] input, byte[] key, UTF8String mode, UTF8String padding) {
  final String modeName = mode.toString();
  final String paddingName = padding.toString();
  return aesInternal(input, key, modeName, paddingName, Cipher.ENCRYPT_MODE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ object FunctionRegistry {
expression[Length]("length"),
expression[Length]("len", setAlias = true, Some("3.4.0")),
expression[Levenshtein]("levenshtein"),
expression[Luhncheck]("luhn_check"),
expression[Like]("like"),
expression[ILike]("ilike"),
expression[Lower]("lower"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3048,3 +3048,43 @@ case class Empty2Null(child: Expression) extends UnaryExpression with String2Str
override protected def withNewChildInternal(newChild: Expression): Empty2Null =
copy(child = newChild)
}

/**
 * Expression behind the SQL function `luhn_check`: reports whether a number string passes
 * the Luhn checksum.
 *
 * The expression does no evaluation of its own. It is replaced by a [[StaticInvoke]] of
 * `ExpressionImplUtils.isLuhnNumber`, which produces a boolean. The single argument is
 * declared as [[StringType]], so non-string inputs go through implicit casting first.
 *
 * @param input the expression yielding the number string to validate
 */
@ExpressionDescription(
  usage = """
_FUNC_(str ) - Checks that a string of digits is valid according to the Luhn algorithm.
This checksum function is widely applied on credit card numbers and government identification
numbers to distinguish valid numbers from mistyped, incorrect numbers.
""",
  examples = """
Examples:
> SELECT _FUNC_('8112189876');
true
> SELECT _FUNC_('79927398713');
true
> SELECT _FUNC_('79927398714');
false
""",
  since = "3.5.0",
  group = "string_funcs")
case class Luhncheck(input: Expression) extends RuntimeReplaceable with ImplicitCastInputTypes {

  // Name under which the function is displayed, matching its registered SQL name.
  override def prettyName: String = "luhn_check"

  // The one argument is expected to be a string.
  override def inputTypes: Seq[AbstractDataType] = Seq(StringType)

  override def children: Seq[Expression] = Seq(input)

  // The actual check lives in the Java helper; this expression only wires up the call.
  override lazy val replacement: Expression = {
    val helperClass = classOf[ExpressionImplUtils]
    val arguments = Seq(input)
    StaticInvoke(helperClass, BooleanType, "isLuhnNumber", arguments, inputTypes)
  }

  override protected def withNewChildrenInternal(
      newChildren: IndexedSeq[Expression]): Expression = {
    copy(input = newChildren(0))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@
| org.apache.spark.sql.catalyst.expressions.Logarithm | log | SELECT log(10, 100) | struct<LOG(10, 100):double> |
| org.apache.spark.sql.catalyst.expressions.Lower | lcase | SELECT lcase('SparkSql') | struct<lcase(SparkSql):string> |
| org.apache.spark.sql.catalyst.expressions.Lower | lower | SELECT lower('SparkSql') | struct<lower(SparkSql):string> |
| org.apache.spark.sql.catalyst.expressions.Luhncheck | luhn_check | SELECT luhn_check('8112189876') | struct<luhn_check(8112189876):boolean> |
| org.apache.spark.sql.catalyst.expressions.MakeDTInterval | make_dt_interval | SELECT make_dt_interval(1, 12, 30, 01.001001) | struct<make_dt_interval(1, 12, 30, 1.001001):interval day to second> |
| org.apache.spark.sql.catalyst.expressions.MakeDate | make_date | SELECT make_date(2013, 7, 15) | struct<make_date(2013, 7, 15):date> |
| org.apache.spark.sql.catalyst.expressions.MakeInterval | make_interval | SELECT make_interval(100, 11, 1, 1, 12, 30, 01.001001) | struct<make_interval(100, 11, 1, 1, 12, 30, 1.001001):interval> |
Expand Down
26 changes: 26 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,29 @@ CREATE TEMPORARY VIEW fmtTable(fmtField) AS SELECT * FROM VALUES ('invalidFormat
SELECT to_binary('abc', fmtField) FROM fmtTable;
-- Clean up
DROP VIEW IF EXISTS fmtTable;
-- luhn_check
-- basic cases: digit-only strings, covering both passing and failing checksums
select luhn_check('4111111111111111');
select luhn_check('5500000000000004');
select luhn_check('340000000000009');
select luhn_check('6011000000000004');
select luhn_check('6011000000000005');
select luhn_check('378282246310006');
select luhn_check('0');
-- a space at the end/middle/beginning of an otherwise valid number (expected false)
select luhn_check('4111111111111111 ');
select luhn_check('4111111 111111111');
select luhn_check(' 4111111111111111');
-- empty string and a single space (expected false)
select luhn_check('');
select luhn_check(' ');
-- non-digit characters (expected false)
select luhn_check('510B105105105106');
select luhn_check('ABCDED');
-- null argument (expected NULL)
select luhn_check(null);
-- non-string arguments (exercise the implicit cast to string)
select luhn_check(6011111111111117);
select luhn_check(6011111111111118);
select luhn_check(123.456);

Original file line number Diff line number Diff line change
Expand Up @@ -1710,3 +1710,147 @@ DROP VIEW IF EXISTS fmtTable
struct<>
-- !query output



-- !query
select luhn_check('4111111111111111')
-- !query schema
struct<luhn_check(4111111111111111):boolean>
-- !query output
true


-- !query
select luhn_check('5500000000000004')
-- !query schema
struct<luhn_check(5500000000000004):boolean>
-- !query output
true


-- !query
select luhn_check('340000000000009')
-- !query schema
struct<luhn_check(340000000000009):boolean>
-- !query output
true


-- !query
select luhn_check('6011000000000004')
-- !query schema
struct<luhn_check(6011000000000004):boolean>
-- !query output
true


-- !query
select luhn_check('6011000000000005')
-- !query schema
struct<luhn_check(6011000000000005):boolean>
-- !query output
false


-- !query
select luhn_check('378282246310006')
-- !query schema
struct<luhn_check(378282246310006):boolean>
-- !query output
false


-- !query
select luhn_check('0')
-- !query schema
struct<luhn_check(0):boolean>
-- !query output
true


-- !query
select luhn_check('4111111111111111 ')
-- !query schema
struct<luhn_check(4111111111111111 ):boolean>
-- !query output
false


-- !query
select luhn_check('4111111 111111111')
-- !query schema
struct<luhn_check(4111111 111111111):boolean>
-- !query output
false


-- !query
select luhn_check(' 4111111111111111')
-- !query schema
struct<luhn_check( 4111111111111111):boolean>
-- !query output
false


-- !query
select luhn_check('')
-- !query schema
struct<luhn_check():boolean>
-- !query output
false


-- !query
select luhn_check(' ')
-- !query schema
struct<luhn_check( ):boolean>
-- !query output
false


-- !query
select luhn_check('510B105105105106')
-- !query schema
struct<luhn_check(510B105105105106):boolean>
-- !query output
false


-- !query
select luhn_check('ABCDED')
-- !query schema
struct<luhn_check(ABCDED):boolean>
-- !query output
false


-- !query
select luhn_check(null)
-- !query schema
struct<luhn_check(NULL):boolean>
-- !query output
NULL


-- !query
select luhn_check(6011111111111117)
-- !query schema
struct<luhn_check(6011111111111117):boolean>
-- !query output
true


-- !query
select luhn_check(6011111111111118)
-- !query schema
struct<luhn_check(6011111111111118):boolean>
-- !query output
false


-- !query
select luhn_check(123.456)
-- !query schema
struct<luhn_check(123.456):boolean>
-- !query output
false
Loading

0 comments on commit b0ac061

Please sign in to comment.