Skip to content

Commit

Permalink
DB sources data type support - adding new data types (#19679)
Browse files Browse the repository at this point in the history
* Introduce JsonSchemaTypeUtil + refactor code

* Update JsonSchemaPrimitiveUtil.java

Comment

* Update dynamodb.md

* Create acceptance-test-config.yml

* fix merge break

* formatting

* Fix formatting

* Code cleanup

* Update DataTypeEnumTest.java
  • Loading branch information
akashkulk authored Nov 29, 2022
1 parent 34c0e95 commit 865b580
Show file tree
Hide file tree
Showing 16 changed files with 225 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import org.junit.jupiter.api.Test;

class DataTypeEnumTest {
Expand All @@ -18,7 +18,7 @@ class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(6, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
package io.airbyte.db;

import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import java.util.Optional;

public class IncrementalUtils {
Expand All @@ -26,7 +27,7 @@ public static String getCursorField(final ConfiguredAirbyteStream stream) {
public static Optional<String> getCursorFieldOptional(final ConfiguredAirbyteStream stream) {
try {
return Optional.ofNullable(getCursorField(stream));
} catch (IllegalStateException e) {
} catch (final IllegalStateException e) {
return Optional.empty();
}
}
Expand All @@ -41,12 +42,18 @@ public static JsonSchemaPrimitive getCursorType(final ConfiguredAirbyteStream st
String.format("Could not find cursor field: %s in schema for stream: %s.", cursorField, stream.getStream().getName()));
}

if (stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("type") == null) {
if (stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("type") == null &&
stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("$ref") == null) {
throw new IllegalStateException(
String.format("Could not find cursor type for field: %s in schema for stream: %s.", cursorField, stream.getStream().getName()));
}

return JsonSchemaPrimitive.valueOf(stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("type").asText().toUpperCase());
if (stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("type") == null) {
return JsonSchemaPrimitiveUtil.PRIMITIVE_TO_REFERENCE_BIMAP.inverse()
.get(stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("$ref").asText());
} else {
return JsonSchemaPrimitive.valueOf(stream.getStream().getJsonSchema().get(PROPERTIES).get(cursorField).get("type").asText().toUpperCase());
}
}

/**
Expand All @@ -72,14 +79,14 @@ public static int compareCursors(final String original, final String candidate,
}

switch (type) {
case STRING -> {
case STRING, STRING_V1, DATE_V1, TIME_WITH_TIMEZONE_V1, TIME_WITHOUT_TIMEZONE_V1, TIMESTAMP_WITH_TIMEZONE_V1, TIMESTAMP_WITHOUT_TIMEZONE_V1 -> {
return original.compareTo(candidate);
}
case NUMBER -> {
case NUMBER, NUMBER_V1, INTEGER_V1 -> {
// todo (cgardens) - handle big decimal. this is currently an overflow risk.
return Double.compare(Double.parseDouble(original), Double.parseDouble(candidate));
}
case BOOLEAN -> {
case BOOLEAN, BOOLEAN_V1 -> {
return Boolean.compare(Boolean.parseBoolean(original), Boolean.parseBoolean(candidate));
}
// includes OBJECT, ARRAY, NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.Collections;
import org.junit.jupiter.api.Assertions;
Expand All @@ -27,6 +27,11 @@ class IncrementalUtilsTest {
STREAM_NAME,
null,
Field.of("ascending_inventory_uuid", JsonSchemaType.STRING));

private static final ConfiguredAirbyteStream STREAM_V1 = CatalogHelpers.createConfiguredAirbyteStream(
STREAM_NAME,
null,
Field.of("ascending_inventory_uuid", JsonSchemaType.STRING_V1));
private static final String ABC = "abc";

@Test
Expand Down Expand Up @@ -54,6 +59,11 @@ void testGetCursorType() {
Assertions.assertEquals(JsonSchemaPrimitive.STRING, IncrementalUtils.getCursorType(STREAM, UUID_FIELD_NAME));
}

@Test
void testGetCursorType_V1() {
Assertions.assertEquals(JsonSchemaPrimitive.STRING_V1, IncrementalUtils.getCursorType(STREAM_V1, UUID_FIELD_NAME));
}

@Test
void testGetCursorTypeNoProperties() {
final ConfiguredAirbyteStream stream = Jsons.clone(STREAM);
Expand All @@ -76,8 +86,10 @@ void testGetCursorTypeCursorHasNoType() {
@Test
void testCompareCursors() {
assertTrue(IncrementalUtils.compareCursors(ABC, "def", JsonSchemaPrimitive.STRING) < 0);
assertTrue(IncrementalUtils.compareCursors(ABC, "def", JsonSchemaPrimitive.STRING_V1) < 0);
Assertions.assertEquals(0, IncrementalUtils.compareCursors(ABC, ABC, JsonSchemaPrimitive.STRING));
assertTrue(IncrementalUtils.compareCursors("1", "2", JsonSchemaPrimitive.NUMBER) < 0);
assertTrue(IncrementalUtils.compareCursors("1", "2", JsonSchemaPrimitive.INTEGER_V1) < 0);
assertTrue(IncrementalUtils.compareCursors("5000000000", "5000000001", JsonSchemaPrimitive.NUMBER) < 0);
assertTrue(IncrementalUtils.compareCursors("false", "true", JsonSchemaPrimitive.BOOLEAN) < 0);
assertTrue(IncrementalUtils.compareCursors(null, "def", JsonSchemaPrimitive.STRING) < 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-dynamodb:dev
acceptance-tests:
spec:
tests:
- spec_path: "main/resources/spec.json"

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
Expand All @@ -25,8 +24,9 @@
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
Expand All @@ -44,23 +44,23 @@ public class DynamodbSource extends BaseConnector implements Source {

private final ObjectMapper objectMapper = new ObjectMapper();

public static void main(String[] args) throws Exception {
Source source = new DynamodbSource();
public static void main(final String[] args) throws Exception {
final Source source = new DynamodbSource();
LOGGER.info("starting Source: {}", DynamodbSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed Source: {}", DynamodbSource.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
public AirbyteConnectionStatus check(final JsonNode config) {
final var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);

try (var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
try (final var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
dynamodbOperations.listTables();

return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Error while listing Dynamodb tables with reason: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED);
Expand All @@ -69,13 +69,13 @@ public AirbyteConnectionStatus check(JsonNode config) {
}

@Override
public AirbyteCatalog discover(JsonNode config) {
public AirbyteCatalog discover(final JsonNode config) {

var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
final var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);

try (var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
try (final var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {

var airbyteStreams = dynamodbOperations.listTables().stream()
final var airbyteStreams = dynamodbOperations.listTables().stream()
.map(tb -> new AirbyteStream()
.withName(tb)
.withJsonSchema(Jsons.jsonNode(ImmutableMap.builder()
Expand All @@ -92,20 +92,20 @@ public AirbyteCatalog discover(JsonNode config) {
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(JsonNode config,
ConfiguredAirbyteCatalog catalog,
JsonNode state) {
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final JsonNode state) {

var streamState = DynamodbUtils.deserializeStreamState(state, featureFlags.useStreamCapableState());
final var streamState = DynamodbUtils.deserializeStreamState(state, featureFlags.useStreamCapableState());

StateManager stateManager = StateManagerFactory
final StateManager stateManager = StateManagerFactory
.createStateManager(streamState.airbyteStateType(), streamState.airbyteStateMessages(), catalog);

var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
final var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);

try (var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
try (final var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {

var streamIterators = catalog.getStreams().stream()
final var streamIterators = catalog.getStreams().stream()
.map(str -> switch (str.getSyncMode()) {
case INCREMENTAL -> scanIncremental(dynamodbOperations, str.getStream(), str.getCursorField().get(0), stateManager);
case FULL_REFRESH -> scanFullRefresh(dynamodbOperations, str.getStream());
Expand All @@ -117,29 +117,29 @@ public AutoCloseableIterator<AirbyteMessage> read(JsonNode config,
}
}

private AutoCloseableIterator<AirbyteMessage> scanIncremental(DynamodbOperations dynamodbOperations,
AirbyteStream airbyteStream,
String cursorField,
StateManager stateManager) {
private AutoCloseableIterator<AirbyteMessage> scanIncremental(final DynamodbOperations dynamodbOperations,
final AirbyteStream airbyteStream,
final String cursorField,
final StateManager stateManager) {

var streamPair = new AirbyteStreamNameNamespacePair(airbyteStream.getName(), airbyteStream.getNamespace());
final var streamPair = new AirbyteStreamNameNamespacePair(airbyteStream.getName(), airbyteStream.getNamespace());

Optional<CursorInfo> cursorInfo = stateManager.getCursorInfo(streamPair);
final Optional<CursorInfo> cursorInfo = stateManager.getCursorInfo(streamPair);

Map<String, JsonNode> properties = objectMapper.convertValue(airbyteStream.getJsonSchema().get("properties"), new TypeReference<>() {});
Set<String> selectedAttributes = properties.keySet();
final Map<String, JsonNode> properties = objectMapper.convertValue(airbyteStream.getJsonSchema().get("properties"), new TypeReference<>() {});
final Set<String> selectedAttributes = properties.keySet();

// cursor type will be retrieved from the json schema to save time on db schema crawling reading
// large amount of items
String cursorType = properties.get(cursorField).get("type").asText();
final String cursorType = properties.get(cursorField).get("type").asText();

var messageStream = cursorInfo.map(cursor -> {
final var messageStream = cursorInfo.map(cursor -> {

var filterType = switch (cursorType) {
final var filterType = switch (cursorType) {
case "string" -> DynamodbOperations.FilterAttribute.FilterType.S;
case "integer" -> DynamodbOperations.FilterAttribute.FilterType.N;
case "number" -> {
JsonNode airbyteType = properties.get(cursorField).get("airbyte_type");
final JsonNode airbyteType = properties.get(cursorField).get("airbyte_type");
if (airbyteType != null && airbyteType.asText().equals("integer")) {
yield DynamodbOperations.FilterAttribute.FilterType.N;
} else {
Expand All @@ -149,7 +149,7 @@ private AutoCloseableIterator<AirbyteMessage> scanIncremental(DynamodbOperations
default -> throw new UnsupportedOperationException("Unsupported attribute type for filtering");
};

DynamodbOperations.FilterAttribute filterAttribute = new DynamodbOperations.FilterAttribute(
final DynamodbOperations.FilterAttribute filterAttribute = new DynamodbOperations.FilterAttribute(
cursor.getCursorField(),
cursor.getCursor(),
filterType);
Expand All @@ -176,12 +176,12 @@ private AutoCloseableIterator<AirbyteMessage> scanIncremental(DynamodbOperations

}

private AutoCloseableIterator<AirbyteMessage> scanFullRefresh(DynamodbOperations dynamodbOperations,
AirbyteStream airbyteStream) {
Map<String, JsonNode> properties = objectMapper.convertValue(airbyteStream.getJsonSchema().get("properties"), new TypeReference<>() {});
Set<String> selectedAttributes = properties.keySet();
private AutoCloseableIterator<AirbyteMessage> scanFullRefresh(final DynamodbOperations dynamodbOperations,
final AirbyteStream airbyteStream) {
final Map<String, JsonNode> properties = objectMapper.convertValue(airbyteStream.getJsonSchema().get("properties"), new TypeReference<>() {});
final Set<String> selectedAttributes = properties.keySet();

var messageStream = dynamodbOperations
final var messageStream = dynamodbOperations
.scanTable(airbyteStream.getName(), selectedAttributes, null)
.stream()
.map(jn -> DynamodbUtils.mapAirbyteMessage(airbyteStream.getName(), jn));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;
import static io.airbyte.integrations.source.postgres.PostgresType.safeGetJdbcType;
import static io.airbyte.protocol.models.JsonSchemaType.INTEGER;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -28,7 +27,7 @@
import io.airbyte.db.SourceOperations;
import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.PreparedStatement;
Expand Down Expand Up @@ -275,7 +274,8 @@ private void putTimestampArray(final ObjectNode node, final String columnName, f
node.set(columnName, arrayNode);
}

private void putTimestampTzArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
private void putTimestampTzArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex)
throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
while (arrayResultSet.next()) {
Expand Down Expand Up @@ -548,7 +548,7 @@ protected <T extends PGobject> void putObject(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index,
Class<T> clazz)
final Class<T> clazz)
throws SQLException {
final T object = getObject(resultSet, index, clazz);
node.put(columnName, object.getValue());
Expand Down Expand Up @@ -599,7 +599,7 @@ static String parseMoneyValue(final String moneyString) {
}

@Override
public boolean isCursorType(PostgresType type) {
public boolean isCursorType(final PostgresType type) {
return PostgresUtils.ALLOWED_CURSOR_TYPES.contains(type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest;
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.Set;
import org.jooq.DSLContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSource
private JsonNode stateAfterFirstSync;

@Override
protected List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog configuredCatalog) throws Exception {
protected List<AirbyteMessage> runRead(final ConfiguredAirbyteCatalog configuredCatalog) throws Exception {
if (stateAfterFirstSync == null) {
throw new RuntimeException("stateAfterFirstSync is null");
}
return super.runRead(configuredCatalog, stateAfterFirstSync);
}

@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
final Database database = setupDatabase();
initTests();
for (final TestDataHolder test : testDataHolders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil.JsonSchemaPrimitive;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
Expand Down
Loading

0 comments on commit 865b580

Please sign in to comment.