Skip to content

Commit

Permalink
🎉 New Source: Big Query (#4457)
Browse files Browse the repository at this point in the history
New Source: BigQuery
  • Loading branch information
DoNotPanicUA authored Jul 22, 2021
1 parent c2d491e commit 4c0d1a0
Show file tree
Hide file tree
Showing 22 changed files with 1,351 additions and 63 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ jobs:
SOURCE_AWS_CLOUDTRAIL_CREDS: ${{ secrets.SOURCE_AWS_CLOUDTRAIL_CREDS }}
AZURE_STORAGE_INTEGRATION_TEST_CREDS: ${{ secrets.AZURE_STORAGE_INTEGRATION_TEST_CREDS }}
BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }}
BIGQUERY_TEST_CREDS: ${{ secrets.BIGQUERY_TEST_CREDS }}
BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }}
DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }}
DESTINATION_KVDB_TEST_CREDS: ${{ secrets.DESTINATION_KVDB_TEST_CREDS }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ jobs:
AWS_REDSHIFT_INTEGRATION_TEST_CREDS: ${{ secrets.AWS_REDSHIFT_INTEGRATION_TEST_CREDS }}
AZURE_STORAGE_INTEGRATION_TEST_CREDS: ${{ secrets.AZURE_STORAGE_INTEGRATION_TEST_CREDS }}
BIGQUERY_INTEGRATION_TEST_CREDS: ${{ secrets.BIGQUERY_INTEGRATION_TEST_CREDS }}
BIGQUERY_TEST_CREDS: ${{ secrets.BIGQUERY_TEST_CREDS }}
BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }}
DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }}
DESTINATION_KVDB_TEST_CREDS: ${{ secrets.DESTINATION_KVDB_TEST_CREDS }}
Expand Down
8 changes: 8 additions & 0 deletions airbyte-db/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,12 @@ dependencies {

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation "org.testcontainers:postgresql:1.15.1"

// Big Query
implementation platform('com.google.cloud:libraries-bom:20.6.0')
implementation('com.google.cloud:google-cloud-bigquery:1.133.1')

// Lombok
implementation 'org.projectlombok:lombok:1.18.20'
annotationProcessor('org.projectlombok:lombok:1.18.20')
}
34 changes: 34 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/DataTypeSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import java.sql.SQLException;

/**
 * A no-argument producer of a single value whose retrieval is allowed to fail with a
 * {@link SQLException}.
 *
 * @param <T> type of the value that is produced
 */
@FunctionalInterface
public interface DataTypeSupplier<T> {

  /**
   * Produces the value.
   *
   * @return the produced value
   * @throws SQLException if the value cannot be produced
   */
  T apply() throws SQLException;

}
63 changes: 63 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/DataTypeUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db;

import java.sql.Date;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.function.Function;

/**
 * Static helpers for reading values defensively (turning unreadable or invalid values into
 * {@code null}) and for rendering dates with the {@link #DATE_FORMAT} pattern.
 */
public class DataTypeUtils {

  /**
   * Shared formatter for the pattern {@code yyyy-MM-dd'T'HH:mm:ss'Z'}. The trailing {@code Z} is a
   * quoted literal, so no timezone offset is ever written.
   *
   * <p>
   * {@link SimpleDateFormat} is not safe for concurrent use. The helpers in this class never format
   * through this instance directly (they use a per-call clone); code that uses this field directly
   * must synchronize externally.
   *
   * <p>
   * NOTE(review): no time zone is set on this formatter, so it formats in the JVM default zone. The
   * literal {@code Z} therefore only denotes UTC when the default zone is UTC - confirm that this is
   * what callers expect.
   */
  public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); // Quoted "Z" to indicate UTC, no timezone offset

  /**
   * Evaluates {@code valueProducer} and returns its value, or {@code null} when the producer throws a
   * {@link SQLException}. Every successfully produced value is considered valid.
   *
   * @param valueProducer producer of the value
   * @param <T> type of the produced value
   * @return the produced value, or {@code null} if producing it failed with a {@link SQLException}
   */
  public static <T> T returnNullIfInvalid(DataTypeSupplier<T> valueProducer) {
    return returnNullIfInvalid(valueProducer, ignored -> true);
  }

  /**
   * Evaluates {@code valueProducer} and returns its value only when {@code isValidFn} accepts it.
   *
   * <p>
   * Some edge case values (e.g: Infinity, NaN) have no java or JSON equivalent, and will throw an
   * exception when parsed. Those values are returned as {@code null}. This method reduces error
   * handling boilerplate.
   *
   * @param valueProducer producer of the value
   * @param isValidFn validity check applied to the produced value
   * @param <T> type of the produced value
   * @return the produced value, or {@code null} if it is rejected by {@code isValidFn} or if
   *         producing it failed with a {@link SQLException}
   */
  public static <T> T returnNullIfInvalid(DataTypeSupplier<T> valueProducer, Function<T, Boolean> isValidFn) {
    try {
      final T value = valueProducer.apply();
      return isValidFn.apply(value) ? value : null;
    } catch (SQLException ignored) {
      // Deliberately swallowed: an unreadable value is reported to the caller as null.
      return null;
    }
  }

  /**
   * Formats an instant, given as milliseconds since the epoch, with the {@link #DATE_FORMAT} pattern.
   *
   * @param epochMillis milliseconds since 1970-01-01T00:00:00Z
   * @return the formatted date
   */
  public static String toISO8601String(long epochMillis) {
    return toISO8601String(Date.from(Instant.ofEpochMilli(epochMillis)));
  }

  /**
   * Formats a date with the {@link #DATE_FORMAT} pattern.
   *
   * @param date date to format
   * @return the formatted date
   */
  public static String toISO8601String(java.util.Date date) {
    // SimpleDateFormat keeps mutable state while formatting, so formatting through the shared
    // instance from several threads can produce corrupted output. A clone carries the same pattern
    // and configuration but is private to this call.
    final DateFormat threadConfinedFormat = (DateFormat) DATE_FORMAT.clone();
    return threadConfinedFormat.format(date);
  }

}
5 changes: 5 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.jooq.impl.DSL.select;

import io.airbyte.commons.lang.Exceptions;
import io.airbyte.db.bigquery.BigQueryDatabase;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
Expand Down Expand Up @@ -196,4 +197,8 @@ private static BasicDataSource createBasicDataSource(final String username,
return connectionPool;
}

/**
 * Builds a {@link BigQueryDatabase} from the given arguments.
 *
 * @param projectId project id passed to the {@link BigQueryDatabase} constructor
 * @param jsonCreds credentials JSON passed to the {@link BigQueryDatabase} constructor
 * @return the newly constructed {@link BigQueryDatabase}
 */
public static BigQueryDatabase createBigQueryDatabase(final String projectId, final String jsonCreds) {
  final BigQueryDatabase bigQueryDatabase = new BigQueryDatabase(projectId, jsonCreds);
  return bigQueryDatabase;
}

}
5 changes: 2 additions & 3 deletions airbyte-db/src/main/java/io/airbyte/db/SqlDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import java.sql.SQLException;
import java.util.stream.Stream;

public abstract class SqlDatabase implements AutoCloseable {

private JsonNode sourceConfig;
private JsonNode databaseConfig;

public abstract void execute(String sql) throws SQLException;
public abstract void execute(String sql) throws Exception;

public abstract Stream<JsonNode> query(String sql, String... params) throws SQLException;
public abstract Stream<JsonNode> query(String sql, String... params) throws Exception;

public JsonNode getSourceConfig() {
return sourceConfig;
Expand Down
219 changes: 219 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.db.bigquery;

import static java.util.Objects.isNull;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableResult;
import com.google.common.base.Charsets;
import com.google.common.collect.Streams;
import io.airbyte.db.SqlDatabase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/**
 * {@link SqlDatabase} implementation that executes SQL through the Google BigQuery client and
 * exposes a few helpers for listing tables and deleting datasets.
 */
public class BigQueryDatabase extends SqlDatabase {

  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDatabase.class);

  private final BigQuery bigQuery;

  /**
   * Builds the BigQuery client.
   *
   * @param projectId BigQuery project id the client is configured with
   * @param jsonCreds service account credentials as JSON; when {@code null} or empty the application
   *        default credentials are used instead
   * @throws RuntimeException wrapping the {@link IOException} raised while loading credentials
   */
  public BigQueryDatabase(String projectId, String jsonCreds) {
    try {
      BigQueryOptions.Builder bigQueryBuilder = BigQueryOptions.newBuilder();
      ServiceAccountCredentials credentials = null;
      if (jsonCreds != null && !jsonCreds.isEmpty()) {
        credentials = ServiceAccountCredentials
            .fromStream(new ByteArrayInputStream(jsonCreds.getBytes(Charsets.UTF_8)));
      }
      bigQuery = bigQueryBuilder
          .setProjectId(projectId)
          .setCredentials(isNull(credentials) ? ServiceAccountCredentials.getApplicationDefault() : credentials)
          // Up to 10 attempts, delay multiplier 1.5, total timeout of 60 minutes.
          .setRetrySettings(RetrySettings
              .newBuilder()
              .setMaxAttempts(10)
              .setRetryDelayMultiplier(1.5)
              .setTotalTimeout(Duration.ofMinutes(60))
              .build())
          .build()
          .getService();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Runs a statement without parameters and discards its result.
   *
   * @param sql statement to run
   * @throws SQLException if the finished job reports an error
   */
  @Override
  public void execute(String sql) throws SQLException {
    final ImmutablePair<Job, String> result = executeQuery(bigQuery, getQueryConfig(sql, Collections.emptyList()));
    if (result.getLeft() == null) {
      throw new SQLException("BigQuery request is failed with error: " + result.getRight() + ". SQL: " + sql);
    }
    LOGGER.info("BigQuery successfully finished execution SQL: {}", sql);
  }

  /**
   * Runs a query without parameters.
   *
   * @param sql query to run
   * @return one JSON node per result row
   * @throws Exception if the query fails or its results cannot be fetched
   */
  public Stream<JsonNode> query(String sql) throws Exception {
    return query(sql, Collections.emptyList());
  }

  /**
   * Runs a query with already typed positional parameters.
   *
   * @param sql query to run
   * @param params positional parameters; {@code null} means no parameters
   * @return one JSON node per result row
   * @throws Exception if the query fails or its results cannot be fetched
   */
  public Stream<JsonNode> query(String sql, QueryParameterValue... params) throws Exception {
    final List<QueryParameterValue> parameterValues = params == null ? Collections.emptyList() : Arrays.asList(params);
    return query(sql, parameterValues);
  }

  /**
   * Runs a query whose positional parameters are all sent as {@code STRING} values.
   *
   * @param sql query to run
   * @param params positional parameters; {@code null} means no parameters
   * @return one JSON node per result row
   * @throws Exception if the query fails or its results cannot be fetched
   */
  @Override
  public Stream<JsonNode> query(String sql, String... params) throws Exception {
    final List<QueryParameterValue> parameterValueList;
    if (params == null) {
      parameterValueList = Collections.emptyList();
    } else {
      parameterValueList = Arrays.stream(params)
          .map(param -> QueryParameterValue.newBuilder().setValue(param).setType(StandardSQLTypeName.STRING).build())
          .collect(Collectors.toList());
    }

    return query(sql, parameterValueList);
  }

  /**
   * Runs a query with positional parameters and converts every result row to JSON.
   *
   * @param sql query to run
   * @param params positional parameters; {@code null} or empty means no parameters
   * @return one JSON node per result row
   * @throws SQLException if the finished job reports an error
   * @throws Exception if the results of the finished job cannot be fetched
   */
  public Stream<JsonNode> query(String sql, List<QueryParameterValue> params) throws Exception {
    final ImmutablePair<Job, String> result = executeQuery(bigQuery, getQueryConfig(sql, params));

    final Job completedJob = result.getLeft();
    if (completedJob == null) {
      // Same exception type as execute() uses for a failed job; still an Exception for callers.
      throw new SQLException(
          "Failed to execute query " + sql + (params != null && !params.isEmpty() ? " with params " + params : "") + ". Error: " + result.getRight());
    }

    // Fetch the results once and reuse them for both the schema and the rows instead of requesting
    // them from the job twice.
    final TableResult queryResults = completedJob.getQueryResults();
    final FieldList fieldList = queryResults.getSchema().getFields();
    return Streams.stream(queryResults.iterateAll())
        .map(fieldValues -> BigQueryUtils.rowToJson(fieldValues, fieldList));
  }

  @Override
  public void close() throws Exception {
    // The BigQuery doesn't require connection close. It will be done automatically.
  }

  /**
   * Builds a standard-SQL (non-legacy) query configuration.
   *
   * @param sql query text
   * @param params positional parameters; {@code null} is treated as no parameters
   * @return the query job configuration
   */
  public QueryJobConfiguration getQueryConfig(String sql, List<QueryParameterValue> params) {
    final List<QueryParameterValue> positionalParams = params == null ? Collections.emptyList() : params;
    return QueryJobConfiguration
        .newBuilder(sql)
        .setUseLegacySql(false)
        .setPositionalParameters(positionalParams)
        .build();
  }

  /**
   * Creates a job with a random id for the given configuration and waits for it to finish.
   *
   * @param bigquery client used to create the job
   * @param queryConfig configuration of the query to run
   * @return a pair of (completed job, {@code null}) on success or ({@code null}, error text) when the
   *         finished job reports an error
   * @throws RuntimeException if waiting for the job fails or the job no longer exists
   */
  public ImmutablePair<Job, String> executeQuery(BigQuery bigquery, QueryJobConfiguration queryConfig) {
    final JobId jobId = JobId.of(UUID.randomUUID().toString());
    final Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
    return executeQuery(queryJob);
  }

  /**
   * Returns full information about all tables from entire project
   *
   * @param projectId BigQuery project id
   * @return List of BigQuery tables
   */
  public List<Table> getProjectTables(String projectId) {
    List<Table> tableList = new ArrayList<>();
    // The listing is followed by a getTable call per table to obtain the full table information.
    bigQuery.listDatasets(projectId)
        .iterateAll()
        .forEach(dataset -> bigQuery.listTables(dataset.getDatasetId())
            .iterateAll()
            .forEach(table -> tableList.add(bigQuery.getTable(table.getTableId()))));
    return tableList;
  }

  /**
   * Returns full information about all tables from specific Dataset
   *
   * @param datasetId BigQuery dataset id
   * @return List of BigQuery tables
   */
  public List<Table> getDatasetTables(String datasetId) {
    List<Table> tableList = new ArrayList<>();
    // The listing is followed by a getTable call per table to obtain the full table information.
    bigQuery.listTables(datasetId)
        .iterateAll()
        .forEach(table -> tableList.add(bigQuery.getTable(table.getTableId())));
    return tableList;
  }

  /**
   * @return the underlying BigQuery client
   */
  public BigQuery getBigQuery() {
    return bigQuery;
  }

  /**
   * Deletes a dataset together with its contents and logs the outcome. A failed deletion is only
   * logged, never thrown.
   *
   * @param dataSetId id of the dataset to delete
   */
  public void cleanDataSet(String dataSetId) {
    // allows deletion of a dataset that has contents
    final BigQuery.DatasetDeleteOption option = BigQuery.DatasetDeleteOption.deleteContents();

    final boolean success = bigQuery.delete(dataSetId, option);
    if (success) {
      LOGGER.info("BQ Dataset {} deleted...", dataSetId);
    } else {
      // A failed cleanup is a problem worth noticing, so it is reported above INFO level.
      LOGGER.warn("BQ Dataset cleanup for {} failed!", dataSetId);
    }
  }

  /**
   * Waits for the job and translates its outcome into a (job, error) pair.
   *
   * @param queryJob job to wait for
   * @return (completed job, {@code null}) on success, ({@code null}, error text) on job error
   * @throws RuntimeException if the job no longer exists or waiting for it fails
   */
  private ImmutablePair<Job, String> executeQuery(Job queryJob) {
    final Job completedJob = waitForQuery(queryJob);
    if (completedJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (completedJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      return ImmutablePair.of(null, (completedJob.getStatus().getError().toString()));
    }

    return ImmutablePair.of(completedJob, null);
  }

  /**
   * Blocks until the job finishes, converting any failure into a {@link RuntimeException}.
   *
   * @param queryJob job to wait for
   * @return the value returned by {@link Job#waitFor}
   */
  private Job waitForQuery(Job queryJob) {
    try {
      return queryJob.waitFor();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

}
Loading

0 comments on commit 4c0d1a0

Please sign in to comment.