Skip to content

Commit

Permalink
🐛 Postgres Source: fixed unsupported date-time datatypes during incre…
Browse files Browse the repository at this point in the history
…mental sync (#13655)

* Postgres Source: fixed unsupposted date-time datatypes during incremental sync

* updated CHANGELOG

* add tests for incremental cursor check

* removed star import

* Postgres Source: fixed unsupposted date-time datatypes during incremental sync

* updated CHANGELOG

* add tests for incremental cursor check

* removed star import

* add timestamp datatype test

* Bump version in Dockerfile

* auto-bump connector version

Co-authored-by: grishick <greg@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 13, 2022
1 parent e7f8128 commit a3ca3ab
Show file tree
Hide file tree
Showing 8 changed files with 385 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.21
dockerImageTag: 0.4.22
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6719,7 +6719,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.21"
- dockerImage: "airbyte/source-postgres:0.4.22"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception {

setEmittedAtToNull(actualMessages);

final List<AirbyteMessage> expectedMessages = getAirbyteMessagesReadOneColumn();
assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesReadOneColumn() {
final List<AirbyteMessage> expectedMessages = getTestMessages().stream()
.map(Jsons::clone)
.peek(m -> {
Expand All @@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception {
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
return expectedMessages;
}

@Test
Expand Down Expand Up @@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception {
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING)));

final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
m.getRecord().setNamespace(getDefaultNamespace());
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2);
expectedMessages.addAll(secondStreamExpectedMessages);
}

Expand All @@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception {
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesSecondSync(String streamName2) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
m.getRecord().setNamespace(getDefaultNamespace());
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());

}

@Test
void testTablesWithQuoting() throws Exception {
final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces();
Expand All @@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception {

setEmittedAtToNull(actualMessages);

final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces);
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
expectedMessages.addAll(secondStreamExpectedMessages);

assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
Expand All @@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception {
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
expectedMessages.addAll(secondStreamExpectedMessages);

assertTrue(expectedMessages.size() == actualMessages.size());
assertTrue(expectedMessages.containsAll(actualMessages));
assertTrue(actualMessages.containsAll(expectedMessages));
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand Down Expand Up @@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception {
void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {
final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces();

final ArrayList<AirbyteMessage> expectedRecordMessages = getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces);
incrementalCursorCheck(
COL_LAST_NAME_WITH_SPACE,
COL_LAST_NAME_WITH_SPACE,
"patent",
"vash",
expectedRecordMessages,
streamWithSpaces);
}

protected ArrayList<AirbyteMessage> getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) {
final AirbyteMessage firstMessage = getTestMessages().get(0);
firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName());
((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT);
Expand All @@ -546,21 +571,15 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {

Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2));

incrementalCursorCheck(
COL_LAST_NAME_WITH_SPACE,
COL_LAST_NAME_WITH_SPACE,
"patent",
"vash",
Lists.newArrayList(firstMessage, secondMessage),
streamWithSpaces);
return Lists.newArrayList(firstMessage, secondMessage);
}

@Test
void testIncrementalTimestampCheckCursor() throws Exception {
incrementalTimestampCheck();
void testIncrementalDateCheckCursor() throws Exception {
incrementalDateCheck();
}

protected void incrementalTimestampCheck() throws Exception {
protected void incrementalDateCheck() throws Exception {
incrementalCursorCheck(
COL_UPDATED_AT,
"2005-10-18T00:00:00Z",
Expand Down Expand Up @@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
.filter(r -> r.getType() == Type.STATE).findFirst();
assertTrue(stateAfterFirstSyncOptional.isPresent());

database.execute(connection -> {
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
});
executeStatementReadIncrementallyTwice();

final List<AirbyteMessage> actualMessagesSecondSync = MoreIterators
.toList(source.read(config, configuredCatalog,
Expand All @@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception {
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
}

protected void executeStatementReadIncrementallyTwice() throws SQLException {
database.execute(connection -> {
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
connection.createStatement().execute(
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
getFullyQualifiedTableName(TABLE_NAME)));
});
}

protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
Expand Down Expand Up @@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception {

// we know the second streams messages are the same as the first minus the updated at column. so we
// cheat and generate the expected messages off of the first expected messages.
final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2);
final List<AirbyteMessage> expectedMessagesFirstSync = new ArrayList<>(getTestMessages());
expectedMessagesFirstSync.add(new AirbyteMessage()
.withType(Type.STATE)
Expand Down Expand Up @@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception {
assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync));
}

protected List<AirbyteMessage> getAirbyteMessagesSecondStreamWithNamespace(String streamName2) {
return getTestMessages()
.stream()
.map(Jsons::clone)
.peek(m -> {
m.getRecord().setStream(streamName2);
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
})
.collect(Collectors.toList());
}

// when initial and final cursor fields are the same.
protected void incrementalCursorCheck(
final String cursorField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
}

@Override
protected void incrementalTimestampCheck() throws Exception {
protected void incrementalDateCheck() throws Exception {
super.incrementalCursorCheck(COL_UPDATED_AT,
"2005-10-18",
"2006-10-19",
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.21
LABEL io.airbyte.version=0.4.22
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -30,6 +29,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,15 +80,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
Date date = Date.valueOf(value);
preparedStatement.setDate(parameterIndex, date);
} catch (final Exception e) {
throw new RuntimeException(e);
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final JDBCType cursorFieldType,
final String value)
throws SQLException {
switch (cursorFieldType) {

case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value);
case TIME -> setTime(preparedStatement, parameterIndex, value);
case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value);
case DATE -> setDate(preparedStatement, parameterIndex, value);
case BIT -> setBit(preparedStatement, parameterIndex, value);
case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value);
case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value);
case INTEGER -> setInteger(preparedStatement, parameterIndex, value);
case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value);
case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value);
case REAL -> setReal(preparedStatement, parameterIndex, value);
case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value);
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value);
case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value);
// since cursor are expected to be comparable, handle cursor typing strictly and error on
// unrecognized types
default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType));
}
}

private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
}

private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
}

@Override
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
}

@Override
protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
}

@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setObject(parameterIndex, LocalDate.parse(value));
}

@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
Expand Down
Loading

0 comments on commit a3ca3ab

Please sign in to comment.