From 52f7032363c4bfdfb2c4db3214c0e2594853fbfa Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Wed, 30 Aug 2023 22:34:06 -0600 Subject: [PATCH] feat(flink): allow translation of decimal literals --- ibis/backends/flink/utils.py | 11 +++++++++++ ibis/backends/tests/test_numeric.py | 24 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/ibis/backends/flink/utils.py b/ibis/backends/flink/utils.py index 19762b365fe8..3910330f518b 100644 --- a/ibis/backends/flink/utils.py +++ b/ibis/backends/flink/utils.py @@ -282,6 +282,17 @@ def translate_literal(op: ops.Literal) -> str: raise ValueError("NaN is not supported in Flink SQL") elif math.isinf(value): raise ValueError("Infinity is not supported in Flink SQL") + elif dtype.is_decimal(): + # When PyFlink infers schema from `decimal.Decimal` objects, + # it will be `DecimalType(38, 18)`. + # https://github.com/apache/flink/blob/release-1.17.1/flink-python/pyflink/table/types.py#L336-L337 + precision = 38 if dtype.precision is None else dtype.precision + scale = 18 if dtype.scale is None else dtype.scale + + if precision > 38: + raise ValueError("The precision can be up to 38 in Flink") + + return f"CAST({value} AS DECIMAL({precision}, {scale}))" return f"CAST({value} AS {_to_pyflink_types[type(dtype)]!s})" elif dtype.is_timestamp(): # TODO(chloeh13q): support timestamp with local timezone diff --git a/ibis/backends/tests/test_numeric.py b/ibis/backends/tests/test_numeric.py index b7e35b99e7c2..b2bebea0720e 100644 --- a/ibis/backends/tests/test_numeric.py +++ b/ibis/backends/tests/test_numeric.py @@ -269,6 +269,7 @@ def test_numeric_literal(con, backend, expr, expected_types): "druid": 1.1, "datafusion": decimal.Decimal("1.1"), "oracle": 1.1, + "flink": decimal.Decimal("1.1"), }, { "bigquery": "NUMERIC", @@ -277,6 +278,7 @@ def test_numeric_literal(con, backend, expr, expected_types): "trino": "decimal(2,1)", "duckdb": "DECIMAL(18,3)", "postgres": "numeric", + "flink": "DECIMAL(38, 18) NOT NULL", }, marks=[ pytest.mark.notimpl( @@ -316,6 +318,7 @@ def test_numeric_literal(con, backend, expr, expected_types): "druid": 1.1, "datafusion": decimal.Decimal("1.1"), "oracle": 1.1, + "flink": decimal.Decimal("1.1"), }, { "bigquery": "NUMERIC", @@ -325,6 +328,7 @@ def test_numeric_literal(con, backend, expr, expected_types): "trino": "decimal(2,1)", "duckdb": "DECIMAL(38,9)", "postgres": "numeric", + "flink": "DECIMAL(38, 9) NOT NULL", }, marks=[ pytest.mark.broken( @@ -386,6 +390,11 @@ def test_numeric_literal(con, backend, expr, expected_types): raises=sa.exc.ProgrammingError, ), pytest.mark.notyet(["datafusion"], raises=Exception), + pytest.mark.notyet( + ["flink"], + "The precision can be up to 38 in Flink", + raises=ValueError, + ), ], id="decimal-big", ), @@ -461,6 +470,11 @@ def test_numeric_literal(con, backend, expr, expected_types): "(oracledb.exceptions.DatabaseError) DPY-4004: invalid number", raises=sa.exc.DatabaseError, ), + pytest.mark.notyet( + ["flink"], + "Infinity is not supported in Flink SQL", + raises=ValueError, + ), ], id="decimal-infinity+", ), @@ -536,6 +550,11 @@ def test_numeric_literal(con, backend, expr, expected_types): "(oracledb.exceptions.DatabaseError) DPY-4004: invalid number", raises=sa.exc.DatabaseError, ), + pytest.mark.notyet( + ["flink"], + "Infinity is not supported in Flink SQL", + raises=ValueError, + ), ], id="decimal-infinity-", ), @@ -622,6 +641,11 @@ def test_numeric_literal(con, backend, expr, expected_types): "(oracledb.exceptions.DatabaseError) DPY-4004: invalid number", raises=sa.exc.DatabaseError, ), + pytest.mark.notyet( + ["flink"], + "NaN is not supported in Flink SQL", + raises=ValueError, + ), ], id="decimal-NaN", ),