Skip to content

Commit

Permalink
Revert ":tada: Extend logic for JDBC connectors to provide additional…
Browse files Browse the repository at this point in the history
… properties in JSON schema (#7859)" (#7969)

This reverts commit 2fe927b.
  • Loading branch information
VitaliiMaltsev authored Nov 15, 2021
1 parent f6d81ff commit 63c6249
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(9, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(6, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,9 @@ public JsonSchemaPrimitive getType(final JDBCType bigQueryType) {
case REAL -> JsonSchemaPrimitive.NUMBER;
case NUMERIC, DECIMAL -> JsonSchemaPrimitive.NUMBER;
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> JsonSchemaPrimitive.STRING;
case DATE -> JsonSchemaPrimitive.STRING_DATE;
case TIME -> JsonSchemaPrimitive.STRING_TIME;
case TIMESTAMP -> JsonSchemaPrimitive.STRING_DATETIME;
case DATE -> JsonSchemaPrimitive.STRING;
case TIME -> JsonSchemaPrimitive.STRING;
case TIMESTAMP -> JsonSchemaPrimitive.STRING;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaPrimitive.STRING;
// since column types aren't necessarily meaningful to Airbyte, liberally convert all unrecgonised
// types to String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
.put("decimal", JsonSchemaPrimitive.NUMBER)
.put("char", JsonSchemaPrimitive.STRING)
.put("varchar", JsonSchemaPrimitive.STRING)
.put("date", JsonSchemaPrimitive.STRING_DATE)
.put("time", JsonSchemaPrimitive.STRING_TIME)
.put("timestamp", JsonSchemaPrimitive.STRING_DATETIME)
.put("date", JsonSchemaPrimitive.STRING)
.put("time", JsonSchemaPrimitive.STRING)
.put("timestamp", JsonSchemaPrimitive.STRING)
.put("binary1", JsonSchemaPrimitive.STRING)
.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.cockroachdb;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -68,5 +64,4 @@ private void putCockroachSpecialDataType(ResultSet resultSet, int index, ObjectN
node.put(columnName, (Double) null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ protected void initTests() {
.addNullExpectedValue()
.build());

// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
// Time (04:05:06) would be represented like "1970-01-01T04:05:06Z"
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("timetz")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING),
Field.of(COL_ROW_ID, JsonSchemaPrimitive.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))),
Expand All @@ -138,7 +138,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_LAST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING_DATE))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,8 @@ protected void tearDown(TestDestinationEnv testEnv) {
/* Helpers */

private String getCertificate() throws IOException, InterruptedException {
// To enable SSL connection on the server, we need to generate self-signed certificates for the
// server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will
// take effect after restart.
// To enable SSL connection on the server, we need to generate self-signed certificates for the server and add them to the configuration.
// Then you need to enable SSL connection and specify on which port it will work. These changes will take effect after restart.
// The certificate for generating a user certificate has the extension *.arm.
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -keydb -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS + "\" -stash");
db.execInContainer("su", "-", "db2inst1", "-c", "gsk8capicmd_64 -cert -create -db \"server.kdb\" -pw \"" + TEST_KEY_STORE_PASS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,32 +803,28 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_LAST_NAME, JsonSchemaPrimitive.STRING),
Field.of(COL_UPDATED_AT, resolveJsonSchemaType()))
Field.of(COL_UPDATED_AT, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

private JsonSchemaPrimitive resolveJsonSchemaType() {
return getDriverClass().toLowerCase().contains("oracle") ? JsonSchemaPrimitive.STRING_DATETIME : JsonSchemaPrimitive.STRING_DATE;
}

protected List<AirbyteMessage> getTestMessages() {
return Lists.newArrayList(
new AirbyteMessage().withType(Type.RECORD)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static JsonNode fieldsToJsonSchema(final List<Field> fields) {
.stream()
.collect(Collectors.toMap(
Field::getName,
field -> field.getType().getJsonSchemaTypeMap())))
field -> ImmutableMap.of("type", field.getTypeAsJsonSchemaString()))))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,11 @@

package io.airbyte.protocol.models;

import com.google.common.collect.ImmutableMap;

public enum JsonSchemaPrimitive {

STRING_DATE(ImmutableMap.of("type", "string", "format", "date")),
STRING_TIME(ImmutableMap.of("type", "string", "format", "time")),
STRING_DATETIME(ImmutableMap.of("type", "string", "format", "date-time")),
STRING(ImmutableMap.of("type", "string")),
NUMBER(ImmutableMap.of("type", "number")),
OBJECT(ImmutableMap.of("type", "object")),
ARRAY(ImmutableMap.of("type", "array")),
BOOLEAN(ImmutableMap.of("type", "boolean")),
NULL(ImmutableMap.of("type", "null"));

private final ImmutableMap<String, String> jsonSchemaTypeMap;

JsonSchemaPrimitive(ImmutableMap<String, String> jsonSchemaTypeMap) {
this.jsonSchemaTypeMap = jsonSchemaTypeMap;
}

public ImmutableMap<String, String> getJsonSchemaTypeMap() {
return jsonSchemaTypeMap;
}

STRING,
NUMBER,
OBJECT,
ARRAY,
BOOLEAN,
NULL;
}

0 comments on commit 63c6249

Please sign in to comment.