Skip to content

Commit

Permalink
feat(flink): allow translation of decimal literals
Browse files Browse the repository at this point in the history
  • Loading branch information
deepyaman authored and jcrist committed Aug 31, 2023
1 parent 6f37a06 commit 52f7032
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
11 changes: 11 additions & 0 deletions ibis/backends/flink/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,17 @@ def translate_literal(op: ops.Literal) -> str:
raise ValueError("NaN is not supported in Flink SQL")
elif math.isinf(value):
raise ValueError("Infinity is not supported in Flink SQL")
elif dtype.is_decimal():
# When PyFlink infers schema from `decimal.Decimal` objects,
# it will be `DecimalType(38, 18)`.
# https://github.com/apache/flink/blob/release-1.17.1/flink-python/pyflink/table/types.py#L336-L337
precision = 38 if dtype.precision is None else dtype.precision
scale = 18 if dtype.scale is None else dtype.scale

if precision > 38:
raise ValueError("The precision can be up to 38 in Flink")

return f"CAST({value} AS DECIMAL({precision}, {scale}))"
return f"CAST({value} AS {_to_pyflink_types[type(dtype)]!s})"
elif dtype.is_timestamp():
# TODO(chloeh13q): support timestamp with local timezone
Expand Down
24 changes: 24 additions & 0 deletions ibis/backends/tests/test_numeric.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ def test_numeric_literal(con, backend, expr, expected_types):
"druid": 1.1,
"datafusion": decimal.Decimal("1.1"),
"oracle": 1.1,
"flink": decimal.Decimal("1.1"),
},
{
"bigquery": "NUMERIC",
Expand All @@ -277,6 +278,7 @@ def test_numeric_literal(con, backend, expr, expected_types):
"trino": "decimal(2,1)",
"duckdb": "DECIMAL(18,3)",
"postgres": "numeric",
"flink": "DECIMAL(38, 18) NOT NULL",
},
marks=[
pytest.mark.notimpl(
Expand Down Expand Up @@ -316,6 +318,7 @@ def test_numeric_literal(con, backend, expr, expected_types):
"druid": 1.1,
"datafusion": decimal.Decimal("1.1"),
"oracle": 1.1,
"flink": decimal.Decimal("1.1"),
},
{
"bigquery": "NUMERIC",
Expand All @@ -325,6 +328,7 @@ def test_numeric_literal(con, backend, expr, expected_types):
"trino": "decimal(2,1)",
"duckdb": "DECIMAL(38,9)",
"postgres": "numeric",
"flink": "DECIMAL(38, 9) NOT NULL",
},
marks=[
pytest.mark.broken(
Expand Down Expand Up @@ -386,6 +390,11 @@ def test_numeric_literal(con, backend, expr, expected_types):
raises=sa.exc.ProgrammingError,
),
pytest.mark.notyet(["datafusion"], raises=Exception),
pytest.mark.notyet(
["flink"],
"The precision can be up to 38 in Flink",
raises=ValueError,
),
],
id="decimal-big",
),
Expand Down Expand Up @@ -461,6 +470,11 @@ def test_numeric_literal(con, backend, expr, expected_types):
"(oracledb.exceptions.DatabaseError) DPY-4004: invalid number",
raises=sa.exc.DatabaseError,
),
pytest.mark.notyet(
["flink"],
"Infinity is not supported in Flink SQL",
raises=ValueError,
),
],
id="decimal-infinity+",
),
Expand Down Expand Up @@ -536,6 +550,11 @@ def test_numeric_literal(con, backend, expr, expected_types):
"(oracledb.exceptions.DatabaseError) DPY-4004: invalid number",
raises=sa.exc.DatabaseError,
),
pytest.mark.notyet(
["flink"],
"Infinity is not supported in Flink SQL",
raises=ValueError,
),
],
id="decimal-infinity-",
),
Expand Down Expand Up @@ -622,6 +641,11 @@ def test_numeric_literal(con, backend, expr, expected_types):
"(oracledb.exceptions.DatabaseError) DPY-4004: invalid number",
raises=sa.exc.DatabaseError,
),
pytest.mark.notyet(
["flink"],
"NaN is not supported in Flink SQL",
raises=ValueError,
),
],
id="decimal-NaN",
),
Expand Down

0 comments on commit 52f7032

Please sign in to comment.