Skip to content

Commit

Permalink
fix tests (#15133)
Browse files Browse the repository at this point in the history
* fix tests

* fix redshift test
  • Loading branch information
mkhokh-33 authored Aug 1, 2022
1 parent 5e9759b commit e1eaed2
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public abstract class CdcSourceTest {
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))));
Expand Down Expand Up @@ -582,8 +582,8 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteStream streamWithoutPK = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_2",
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID, JsonSchemaType.INTEGER),
Field.of(COL_MODEL, JsonSchemaType.STRING));
streamWithoutPK.setSourceDefinedPrimaryKey(Collections.emptyList());
streamWithoutPK.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
Expand All @@ -592,8 +592,8 @@ protected AirbyteCatalog expectedCatalogForDiscover() {
final AirbyteStream randomStream = CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME + "_random",
MODELS_SCHEMA + "_random",
Field.of(COL_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.NUMBER),
Field.of(COL_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MAKE_ID + "_random", JsonSchemaType.INTEGER),
Field.of(COL_MODEL + "_random", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,18 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_ID, JsonSchemaType.INTEGER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING),
Field.of(COL_ROW_ID, JsonSchemaType.NUMBER))
Field.of(COL_ROW_ID, JsonSchemaType.INTEGER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))),
CatalogHelpers.createAirbyteStream(
Expand Down Expand Up @@ -507,7 +507,7 @@ void testDiscoverWithMultipleSchemas() throws Exception {
SCHEMA_NAME2,
Field.of(COL_ID, JsonSchemaType.STRING),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_ROW_ID, JsonSchemaType.NUMBER))
Field.of(COL_ROW_ID, JsonSchemaType.INTEGER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CockroachDbSourceTest {
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING),
Field.of("power", JsonSchemaType.NUMBER),
Field.of(COL_ROW_ID, JsonSchemaType.NUMBER))
Field.of(COL_ROW_ID, JsonSchemaType.INTEGER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))),
CatalogHelpers.createAirbyteStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("bigint")
.airbyteType(JsonSchemaType.NUMBER)
.airbyteType(JsonSchemaType.INTEGER)
.addInsertValues("-9223372036854775808", "9223372036854775807", "0", "null")
.addExpectedValues("-9223372036854775808", "9223372036854775807", "0", null)
.createTablePatternSql(CREATE_TABLE_SQL)
Expand All @@ -51,7 +51,7 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("int")
.airbyteType(JsonSchemaType.NUMBER)
.airbyteType(JsonSchemaType.INTEGER)
.addInsertValues("null", "-2147483648", "2147483647")
.addExpectedValues(null, "-2147483648", "2147483647")
.createTablePatternSql(CREATE_TABLE_SQL)
Expand All @@ -60,7 +60,7 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("smallint")
.airbyteType(JsonSchemaType.NUMBER)
.airbyteType(JsonSchemaType.INTEGER)
.addInsertValues("null", "-32768", "32767")
.addExpectedValues(null, "-32768", "32767")
.createTablePatternSql(CREATE_TABLE_SQL)
Expand All @@ -69,7 +69,7 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("tinyint")
.airbyteType(JsonSchemaType.NUMBER)
.airbyteType(JsonSchemaType.INTEGER)
.addInsertValues("null", "0", "255")
.addExpectedValues(null, "0", "255")
.createTablePatternSql(CREATE_TABLE_SQL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class MssqlSourceTest {
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
DB_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@
import io.airbyte.integrations.source.oracle.OracleSource;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.sql.Connection;
Expand All @@ -38,6 +42,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -256,6 +261,35 @@ void testSpec() throws Exception {
assertEquals(expected, actual);
}

protected AirbyteCatalog getCatalog(final String defaultNamespace) {
return new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

@Override
protected List<AirbyteMessage> getTestMessages() {
return Lists.newArrayList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.sql.Connection;
Expand All @@ -37,6 +41,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -111,6 +116,35 @@ protected void incrementalDateCheck() throws Exception {
Lists.newArrayList(getTestMessages().get(1), getTestMessages().get(2)));
}

protected AirbyteCatalog getCatalog(final String defaultNamespace) {
return new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
TABLE_NAME,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
}

@AfterEach
public void tearDownOracle() throws Exception {
// ORA-12519
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {

protected static final List<Field> FIELDS = List.of(
Field.of("c_custkey", JsonSchemaType.NUMBER),
Field.of("c_custkey", JsonSchemaType.INTEGER),
Field.of("c_name", JsonSchemaType.STRING),
Field.of("c_nation", JsonSchemaType.STRING));

Expand Down

0 comments on commit e1eaed2

Please sign in to comment.