From 5d936cc7133e674202e30b8ba1e9f66cbcf6a91c Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Mon, 5 Dec 2022 17:57:17 -0800 Subject: [PATCH] Move handling TIMESTAMP_WITH_TIMEZONE to common JdbcSourceOperations (#19860) * Refactor * Fix build failures --- .../airbyte/db/jdbc/JdbcSourceOperations.java | 3 +- .../SnowflakeSourceOperations.java | 37 +++---------------- 2 files changed, 7 insertions(+), 33 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java index 5950da239a6a..2c38b91b679c 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java @@ -55,6 +55,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob case DATE -> putDate(json, columnName, resultSet, colIndex); case TIME -> putTime(json, columnName, resultSet, colIndex); case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); + case TIMESTAMP_WITH_TIMEZONE -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); case ARRAY -> putArray(json, columnName, resultSet, colIndex); default -> putDefault(json, columnName, resultSet, colIndex); @@ -103,7 +104,7 @@ public JDBCType getFieldType(final JsonNode field) { } @Override - public boolean isCursorType(JDBCType type) { + public boolean isCursorType(final JDBCType type) { return ALLOWED_CURSOR_TYPES.contains(type); } diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java index f1791b157c07..48043913662a 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java @@ -93,44 +93,17 @@ public JsonSchemaType getJsonType(final JDBCType jdbcType) { }; } - /** - * The only difference between this method and the one in {@link JdbcSourceOperations} is that the - * TIMESTAMP_WITH_TIMEZONE columns are also converted using the putTimestamp method. This is - * necessary after the JDBC upgrade from 3.13.9 to 3.13.22. This change may need to be added to - * {@link JdbcSourceOperations#setJsonField} in the future. - *

- * See issue: https://github.com/airbytehq/airbyte/issues/16838. - */ @Override public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { - final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex); final String columnName = resultSet.getMetaData().getColumnName(colIndex); final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase(); - final JDBCType columnType = safeGetJdbcType(columnTypeInt); // TIMESTAMPLTZ data type detected as JDBCType.TIMESTAMP which is not correct if ("TIMESTAMPLTZ".equalsIgnoreCase(columnTypeName)) { putTimestampWithTimezone(json, columnName, resultSet, colIndex); return; } - // https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html - switch (columnType) { - case BIT, BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); - case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex); - case INTEGER -> putInteger(json, columnName, resultSet, colIndex); - case BIGINT -> putBigInt(json, columnName, resultSet, colIndex); - case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex); - case REAL -> putFloat(json, columnName, resultSet, colIndex); - case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex); - case CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex); - case DATE -> putDate(json, columnName, resultSet, colIndex); - case TIME -> putTime(json, columnName, resultSet, colIndex); - case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); - case TIMESTAMP_WITH_TIMEZONE -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); - case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); - case ARRAY -> putArray(json, columnName, resultSet, colIndex); - default -> putDefault(json, columnName, resultSet, colIndex); - } + super.setJsonField(resultSet, colIndex, json); } @Override @@ -140,25 +113,25 @@ protected void setDate(final PreparedStatement preparedStatement, final int para } @Override - protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { final Timestamp timestamp = resultSet.getTimestamp(index); node.put(columnName, DateTimeConverter.convertToTimestampWithTimezone(timestamp)); } @Override - protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { final Timestamp timestamp = resultSet.getTimestamp(index); node.put(columnName, DateTimeConverter.convertToTimestamp(timestamp)); } @Override - protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { final Date date = resultSet.getDate(index); node.put(columnName, DateTimeConverter.convertToDate(date)); } @Override - protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { // resultSet.getTime() will lose nanoseconds precision final LocalTime localTime = resultSet.getTimestamp(index).toLocalDateTime().toLocalTime(); node.put(columnName, DateTimeConverter.convertToTime(localTime));