From 480cde2ea0a4b8f3df77cf2dc5c89042bfb72ff1 Mon Sep 17 00:00:00 2001 From: grishick Date: Mon, 13 Jun 2022 19:57:44 -0700 Subject: [PATCH 1/5] Fall back to parsing w/ or w/o TZ if parsing a date or a time string fails --- .../postgres/PostgresSourceOperations.java | 62 +++++++++++++------ 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 798286efb297..a49553aaeb4b 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -31,6 +31,7 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.OffsetTime; +import java.time.format.DateTimeParseException; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; @@ -108,27 +109,52 @@ public void setStatementField(final PreparedStatement preparedStatement, } } - private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + private void setTimeWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the time w/o timezone. This can be caused by schema created with a different version of the connector + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } } - private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the datetime w/o timezone. This can be caused by schema created with a different version of the connector + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } } @Override - protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } } @Override - protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } } @Override protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { - preparedStatement.setObject(parameterIndex, LocalDate.parse(value)); + try { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the date with timezone. This can be caused by schema created with an older version of the connector + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } } @Override @@ -170,21 +196,21 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob } @Override - protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); + protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); node.put(columnName, resolveEra(date, date.toString())); } @Override - protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); + protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); node.put(columnName, time.toString()); } @Override - protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); - LocalDate date = timestamp.toLocalDate(); + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDate date = timestamp.toLocalDate(); node.put(columnName, resolveEra(date, timestamp.toString())); } @@ -214,7 +240,7 @@ public JDBCType getFieldType(final JsonNode field) { } @Override - public JsonSchemaType getJsonType(JDBCType jdbcType) { + public JsonSchemaType getJsonType(final JDBCType jdbcType) { return switch (jdbcType) { case BOOLEAN -> JsonSchemaType.BOOLEAN; case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER; @@ -264,7 +290,7 @@ private void putHstoreAsJson(final ObjectNode node, final String columnName, fin final var data = resultSet.getObject(index); try { node.put(columnName, OBJECT_MAPPER.writeValueAsString(data)); - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException("Could not parse 'hstore' value:" + e); } } From 0dc0945cd6f9623ea2f37d2de30017d689e06347 Mon Sep 17 00:00:00 2001 From: grishick Date: Mon, 13 Jun 2022 19:58:25 -0700 Subject: [PATCH 2/5] Bump connector version --- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 1b07db6a7749..f201d4184b74 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.22 +LABEL io.airbyte.version=0.4.23 LABEL io.airbyte.name=airbyte/source-postgres From 9faa268bb9e901dbd9898e2c7ff0b87a8fcf6e54 Mon Sep 17 00:00:00 2001 From: grishick Date: Mon, 13 Jun 2022 20:15:11 -0700 Subject: [PATCH 3/5] Fix a typo and remove catch clause for setDate --- .../source/postgres/PostgresSourceOperations.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index a49553aaeb4b..3b2a9e8e29ff 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -140,7 +140,7 @@ protected void setTimestamp(final PreparedStatement preparedStatement, final int @Override protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { try { - preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); } catch (final DateTimeParseException e) { //attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); @@ -149,12 +149,7 @@ protected void setTime(final PreparedStatement preparedStatement, final int para @Override protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { - try { - preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); - } catch (final DateTimeParseException e) { - //attempt to parse the date with timezone. This can be caused by schema created with an older version of the connector - preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); - } + preparedStatement.setObject(parameterIndex, LocalDate.parse(value)); } @Override From 3bc972704d8676020a449fc0bc48d93b64cc5b0f Mon Sep 17 00:00:00 2001 From: grishick Date: Mon, 13 Jun 2022 21:02:07 -0700 Subject: [PATCH 4/5] Update change history --- docs/integrations/sources/postgres.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 4b3781a3e55c..0f8437a4f373 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.23 | 2022-06-13 | [13655](https://github.com/airbytehq/airbyte/pull/13745) | Fixed handling datetime cursors when upgrading from older versions of the connector | | 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync | | 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format | From 7fdeae5ff9e7ecbb1f37536554c8160f6677b870 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Tue, 14 Jun 2022 06:26:49 +0000 Subject: [PATCH 5/5] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 58334a7a5717..37c10f5c055d 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -715,7 +715,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.22 + dockerImageTag: 0.4.23 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 88c0959a6e00..61ae57001ebe 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6719,7 +6719,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.22" +- dockerImage: "airbyte/source-postgres:0.4.23" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: