diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index d1e35301b5ea..1e3a301d5d99 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -77,6 +77,7 @@ jobs: SOURCE_AWS_CLOUDTRAIL_CREDS: ${{ secrets.SOURCE_AWS_CLOUDTRAIL_CREDS }} AZURE_STORAGE_INTEGRATION_TEST_CREDS: ${{ secrets.AZURE_STORAGE_INTEGRATION_TEST_CREDS }} BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }} + BIGQUERY_TEST_CREDS: ${{ secrets.BIGQUERY_TEST_CREDS }} BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }} DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }} DESTINATION_KVDB_TEST_CREDS: ${{ secrets.DESTINATION_KVDB_TEST_CREDS }} diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index ba6f7f306cee..79ba35ee10ec 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -75,6 +75,7 @@ jobs: AWS_REDSHIFT_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_REDSHIFT_INTEGRATION_TEST_CREDS }} AZURE_STORAGE_INTEGRATION_TEST_CREDS: ${{ secrets.AZURE_STORAGE_INTEGRATION_TEST_CREDS }} BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }} + BIGQUERY_TEST_CREDS: ${{ secrets.BIGQUERY_TEST_CREDS }} BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }} DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }} DESTINATION_KVDB_TEST_CREDS: ${{ secrets.DESTINATION_KVDB_TEST_CREDS }} diff --git a/airbyte-db/build.gradle b/airbyte-db/build.gradle index 0205b835d1a2..e62dac724e4d 100644 --- a/airbyte-db/build.gradle +++ b/airbyte-db/build.gradle @@ -14,4 +14,12 @@ dependencies { testImplementation 'org.apache.commons:commons-lang3:3.11' testImplementation "org.testcontainers:postgresql:1.15.1" + + // Big Query + implementation platform('com.google.cloud:libraries-bom:20.6.0') + implementation('com.google.cloud:google-cloud-bigquery:1.133.1') + + // Lombok + implementation 'org.projectlombok:lombok:1.18.20' + annotationProcessor('org.projectlombok:lombok:1.18.20') } diff --git a/airbyte-db/src/main/java/io/airbyte/db/DataTypeSupplier.java b/airbyte-db/src/main/java/io/airbyte/db/DataTypeSupplier.java new file mode 100644 index 000000000000..18062b8e4ccd --- /dev/null +++ b/airbyte-db/src/main/java/io/airbyte/db/DataTypeSupplier.java @@ -0,0 +1,34 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.db; + +import java.sql.SQLException; + +@FunctionalInterface +public interface DataTypeSupplier { + + DataType apply() throws SQLException; + +} diff --git a/airbyte-db/src/main/java/io/airbyte/db/DataTypeUtils.java b/airbyte-db/src/main/java/io/airbyte/db/DataTypeUtils.java new file mode 100644 index 000000000000..04ab4a98af25 --- /dev/null +++ b/airbyte-db/src/main/java/io/airbyte/db/DataTypeUtils.java @@ -0,0 +1,63 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.db; + +import java.sql.Date; +import java.sql.SQLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.function.Function; + +public class DataTypeUtils { + + public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset + + public static T returnNullIfInvalid(DataTypeSupplier valueProducer) { + return returnNullIfInvalid(valueProducer, ignored -> true); + } + + public static T returnNullIfInvalid(DataTypeSupplier valueProducer, Function isValidFn) { + // Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will throw an + // exception when parsed. We want to parse those + // values as null. + // This method reduces error handling boilerplate. + try { + T value = valueProducer.apply(); + return isValidFn.apply(value) ? value : null; + } catch (SQLException e) { + return null; + } + } + + public static String toISO8601String(long epochMillis) { + return DATE_FORMAT.format(Date.from(Instant.ofEpochMilli(epochMillis))); + } + + public static String toISO8601String(java.util.Date date) { + return DATE_FORMAT.format(date); + } + +} diff --git a/airbyte-db/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/src/main/java/io/airbyte/db/Databases.java index ef84903a550d..0c19fc95d29f 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/src/main/java/io/airbyte/db/Databases.java @@ -27,6 +27,7 @@ import static org.jooq.impl.DSL.select; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.db.bigquery.BigQueryDatabase; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; @@ -196,4 +197,8 @@ private static BasicDataSource createBasicDataSource(final String username, return connectionPool; } + public static BigQueryDatabase createBigQueryDatabase(final String projectId, final String jsonCreds) { + return new BigQueryDatabase(projectId, jsonCreds); + } + } diff --git a/airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java b/airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java index a09738d3d734..f51ace86236f 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java +++ b/airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java @@ -25,7 +25,6 @@ package io.airbyte.db; import com.fasterxml.jackson.databind.JsonNode; -import java.sql.SQLException; import java.util.stream.Stream; public abstract class SqlDatabase implements AutoCloseable { @@ -33,9 +32,9 @@ public abstract class SqlDatabase implements AutoCloseable { private JsonNode sourceConfig; private JsonNode databaseConfig; - public abstract void execute(String sql) throws SQLException; + public abstract void execute(String sql) throws Exception; - public abstract Stream query(String sql, String... params) throws SQLException; + public abstract Stream query(String sql, String... params) throws Exception; public JsonNode getSourceConfig() { return sourceConfig; diff --git a/airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java b/airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java new file mode 100644 index 000000000000..15cfc38e2e64 --- /dev/null +++ b/airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java @@ -0,0 +1,219 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.db.bigquery; + +import static java.util.Objects.isNull; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.api.gax.retrying.RetrySettings; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.QueryParameterValue; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Table; +import com.google.common.base.Charsets; +import com.google.common.collect.Streams; +import io.airbyte.db.SqlDatabase; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.threeten.bp.Duration; + +public class BigQueryDatabase extends SqlDatabase { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDatabase.class); + + private final BigQuery bigQuery; + + public BigQueryDatabase(String projectId, String jsonCreds) { + try { + BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder(); + ServiceAccountCredentials credentials = null; + if (jsonCreds != null && !jsonCreds.isEmpty()) { + credentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(jsonCreds.getBytes(Charsets.UTF_8))); + } + bigQuery = bigQueryBuilder + .setProjectId(projectId) + .setCredentials(!isNull(credentials) ? credentials : ServiceAccountCredentials.getApplicationDefault()) + .setRetrySettings(RetrySettings + .newBuilder() + .setMaxAttempts(10) + .setRetryDelayMultiplier(1.5) + .setTotalTimeout(Duration.ofMinutes(60)) + .build()) + .build() + .getService(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void execute(String sql) throws SQLException { + final ImmutablePair result = executeQuery(bigQuery, getQueryConfig(sql, Collections.emptyList())); + if (result.getLeft() == null) { + throw new SQLException("BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql); + } + LOGGER.info("BigQuery successfully finished execution SQL: " + sql); + } + + public Stream query(String sql) throws Exception { + return query(sql, Collections.emptyList()); + } + + public Stream query(String sql, QueryParameterValue... params) throws Exception { + return query(sql, (params == null ? Collections.emptyList() : Arrays.asList(params))); + } + + @Override + public Stream query(String sql, String... params) throws Exception { + List parameterValueList; + if (params == null) + parameterValueList = Collections.emptyList(); + else + parameterValueList = Arrays.stream(params).map(param -> QueryParameterValue.newBuilder().setValue(param).setType( + StandardSQLTypeName.STRING).build()).collect(Collectors.toList()); + + return query(sql, parameterValueList); + } + + public Stream query(String sql, List params) throws Exception { + final ImmutablePair result = executeQuery(bigQuery, getQueryConfig(sql, params)); + + if (result.getLeft() != null) { + FieldList fieldList = result.getLeft().getQueryResults().getSchema().getFields(); + return Streams.stream(result.getLeft().getQueryResults().iterateAll()) + .map(fieldValues -> BigQueryUtils.rowToJson(fieldValues, fieldList)); + } else + throw new Exception( + "Failed to execute query " + sql + (params != null && !params.isEmpty() ? " with params " + params : "") + ". Error: " + result.getRight()); + } + + @Override + public void close() throws Exception { + /** + * The BigQuery doesn't require connection close. It will be done automatically. + */ + } + + public QueryJobConfiguration getQueryConfig(String sql, List params) { + return QueryJobConfiguration + .newBuilder(sql) + .setUseLegacySql(false) + .setPositionalParameters(params) + .build(); + } + + public ImmutablePair executeQuery(BigQuery bigquery, QueryJobConfiguration queryConfig) { + final JobId jobId = JobId.of(UUID.randomUUID().toString()); + final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); + return executeQuery(queryJob); + } + + /** + * Returns full information about all tables from entire project + * + * @param projectId BigQuery project id + * @return List of BigQuery tables + */ + public List getProjectTables(String projectId) { + List
tableList = new ArrayList<>(); + bigQuery.listDatasets(projectId) + .iterateAll() + .forEach(dataset -> bigQuery.listTables(dataset.getDatasetId()) + .iterateAll() + .forEach(table -> tableList.add(bigQuery.getTable(table.getTableId())))); + return tableList; + } + + /** + * Returns full information about all tables from specific Dataset + * + * @param datasetId BigQuery dataset id + * @return List of BigQuery tables + */ + public List
getDatasetTables(String datasetId) { + List
tableList = new ArrayList<>(); + bigQuery.listTables(datasetId) + .iterateAll() + .forEach(table -> tableList.add(bigQuery.getTable(table.getTableId()))); + return tableList; + } + + public BigQuery getBigQuery() { + return bigQuery; + } + + public void cleanDataSet(String dataSetId) { + // allows deletion of a dataset that has contents + final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents(); + + final boolean success = bigQuery.delete(dataSetId, option); + if (success) { + LOGGER.info("BQ Dataset " + dataSetId + " deleted..."); + } else { + LOGGER.info("BQ Dataset cleanup for " + dataSetId + " failed!"); + } + } + + private ImmutablePair executeQuery(Job queryJob) { + final Job completedJob = waitForQuery(queryJob); + if (completedJob == null) { + throw new RuntimeException("Job no longer exists"); + } else if (completedJob.getStatus().getError() != null) { + // You can also look at queryJob.getStatus().getExecutionErrors() for all + // errors, not just the latest one. + return ImmutablePair.of(null, (completedJob.getStatus().getError().toString())); + } + + return ImmutablePair.of(completedJob, null); + } + + private Job waitForQuery(Job queryJob) { + try { + return queryJob.waitFor(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryUtils.java b/airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryUtils.java new file mode 100644 index 000000000000..30db7d4f5045 --- /dev/null +++ b/airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryUtils.java @@ -0,0 +1,141 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.db.bigquery; + +import static io.airbyte.db.DataTypeUtils.returnNullIfInvalid; +import static io.airbyte.db.DataTypeUtils.toISO8601String; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValue.Attribute; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.QueryParameterValue; +import com.google.cloud.bigquery.StandardSQLTypeName; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.DataTypeUtils; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Date; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQueryUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryUtils.class); + + public static final DateFormat BIG_QUERY_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd"); + public static final DateFormat BIG_QUERY_DATETIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + public static final DateFormat BIG_QUERY_TIMESTAMP_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSSSS z"); + + public static JsonNode rowToJson(FieldValueList rowValues, FieldList fieldList) { + ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); + fieldList.forEach(field -> setJsonField(field, rowValues.get(field.getName()), jsonNode)); + return jsonNode; + } + + private static void fillObjectNode(String fieldName, StandardSQLTypeName fieldType, FieldValue fieldValue, ObjectNode node) { + switch (fieldType) { + case BOOL -> node.put(fieldName, fieldValue.getBooleanValue()); + case INT64 -> node.put(fieldName, fieldValue.getLongValue()); + case FLOAT64 -> node.put(fieldName, fieldValue.getDoubleValue()); + case NUMERIC -> node.put(fieldName, fieldValue.getNumericValue()); + case BIGNUMERIC -> node.put(fieldName, returnNullIfInvalid(fieldValue::getNumericValue)); + case STRING -> node.put(fieldName, fieldValue.getStringValue()); + case BYTES -> node.put(fieldName, fieldValue.getBytesValue()); + case DATE -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATE_FORMAT))); + case DATETIME -> node.put(fieldName, toISO8601String(getDateValue(fieldValue, BIG_QUERY_DATETIME_FORMAT))); + case TIMESTAMP -> node.put(fieldName, toISO8601String(fieldValue.getTimestampValue() / 1000)); + case TIME -> node.put(fieldName, fieldValue.getStringValue()); + default -> node.put(fieldName, fieldValue.getStringValue()); + } + } + + private static void setJsonField(Field field, FieldValue fieldValue, ObjectNode node) { + String fieldName = field.getName(); + if (fieldValue.getAttribute().equals(Attribute.PRIMITIVE)) { + if (fieldValue.isNull()) { + node.put(fieldName, (String) null); + } else { + fillObjectNode(fieldName, field.getType().getStandardType(), fieldValue, node); + } + } else if (fieldValue.getAttribute().equals(Attribute.REPEATED)) { + ArrayNode arrayNode = node.putArray(fieldName); + StandardSQLTypeName fieldType = field.getType().getStandardType(); + fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode.addObject())); + } else if (fieldValue.getAttribute().equals(Attribute.RECORD)) { + ObjectNode newNode = node.putObject(fieldName); + field.getSubFields().forEach(recordField -> setJsonField(recordField, fieldValue.getRecordValue().get(recordField.getName()), newNode)); + } + } + + public static Date getDateValue(FieldValue fieldValue, DateFormat dateFormat) { + Date parsedValue = null; + String value = fieldValue.getStringValue(); + try { + parsedValue = dateFormat.parse(value); + } catch (ParseException e) { + LOGGER.error("Fail to parse date value : " + value + ". Null is returned."); + } + return parsedValue; + } + + public static JsonSchemaPrimitive getType(StandardSQLTypeName bigQueryType) { + return switch (bigQueryType) { + case BOOL -> JsonSchemaPrimitive.BOOLEAN; + case INT64, FLOAT64, NUMERIC, BIGNUMERIC -> JsonSchemaPrimitive.NUMBER; + case STRING, BYTES, TIMESTAMP, DATE, TIME, DATETIME -> JsonSchemaPrimitive.STRING; + default -> JsonSchemaPrimitive.STRING; + }; + } + + private static String getFormattedValue(StandardSQLTypeName paramType, String paramValue) { + try { + return switch (paramType) { + case DATE -> BIG_QUERY_DATE_FORMAT.format(DataTypeUtils.DATE_FORMAT.parse(paramValue)); + case DATETIME -> BIG_QUERY_DATETIME_FORMAT + .format(DataTypeUtils.DATE_FORMAT.parse(paramValue)); + case TIMESTAMP -> BIG_QUERY_TIMESTAMP_FORMAT + .format(DataTypeUtils.DATE_FORMAT.parse(paramValue)); + default -> paramValue; + }; + } catch (ParseException e) { + throw new RuntimeException("Fail to parse value " + paramValue + " to type " + paramType.name()); + } + } + + public static QueryParameterValue getQueryParameter(StandardSQLTypeName paramType, String paramValue) { + String value = getFormattedValue(paramType, paramValue); + LOGGER.info("Query parameter for set : " + value + ". Type: " + paramType.name()); + return QueryParameterValue.newBuilder().setType(paramType).setValue(value).build(); + } + +} diff --git a/airbyte-db/src/main/java/io/airbyte/db/bigquery/TempBigQueryJoolDatabaseImpl.java b/airbyte-db/src/main/java/io/airbyte/db/bigquery/TempBigQueryJoolDatabaseImpl.java new file mode 100644 index 000000000000..57297009c716 --- /dev/null +++ b/airbyte-db/src/main/java/io/airbyte/db/bigquery/TempBigQueryJoolDatabaseImpl.java @@ -0,0 +1,89 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.db.bigquery; + +import io.airbyte.db.ContextQueryFunction; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import java.sql.SQLException; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.SQLDialect; +import org.jooq.exception.DataAccessException; +import org.jooq.impl.DefaultDSLContext; + +/** + * This class is a temporary and will be removed as part of the issue @TODO #4547 + */ +public class TempBigQueryJoolDatabaseImpl extends Database { + + private final BigQueryDatabase realDatabase; + + public TempBigQueryJoolDatabaseImpl(final String projectId, final String jsonCreds) { + super(null, null); + realDatabase = Databases.createBigQueryDatabase(projectId, jsonCreds); + } + + @Override + public T query(ContextQueryFunction transform) throws SQLException { + return transform.query(new FakeDefaultDSLContext(realDatabase)); + } + + @Override + public T transaction(ContextQueryFunction transform) throws SQLException { + return transform.query(new FakeDefaultDSLContext(realDatabase)); + } + + @Override + public void close() throws Exception { + realDatabase.close(); + } + + public BigQueryDatabase getRealDatabase() { + return realDatabase; + } + + private static class FakeDefaultDSLContext extends DefaultDSLContext { + + private final BigQueryDatabase database; + + public FakeDefaultDSLContext(BigQueryDatabase database) { + super((SQLDialect) null); + this.database = database; + } + + @Override + public Result fetch(String sql) throws DataAccessException { + try { + database.execute(sql); + } catch (SQLException e) { + throw new DataAccessException(e.getMessage()); + } + return null; + } + + } + +} diff --git a/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java b/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java index cd153385c236..1df721e471db 100644 --- a/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java +++ b/airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.DataTypeUtils; import io.airbyte.protocol.models.JsonSchemaPrimitive; import java.math.BigDecimal; import java.sql.Date; @@ -36,23 +37,17 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.text.DateFormat; import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.time.Instant; import java.util.Collections; import java.util.Spliterator; import java.util.Spliterators; import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.xml.bind.DatatypeConverter; public class JdbcUtils { - public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset - /** * Map records returned in a result set. * @@ -130,18 +125,19 @@ private static void setJsonField(ResultSet r, int i, ObjectNode o) throws SQLExc case BIT, BOOLEAN -> o.put(columnName, r.getBoolean(i)); case TINYINT, SMALLINT -> o.put(columnName, r.getShort(i)); case INTEGER -> putInteger(o, columnName, r, i); - case BIGINT -> o.put(columnName, nullIfInvalid(() -> r.getLong(i))); - case FLOAT, DOUBLE -> o.put(columnName, nullIfInvalid(() -> r.getDouble(i), Double::isFinite)); - case REAL -> o.put(columnName, nullIfInvalid(() -> r.getFloat(i), Float::isFinite)); - case NUMERIC, DECIMAL -> o.put(columnName, nullIfInvalid(() -> r.getBigDecimal(i))); + case BIGINT -> o.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> r.getLong(i))); + case FLOAT, DOUBLE -> o.put(columnName, DataTypeUtils + .returnNullIfInvalid(() -> r.getDouble(i), Double::isFinite)); + case REAL -> o.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> r.getFloat(i), Float::isFinite)); + case NUMERIC, DECIMAL -> o.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> r.getBigDecimal(i))); case CHAR, VARCHAR, LONGVARCHAR -> o.put(columnName, r.getString(i)); - case DATE -> o.put(columnName, toISO8601String(r.getDate(i))); - case TIME -> o.put(columnName, toISO8601String(r.getTime(i))); + case DATE -> o.put(columnName, DataTypeUtils.toISO8601String(r.getDate(i))); + case TIME -> o.put(columnName, DataTypeUtils.toISO8601String(r.getTime(i))); case TIMESTAMP -> { // https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html final Timestamp t = r.getTimestamp(i); java.util.Date d = new java.util.Date(t.getTime() + (t.getNanos() / 1000000)); - o.put(columnName, toISO8601String(d)); + o.put(columnName, DataTypeUtils.toISO8601String(d)); } case BLOB, BINARY, VARBINARY, LONGVARBINARY -> o.put(columnName, r.getBytes(i)); default -> o.put(columnName, r.getString(i)); @@ -157,20 +153,12 @@ private static void putInteger(ObjectNode node, String columnName, ResultSet res try { node.put(columnName, resultSet.getInt(index)); } catch (SQLException e) { - node.put(columnName, nullIfInvalid(() -> resultSet.getLong(index))); + node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getLong(index))); } } // todo (cgardens) - move generic date helpers to commons. - public static String toISO8601String(long epochMillis) { - return DATE_FORMAT.format(Date.from(Instant.ofEpochMilli(epochMillis))); - } - - public static String toISO8601String(java.util.Date date) { - return DATE_FORMAT.format(date); - } - public static void setStatementField(PreparedStatement preparedStatement, int parameterIndex, JDBCType cursorFieldType, @@ -183,7 +171,8 @@ public static void setStatementField(PreparedStatement preparedStatement, // value in the following format case TIME, TIMESTAMP -> { try { - preparedStatement.setTimestamp(parameterIndex, Timestamp.from(DATE_FORMAT.parse(value).toInstant())); + preparedStatement.setTimestamp(parameterIndex, Timestamp.from( + DataTypeUtils.DATE_FORMAT.parse(value).toInstant())); } catch (ParseException e) { throw new RuntimeException(e); } @@ -191,7 +180,7 @@ public static void setStatementField(PreparedStatement preparedStatement, case DATE -> { try { - Timestamp from = Timestamp.from(DATE_FORMAT.parse(value).toInstant()); + Timestamp from = Timestamp.from(DataTypeUtils.DATE_FORMAT.parse(value).toInstant()); preparedStatement.setDate(parameterIndex, new Date(from.getTime())); } catch (ParseException e) { throw new RuntimeException(e); @@ -241,28 +230,4 @@ public static JsonSchemaPrimitive getType(JDBCType jdbcType) { }; } - private static T nullIfInvalid(SQLSupplier valueProducer) { - return nullIfInvalid(valueProducer, ignored -> true); - } - - private static T nullIfInvalid(SQLSupplier valueProducer, Function isValidFn) { - // Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will throw an - // exception when parsed. We want to parse those - // values as null. - // This method reduces error handling boilerplate. - try { - T value = valueProducer.apply(); - return isValidFn.apply(value) ? value : null; - } catch (SQLException e) { - return null; - } - } - - @FunctionalInterface - private interface SQLSupplier { - - O apply() throws SQLException; - - } - } diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java index be8af318dec7..09e19ab44186 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/SourceComprehensiveTest.java @@ -39,6 +39,7 @@ import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -107,7 +108,6 @@ public void testDataTypes() throws Exception { ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); List allMessages = runRead(catalog); final List recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).collect(Collectors.toList()); - Map> expectedValues = new HashMap<>(); testDataHolders.forEach(testDataHolder -> { if (!testDataHolder.getExpectedValues().isEmpty()) @@ -125,15 +125,19 @@ public void testDataTypes() throws Exception { } }); - expectedValues.forEach((streamName, values) -> { - assertTrue(values.isEmpty(), "The streamer " + streamName + " should return all expected values. Missing values: " + values); - }); + expectedValues.forEach((streamName, values) -> assertTrue(values.isEmpty(), + "The streamer " + streamName + " should return all expected values. Missing values: " + values)); } private String getValueFromJsonNode(JsonNode jsonNode) { - String value = (jsonNode != null ? jsonNode.asText() : null); - value = (value != null && value.equals("null") ? null : value); - return value; + if (jsonNode != null) { + String nodeText = jsonNode.asText(); + String nodeString = jsonNode.toString(); + String value = (nodeText != null && !nodeText.equals("") ? nodeText : nodeString); + value = (value != null && value.equals("null") ? null : value); + return value; + } else + return null; } /** @@ -201,4 +205,32 @@ public void addDataTypeTestData(TestDataHolder test) { test.setTestColumnName(getTestColumnName()); } + private String formatCollection(Collection collection) { + return collection.stream().map(s -> "`" + s + "`").collect(Collectors.joining(", ")); + } + + /** + * Builds a table with all registered test cases with values using Markdown syntax (can be used in + * the github). + * + * @return formatted list of test cases + */ + public String getMarkdownTestTable() { + StringBuilder table = new StringBuilder() + .append("|**Data Type**|**Insert values**|**Expected values**|**Comment**|**Common test result**|\n") + .append("|----|----|----|----|----|\n"); + + testDataHolders.forEach(test -> table.append(String.format("| %s | %s | %s | %s | %s |\n", + test.getSourceType(), + formatCollection(test.getValues()), + formatCollection(test.getExpectedValues()), + "", + "Ok"))); + return table.toString(); + } + + protected void printMarkdownTestTable() { + LOGGER.info(getMarkdownTestTable()); + } + } diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java index f59f41d4d143..3353c9b4c22b 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestDataHolder.java @@ -224,6 +224,10 @@ public List getExpectedValues() { return expectedValues; } + public List getValues() { + return values; + } + public String getNameWithTestPrefix() { return nameSpace + "_" + testNumber + "_" + sourceType; } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtilsTest.java b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtilsTest.java index 2822f29733ac..07e5350e8d92 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtilsTest.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/test/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtilsTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.DataTypeUtils; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; @@ -111,12 +112,14 @@ void testInsertRawRecordsInSingleQuery() throws SQLException { Jsons.jsonNode(ImmutableMap.builder() .put(JavaBaseConstants.COLUMN_NAME_AB_ID, RECORD1_UUID) .put(JavaBaseConstants.COLUMN_NAME_DATA, records.get(0).getData()) - .put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JdbcUtils.toISO8601String(records.get(0).getEmittedAt())) + .put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, DataTypeUtils + .toISO8601String(records.get(0).getEmittedAt())) .build()), Jsons.jsonNode(ImmutableMap.builder() .put(JavaBaseConstants.COLUMN_NAME_AB_ID, RECORD2_UUID) .put(JavaBaseConstants.COLUMN_NAME_DATA, records.get(1).getData()) - .put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JdbcUtils.toISO8601String(records.get(1).getEmittedAt())) + .put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, DataTypeUtils + .toISO8601String(records.get(1).getEmittedAt())) .build())); actualRecords.forEach( diff --git a/airbyte-integrations/connectors/source-bigquery/Dockerfile b/airbyte-integrations/connectors/source-bigquery/Dockerfile new file mode 100644 index 000000000000..cfdaa65dde59 --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/Dockerfile @@ -0,0 +1,13 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION source-bigquery + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile. +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-bigquery \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-bigquery/build.gradle b/airbyte-integrations/connectors/source-bigquery/build.gradle new file mode 100644 index 000000000000..0214905cf428 --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/build.gradle @@ -0,0 +1,31 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.source.bigquery.BigQuerySource' +} + +dependencies { + implementation 'com.google.cloud:google-cloud-bigquery:1.122.2' + implementation 'org.apache.commons:commons-lang3:3.11' + implementation project(':airbyte-db') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:connectors:source-jdbc') + implementation project(':airbyte-integrations:connectors:source-relational-db') + + //TODO Add jdbc driver import here. Ex: implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14' + + testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) + + testImplementation 'org.apache.commons:commons-lang3:3.11' + + integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-bigquery') + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java new file mode 100644 index 000000000000..d1b4348e8508 --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -0,0 +1,188 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.bigquery; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.QueryParameterValue; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Table; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.db.Databases; +import io.airbyte.db.SqlDatabase; +import io.airbyte.db.bigquery.BigQueryDatabase; +import io.airbyte.db.bigquery.BigQueryUtils; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.relationaldb.AbstractRelationalDbSource; +import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BigQuerySource extends AbstractRelationalDbSource implements Source { + + private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySource.class); + + public static final String CONFIG_DATASET_ID = "dataset_id"; + public static final String CONFIG_PROJECT_ID = "project_id"; + public static final String CONFIG_CREDS = "credentials_json"; + + private String quote = ""; + private JsonNode dbConfig; + + @Override + public JsonNode toDatabaseConfig(JsonNode config) { + return Jsons.jsonNode(ImmutableMap.builder() + .put(CONFIG_PROJECT_ID, config.get(CONFIG_PROJECT_ID).asText()) + .put(CONFIG_CREDS, config.get(CONFIG_CREDS).asText()) + .put(CONFIG_DATASET_ID, config.get(CONFIG_DATASET_ID).asText()) + .build()); + } + + @Override + protected BigQueryDatabase createDatabase(JsonNode config) { + dbConfig = Jsons.clone(config); + return Databases.createBigQueryDatabase(config.get(CONFIG_PROJECT_ID).asText(), config.get(CONFIG_CREDS).asText()); + } + + @Override + public List> getCheckOperations(JsonNode config) { + List> checkList = new ArrayList<>(); + checkList.add(database -> { + if (database.query("select 1").count() < 1) + throw new Exception("Unable to execute any query on the source!"); + else + LOGGER.info("The source passed the basic query test!"); + }); + + checkList.add(database -> { + if (isDatasetConfigured(database)) { + database.query(String.format("select 1 from %s where 1=0", + getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES"))); + LOGGER.info("The source passed the Dataset query test!"); + } else { + LOGGER.info("The Dataset query test is skipped due to not configured datasetId!"); + } + }); + + return checkList; + } + + @Override + protected JsonSchemaPrimitive getType(StandardSQLTypeName columnType) { + return BigQueryUtils.getType(columnType); + } + + @Override + public Set getExcludedInternalNameSpaces() { + return Collections.emptySet(); + } + + @Override + protected List>> discoverInternal(BigQueryDatabase database) { + String projectId = dbConfig.get(CONFIG_PROJECT_ID).asText(); + String datasetId = getConfigDatasetId(database); + List
tables = + (isDatasetConfigured(database) ? database.getDatasetTables(getConfigDatasetId(database)) : database.getProjectTables(projectId)); + List>> result = new ArrayList<>(); + tables.stream().map(table -> TableInfo.>builder() + .nameSpace(datasetId) + .name(table.getTableId().getTable()) + .fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream() + .map(f -> { + StandardSQLTypeName standardType = f.getType().getStandardType(); + return new CommonField<>(f.getName(), standardType); + }) + .collect(Collectors.toList())) + .build()) + .forEach(result::add); + return result; + } + + @Override + protected Map> discoverPrimaryKeys(BigQueryDatabase database, List>> tableInfos) { + return Collections.emptyMap(); + } + + @Override + protected String getQuoteString() { + return quote; + } + + @Override + public AutoCloseableIterator queryTableIncremental(BigQueryDatabase database, + List columnNames, + String schemaName, + String tableName, + String cursorField, + StandardSQLTypeName cursorFieldType, + String cursor) { + return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?", + enquoteIdentifierList(columnNames), + getFullTableName(schemaName, tableName), + cursorField), + BigQueryUtils.getQueryParameter(cursorFieldType, cursor)); + } + + private AutoCloseableIterator queryTableWithParams(BigQueryDatabase database, String sqlQuery, QueryParameterValue... params) { + return AutoCloseableIterators.lazyIterator(() -> { + try { + final Stream stream = database.query(sqlQuery, params); + return AutoCloseableIterators.fromStream(stream); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + private boolean isDatasetConfigured(SqlDatabase database) { + return database.getSourceConfig().hasNonNull(CONFIG_DATASET_ID); + } + + private String getConfigDatasetId(SqlDatabase database) { + return (isDatasetConfigured(database) ? database.getSourceConfig().get(CONFIG_DATASET_ID).asText() : null); + } + + public static void main(String[] args) throws Exception { + final Source source = new BigQuerySource(); + LOGGER.info("starting source: {}", BigQuerySource.class); + new IntegrationRunner(source).run(args); + LOGGER.info("completed source: {}", BigQuerySource.class); + } + +} diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/resources/spec.json b/airbyte-integrations/connectors/source-bigquery/src/main/resources/spec.json new file mode 100644 index 000000000000..8a9b181d0dc9 --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/src/main/resources/spec.json @@ -0,0 +1,32 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/source/bigquery", + "supportsIncremental": true, + "supportsNormalization": true, + "supportsDBT": true, + "supported_sync_modes": ["overwrite", "append", "append_dedup"], + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "BigQuery Source Spec", + "type": "object", + "required": ["project_id", "credentials_json"], + "additionalProperties": false, + "properties": { + "project_id": { + "type": "string", + "description": "The GCP project ID for the project containing the target BigQuery dataset.", + "title": "Project ID" + }, + "dataset_id": { + "type": "string", + "description": "The BigQuery Dataset ID to look for tables to replicate from.", + "title": "Default Dataset ID" + }, + "credentials_json": { + "type": "string", + "description": "The contents of the JSON service account key. Check out the docs if you need help generating this key.", + "title": "Credentials JSON", + "airbyte_secret": true + } + } + } +} diff --git a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceAcceptanceTest.java b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceAcceptanceTest.java new file mode 100644 index 000000000000..1ded1aaed18a --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceAcceptanceTest.java @@ -0,0 +1,128 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.bigquery; + +import static io.airbyte.integrations.source.bigquery.BigQuerySource.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; +import io.airbyte.db.bigquery.BigQueryDatabase; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.*; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +public class BigQuerySourceAcceptanceTest extends SourceAcceptanceTest { + + private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + private static final String STREAM_NAME = "id_and_name"; + + private BigQueryDatabase database; + private Dataset dataset; + private JsonNode config; + + @Override + protected void setupEnvironment(TestDestinationEnv testEnv) throws IOException, SQLException { + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH + + ". Override by setting setting path with the CREDENTIALS_PATH constant."); + } + + final String credentialsJsonString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + + final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString); + final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText(); + final String datasetLocation = "US"; + + final String datasetId = Strings.addRandomSuffix("airbyte_tests_acceptance", "_", 8); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(CONFIG_PROJECT_ID, projectId) + .put(CONFIG_CREDS, credentialsJsonString) + .put(CONFIG_DATASET_ID, datasetId) + .build()); + + database = new BigQueryDatabase(config.get(CONFIG_PROJECT_ID).asText(), credentialsJsonString); + + final DatasetInfo datasetInfo = + DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(datasetLocation).build(); + dataset = database.getBigQuery().create(datasetInfo); + + database.execute("CREATE TABLE " + datasetId + ".id_and_name(id INT64, name STRING);"); + database.execute("INSERT INTO " + datasetId + ".id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) { + database.cleanDataSet(dataset.getDatasetId().getDataset()); + } + + @Override + protected String getImageName() { + return "airbyte/source-bigquery:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return CatalogHelpers.createConfiguredAirbyteCatalog( + STREAM_NAME, + config.get(CONFIG_DATASET_ID).asText(), + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)); + } + + @Override + protected List getRegexTests() { + return Collections.emptyList(); + } + + @Override + protected JsonNode getState() { + return Jsons.jsonNode(new HashMap<>()); + } + +} diff --git a/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceComprehensiveTest.java b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceComprehensiveTest.java new file mode 100644 index 000000000000..ff5e46a68dd0 --- /dev/null +++ b/airbyte-integrations/connectors/source-bigquery/src/test-integration/java/io/airbyte/integrations/source/bigquery/BigQuerySourceComprehensiveTest.java @@ -0,0 +1,331 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.bigquery; + +import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_CREDS; +import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_DATASET_ID; +import static io.airbyte.integrations.source.bigquery.BigQuerySource.CONFIG_PROJECT_ID; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.cloud.bigquery.Dataset; +import com.google.cloud.bigquery.DatasetInfo; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.db.Database; +import io.airbyte.db.bigquery.TempBigQueryJoolDatabaseImpl; +import io.airbyte.integrations.standardtest.source.SourceComprehensiveTest; +import io.airbyte.integrations.standardtest.source.TestDataHolder; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.nio.file.Files; +import java.nio.file.Path; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + +@TestInstance(Lifecycle.PER_CLASS) +public class BigQuerySourceComprehensiveTest extends SourceComprehensiveTest { + + private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); + private static final String CREATE_SQL_PATTERN = "CREATE TABLE %1$s(%2$s NUMERIC(29), %3$s %4$s)"; + + private TempBigQueryJoolDatabaseImpl database; + private Dataset dataset; + private JsonNode config; + + @Override + protected String getImageName() { + return "airbyte/source-bigquery:dev"; + } + + @Override + protected JsonNode getConfig() throws Exception { + return config; + } + + @Override + protected void tearDown(TestDestinationEnv testEnv) throws Exception { + + } + + @Override + protected Database setupDatabase() throws Exception { + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a big query credentials file. By default {module-root}/" + CREDENTIALS_PATH + + ". Override by setting setting path with the CREDENTIALS_PATH constant."); + } + + final String credentialsJsonString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + + final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString); + final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText(); + final String datasetLocation = "US"; + + final String datasetId = Strings.addRandomSuffix("airbyte_tests_compr", "_", 8); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put(CONFIG_PROJECT_ID, projectId) + .put(CONFIG_CREDS, credentialsJsonString) + .put(CONFIG_DATASET_ID, datasetId) + .build()); + + database = new TempBigQueryJoolDatabaseImpl(config.get(CONFIG_PROJECT_ID).asText(), credentialsJsonString); + + final DatasetInfo datasetInfo = + DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(datasetLocation).build(); + dataset = database.getRealDatabase().getBigQuery().create(datasetInfo); + + return database; + } + + @Override + protected void initTests() { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("int64") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127", "9223372036854775807", "-9223372036854775808") + .addExpectedValues(null, "-128", "127", "9223372036854775807", "-9223372036854775808") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("int") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127") + .addExpectedValues(null, "-128", "127") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("smallint") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127") + .addExpectedValues(null, "-128", "127") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("integer") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127") + .addExpectedValues(null, "-128", "127") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bigint") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127") + .addExpectedValues(null, "-128", "127") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("tinyint") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127") + .addExpectedValues(null, "-128", "127") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("byteint") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127") + .addExpectedValues(null, "-128", "127") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("numeric") + .fullSourceDataType("numeric(29,9)") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .addExpectedValues(null, "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bignumeric") + .fullSourceDataType("bignumeric(76,38)") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .addExpectedValues(null, "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("decimal") + .fullSourceDataType("decimal(29,9)") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .addExpectedValues(null, "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bigdecimal") + .fullSourceDataType("bigdecimal(76,38)") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .addExpectedValues(null, "-128", "127", "999999999999999999", "-999999999999999999", "0.123456789", "-0.123456789") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("float64") + .airbyteType(JsonSchemaPrimitive.NUMBER) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("null", "-128", "127", "0.123456789", "-0.123456789") + .addExpectedValues(null, "-128.0", "127.0", "0.123456789", "-0.123456789") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bool") + .airbyteType(JsonSchemaPrimitive.BOOLEAN) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("true", "false", "null") + .addExpectedValues("true", "false", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("bytes") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("FROM_BASE64(\"test\")", "null") + .addExpectedValues("test", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("date") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("date('2021-10-20')", "date('9999-12-31')", "date('0001-01-01')", "null") + .addExpectedValues("2021-10-20T00:00:00Z", "9999-12-31T00:00:00Z", "0001-01-01T00:00:00Z", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("datetime") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("datetime('2021-10-20 11:22:33')", "datetime('9999-12-31 11:22:33')", "datetime('0001-01-01 11:22:33')", "null") + .addExpectedValues("2021-10-20T11:22:33Z", "9999-12-31T11:22:33Z", "0001-01-01T11:22:33Z", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timestamp") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("timestamp('2021-10-20 11:22:33')", "null") + .addExpectedValues("2021-10-20T11:22:33Z", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("geography") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("ST_GEOGFROMTEXT('POINT(1 2)')", "null") + .addExpectedValues("POINT(1 2)", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("string") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("'qwe'", "'йцу'", "null") + .addExpectedValues("qwe", "йцу", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("struct") + .fullSourceDataType("STRUCT") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("STRUCT(\"B.A\",12)", "null") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("time") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("TIME(15, 30, 00)", "null") + .addExpectedValues("15:30:00", null) + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("array") + .fullSourceDataType("array") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("['a', 'b']") + .addExpectedValues("[{\"test_column\":\"a\"},{\"test_column\":\"b\"}]") + .build()); + + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("struct") + .fullSourceDataType("STRUCT>>") + .airbyteType(JsonSchemaPrimitive.STRING) + .createTablePatternSql(CREATE_SQL_PATTERN) + .addInsertValues("STRUCT('s' as frst, 1 as sec, STRUCT(555 as id_col, STRUCT(TIME(15, 30, 00) as time) as mega_obbj) as obbj)") + .addExpectedValues("{\"frst\":\"s\",\"sec\":1,\"obbj\":{\"id_col\":555,\"mega_obbj\":{\"last_col\":\"15:30:00\"}}}") + .build()); + } + + @Override + protected String getNameSpace() { + return dataset.getDatasetId().getDataset(); + } + + @AfterAll + public void cleanTestInstance() { + database.getRealDatabase().cleanDataSet(getNameSpace()); + } + +} diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/SourceJdbcUtils.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/SourceJdbcUtils.java index 4db786f8f717..97165b6fe894 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/SourceJdbcUtils.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/SourceJdbcUtils.java @@ -24,7 +24,7 @@ package io.airbyte.integrations.source.jdbc; -import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.DataTypeUtils; import io.airbyte.protocol.models.JsonSchemaPrimitive; import java.math.BigDecimal; import java.sql.Connection; @@ -53,7 +53,7 @@ public static void setStatementField(PreparedStatement preparedStatement, case TIME, TIMESTAMP -> { try { preparedStatement.setTimestamp(parameterIndex, Timestamp - .from(JdbcUtils.DATE_FORMAT.parse(value).toInstant())); + .from(DataTypeUtils.DATE_FORMAT.parse(value).toInstant())); } catch (ParseException e) { throw new RuntimeException(e); } @@ -61,7 +61,7 @@ public static void setStatementField(PreparedStatement preparedStatement, case DATE -> { try { - Timestamp from = Timestamp.from(JdbcUtils.DATE_FORMAT.parse(value).toInstant()); + Timestamp from = Timestamp.from(DataTypeUtils.DATE_FORMAT.parse(value).toInstant()); preparedStatement.setDate(parameterIndex, new Date(from.getTime())); } catch (ParseException e) { throw new RuntimeException(e); diff --git a/tools/bin/ci_credentials.sh b/tools/bin/ci_credentials.sh index b6f2251c4515..aff7ef17f799 100755 --- a/tools/bin/ci_credentials.sh +++ b/tools/bin/ci_credentials.sh @@ -40,6 +40,7 @@ write_standard_creds base-normalization "$AWS_REDSHIFT_INTEGRATION_TEST_CREDS" " write_standard_creds source-amplitude "$AMPLITUDE_INTEGRATION_TEST_CREDS" write_standard_creds source-asana "$SOURCE_ASANA_TEST_CREDS" write_standard_creds source-aws-cloudtrail "$SOURCE_AWS_CLOUDTRAIL_CREDS" +write_standard_creds source-bigquery "$BIGQUERY_TEST_CREDS" "credentials.json" write_standard_creds source-braintree-singer "$BRAINTREE_TEST_CREDS" write_standard_creds source-drift "$DRIFT_INTEGRATION_TEST_CREDS" write_standard_creds source-exchange-rates "$EXCHANGE_RATES_TEST_CREDS"