Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback to parsing datetime and time strings w/ and w/o timezones in case DateTimeParseException is thrown #13745

Merged
merged 5 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.22
dockerImageTag: 0.4.23
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6719,7 +6719,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.22"
- dockerImage: "airbyte/source-postgres:0.4.23"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.22
LABEL io.airbyte.version=0.4.23
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -108,22 +109,42 @@ public void setStatementField(final PreparedStatement preparedStatement,
}
}

private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
private void setTimeWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the time w/o timezone. This can be caused by schema created with a different version of the connector
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
}
}

private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the datetime w/o timezone. This can be caused by schema created with a different version of the connector
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
}
}

@Override
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
}
}

@Override
protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
} catch (final DateTimeParseException e) {
//attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
}
}

@Override
Expand Down Expand Up @@ -170,21 +191,21 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
}

@Override
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
node.put(columnName, resolveEra(date, date.toString()));
}

@Override
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
node.put(columnName, time.toString());
}

@Override
protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
LocalDate date = timestamp.toLocalDate();
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();
node.put(columnName, resolveEra(date, timestamp.toString()));
}

Expand Down Expand Up @@ -214,7 +235,7 @@ public JDBCType getFieldType(final JsonNode field) {
}

@Override
public JsonSchemaType getJsonType(JDBCType jdbcType) {
public JsonSchemaType getJsonType(final JDBCType jdbcType) {
return switch (jdbcType) {
case BOOLEAN -> JsonSchemaType.BOOLEAN;
case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER;
Expand Down Expand Up @@ -264,7 +285,7 @@ private void putHstoreAsJson(final ObjectNode node, final String columnName, fin
final var data = resultSet.getObject(index);
try {
node.put(columnName, OBJECT_MAPPER.writeValueAsString(data));
} catch (JsonProcessingException e) {
} catch (final JsonProcessingException e) {
throw new RuntimeException("Could not parse 'hstore' value:" + e);
}
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------|
| 0.4.23 | 2022-06-13 | [13655](https://github.com/airbytehq/airbyte/pull/13745) | Fixed handling datetime cursors when upgrading from older versions of the connector |
| 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync |
| 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size |
| 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format |
Expand Down