From f201b0bf0755f90be7b2e88f548d61d8c79d9412 Mon Sep 17 00:00:00 2001 From: ntnhaatj Date: Fri, 17 Feb 2023 10:46:21 +0700 Subject: [PATCH] fix: coerce some types from query result set --- dbt/adapters/spark_livy/livysession.py | 56 +++++++++++++++++++++++--- tests/unit/test_livysession.py | 18 +++++++++ 2 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 tests/unit/test_livysession.py diff --git a/dbt/adapters/spark_livy/livysession.py b/dbt/adapters/spark_livy/livysession.py index 7d10532b7..0afa3a6a7 100644 --- a/dbt/adapters/spark_livy/livysession.py +++ b/dbt/adapters/spark_livy/livysession.py @@ -19,12 +19,13 @@ import json import time +import re import requests from urllib import response import datetime as dt from types import TracebackType -from typing import Any +from typing import Any, Optional import dbt.exceptions from dbt.events import AdapterLogger @@ -37,6 +38,39 @@ DEFAULT_POLL_WAIT = 2 +_TIMESTAMP_PATTERN = re.compile(r'(\d+-\d+-\d+T\d+:\d+:\d+(\.\d{,6})?Z)') + + +def __parse_timestamp_from_string(value: str) -> Optional[dt.datetime]: + if not value: + return None + + match = _TIMESTAMP_PATTERN.match(value) + if not match: + raise Exception( + 'Cannot convert "{}" into a datetime'.format(value)) + + if match.group(2): + fmt = '%Y-%m-%dT%H:%M:%S.%fZ' + # use the pattern to truncate the value + value = match.group() + else: + fmt = '%Y-%m-%dT%H:%M:%SZ' + + return dt.datetime.strptime(value, fmt) + + +TYPES_CONVERTER = {"timestamp": __parse_timestamp_from_string} + + +def _maybe_coerce_value_from_string(value, type_=None): + """coerce string from response into supported data type (eg. datetime)""" + maybe_converter = TYPES_CONVERTER.get(type_, None) + if maybe_converter and type_: + return maybe_converter(value) + return value + + class LivySession: def __init__(self, connect_url, auth, headers, verify_ssl_certificate): self.connect_url = connect_url @@ -262,6 +296,20 @@ def _getLivyResult(self, res_obj): return res time.sleep(DEFAULT_POLL_WAIT) + @classmethod + def _coerce_some_primitive_types(cls, schema, rows): + """ as each SQL adapter is now required to ad logic + to coerce some types from response (eg. timestamp) + https://github.com/dbt-labs/dbt-core/pull/3499 + """ + converted_rows = [] + for row in rows: + converted_rows.append([ + _maybe_coerce_value_from_string(col, col_schema['type']) + for col, col_schema in zip(row, schema) + ]) + return converted_rows + def execute(self, sql: str, *parameters: Any) -> None: """ Execute a sql statement. @@ -293,10 +341,8 @@ def execute(self, sql: str, *parameters: Any) -> None: # values = res['output']['data']['application/json'] values = res['output']['data']['application/json'] if (len(values) >= 1): - self._rows = values['data'] # values[0]['values'] - self._schema = values['schema']['fields'] # values[0]['schema'] - # print("rows", self._rows) - # print("schema", self._schema) + self._schema = values['schema']['fields'] + self._rows = self._coerce_some_primitive_types(self._schema, values['data']) else: self._rows = [] self._schema = [] diff --git a/tests/unit/test_livysession.py b/tests/unit/test_livysession.py new file mode 100644 index 000000000..db6982c04 --- /dev/null +++ b/tests/unit/test_livysession.py @@ -0,0 +1,18 @@ +import unittest + +from dbt.adapters.spark_livy.livysession import LivyCursor +import datetime as dt + + +class TestLivySession(unittest.TestCase): + def test_coerce_some_column_types_from_response_rows(self): + schema = [ + {'name': 'created_at', 'type': 'timestamp', 'nullable': True, 'metadata': {}}, + {'name': 'literal_str', 'type': 'string', 'nullable': False, 'metadata': {}}, + {'name': 'number', 'type': 'decimal', 'nullable': False, 'metadata': {}} + ] + res_rows = [['2023-02-06T17:03:30Z', 'literal string', '12'], ['2023-02-16T09:20:24Z', 'literal a', '20']] + rows = LivyCursor._coerce_some_primitive_types(schema, res_rows) + expected_rows = [[dt.datetime(2023, 2, 6, 17, 3, 30), 'literal string', '12'], + [dt.datetime(2023, 2, 16, 9, 20, 24), 'literal a', '20']] + self.assertEqual(rows, expected_rows)