Commit be530bb

[18713] Destination-bigquery: updated Check method to test for non-billable project (#19489)

* [18713] Destination-bigquery: updated Check method to test for non-billable project

* fixed typo in comments

* Bumped version

* auto-bump connector version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
etsybaev and octavia-squidington-iii authored Nov 28, 2022
1 parent ccda38b commit be530bb
Showing 7 changed files with 113 additions and 17 deletions.
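The failure mode this commit guards against: BigQuery rejects streaming inserts on non-billable (free-tier) projects, so a connection whose check only verified dataset permissions could still fail at sync time. Below is a minimal, self-contained sketch of that rejection; the default-credentials client and the dataset/table names are assumptions made for the sketch, while the API calls and the quoted error text match the diff and test further down.

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.Map;

public class StreamingInsertProbe {

  public static void main(final String[] args) {
    // Illustrative client; the connector builds its client from the supplied service-account config.
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    try {
      // A single dummy streaming insert; assumes "some_dataset.some_table" already exists.
      final InsertAllResponse response = bigquery.insertAll(
          InsertAllRequest.newBuilder(TableId.of("some_dataset", "some_table"))
              .addRow(Map.of("id", 1, "name", "James"))
              .build());
      System.out.println("Row-level insert errors: " + response.hasErrors());
    } catch (final BigQueryException e) {
      // On a free-tier project this surfaces as:
      // "Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier"
      System.out.println("Streaming insert rejected: " + e.getMessage());
    }
  }
}
```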
@@ -39,7 +39,7 @@
- name: BigQuery
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerRepository: airbyte/destination-bigquery
-  dockerImageTag: 1.2.7
+  dockerImageTag: 1.2.8
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationRepository: airbyte/normalization
@@ -55,7 +55,7 @@
- name: BigQuery (denormalized typed struct)
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerRepository: airbyte/destination-bigquery-denormalized
-  dockerImageTag: 1.2.7
+  dockerImageTag: 1.2.8
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationRepository: airbyte/normalization
@@ -621,7 +621,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
-- dockerImage: "airbyte/destination-bigquery:1.2.7"
+- dockerImage: "airbyte/destination-bigquery:1.2.8"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
@@ -831,7 +831,7 @@
- "overwrite"
- "append"
- "append_dedup"
-- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.7"
+- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.8"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/bigquery"
connectionSpecification:
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.2.7
+LABEL io.airbyte.version=1.2.8
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=1.2.7
+LABEL io.airbyte.version=1.2.8
LABEL io.airbyte.name=airbyte/destination-bigquery
@@ -11,12 +11,15 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Clustering;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
@@ -25,12 +28,14 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.integrations.base.JavaBaseConstants;
@@ -45,6 +50,7 @@
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
@@ -64,7 +70,8 @@ public class BigQueryUtils {
DateTimeFormatter.ofPattern("[yyyy][yy]['-']['/']['.'][' '][MMM][MM][M]['-']['/']['.'][' '][dd][d]" +
"[[' ']['T']HH:mm[':'ss[.][SSSSSS][SSSSS][SSSS][SSS][' '][z][zzz][Z][O][x][XXX][XX][X]]]");
private static final String USER_AGENT_FORMAT = "%s (GPN: Airbyte)";
-  private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp";
+  private static final String CHECK_TEST_DATASET_SUFFIX = "_airbyte_check_stage_tmp_" + System.currentTimeMillis();
+  private static final String CHECK_TEST_TMP_TABLE_NAME = "test_connection_table_name";

public static ImmutablePair<Job, String> executeQuery(final BigQuery bigquery, final QueryJobConfiguration queryConfig) {
final JobId jobId = JobId.of(UUID.randomUUID().toString());
@@ -119,16 +126,67 @@ public static Dataset getOrCreateDataset(final BigQuery bigquery, final String d

public static void checkHasCreateAndDeleteDatasetRole(final BigQuery bigquery, final String datasetId, final String datasetLocation) {
final String tmpTestDatasetId = datasetId + CHECK_TEST_DATASET_SUFFIX;
-    final Dataset dataset = bigquery.getDataset(tmpTestDatasetId);
-
-    // remove possible tmp datasets from previous execution
-    if (dataset != null && dataset.exists()) {
-      bigquery.delete(tmpTestDatasetId);
-    }
-
-    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();
-    bigquery.create(datasetInfo);
-    bigquery.delete(tmpTestDatasetId);
+    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(tmpTestDatasetId).setLocation(datasetLocation).build();
+
+    bigquery.create(datasetInfo);
+
+    try {
+      attemptCreateTableAndTestInsert(bigquery, tmpTestDatasetId);
+    } finally {
+      bigquery.delete(tmpTestDatasetId);
+    }
  }
+  /**
+   * Creates a tmp table and performs a dummy record insert. Used by the connection Check() method
+   * to make sure the user has all roles required for the upcoming data sync. It also verifies that
+   * the BigQuery project is billable; if it is not, a later sync would fail, because non-billable
+   * projects have limitations on streaming uploads and DML queries. More details can be found here:
+   * https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/data-manipulation-language
+   *
+   * @param bigquery initialized BigQuery client
+   * @param tmpTestDatasetId name of the dataset in which the tmp table will be created
+   */
+  private static void attemptCreateTableAndTestInsert(final BigQuery bigquery, final String tmpTestDatasetId) {
+    // Dummy schema used to create the tmp table
+    final Schema testTableSchema = Schema.of(
+        Field.of("id", StandardSQLTypeName.INT64),
+        Field.of("name", StandardSQLTypeName.STRING));
+
+    // Create the tmp table to verify that the user has table-creation permission; the test
+    // records below are inserted into it
+    final Table testConnectionTable = createTable(bigquery, tmpTestDatasetId,
+        CHECK_TEST_TMP_TABLE_NAME, testTableSchema);
+
+    // Attempt a test insert of dummy records to make sure the user has the required permissions
+    // and the project is billable
+    try {
+      final InsertAllResponse response =
+          bigquery.insertAll(InsertAllRequest
+              .newBuilder(testConnectionTable)
+              .addRow(Map.of("id", 1, "name", "James"))
+              .addRow(Map.of("id", 2, "name", "Eugene"))
+              .addRow(Map.of("id", 3, "name", "Angelina"))
+              .build());
+
+      if (response.hasErrors()) {
+        // Surface per-row insert errors so the connection check fails with a clear message
+        for (final Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
+          throw new ConfigErrorException("Failed to check connection: \n" + entry.getValue());
+        }
+      }
+    } catch (final BigQueryException e) {
+      throw new ConfigErrorException("Failed to check connection: \n" + e.getMessage());
+    } finally {
+      testConnectionTable.delete();
+    }
+  }

+  public static Table createTable(final BigQuery bigquery, final String datasetName, final String tableName, final Schema schema) {
+    final TableId tableId = TableId.of(datasetName, tableName);
+    final TableDefinition tableDefinition = StandardTableDefinition.of(schema);
+    final TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
+    return bigquery.create(tableInfo);
+  }

// https://cloud.google.com/bigquery/docs/creating-partitioned-tables#java
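For context, a minimal sketch of how a caller might drive the new helper; checkHasCreateAndDeleteDatasetRole, its behavior, and ConfigErrorException come from the diff above, while the client construction, dataset name, and main() scaffolding are illustrative assumptions:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import io.airbyte.commons.exceptions.ConfigErrorException;

public class ConnectionCheckSketch {

  public static void main(final String[] args) {
    // Illustrative client; the connector builds its client from the connection config.
    final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    try {
      // Creates a timestamp-suffixed tmp dataset, creates a tmp table in it, streams three
      // dummy rows, and deletes the dataset in a finally block regardless of the outcome.
      BigQueryUtils.checkHasCreateAndDeleteDatasetRole(bigquery, "target_dataset", "US");
      System.out.println("Check passed: permissions are sufficient and the project is billable.");
    } catch (final ConfigErrorException e) {
      // Raised for row-level insert errors or a BigQueryException, e.g. the free-tier streaming limit.
      System.out.println("Check failed: " + e.getMessage());
    }
  }
}
```

Deferring cleanup to a finally block is what lets the check fail fast on non-billable projects without leaking the temporary dataset.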
@@ -76,6 +76,8 @@ class BigQueryDestinationTest {
protected static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
protected static final Path CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH =
Path.of("secrets/credentials-with-missed-dataset-creation-role.json");
+  protected static final Path CREDENTIALS_NON_BILLABLE_PROJECT_PATH =
+      Path.of("secrets/credentials-non-billable-project.json");

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestinationTest.class);
private static final String DATASET_NAME_PREFIX = "bq_dest_integration_test";
@@ -250,8 +252,7 @@ void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdReset
please add file with creds to
../destination-bigquery/secrets/credentialsWithMissedDatasetCreationRole.json.""");
}
-    final String fullConfigAsString = Files.readString(
-        CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH);
+    final String fullConfigAsString = Files.readString(CREDENTIALS_WITH_MISSED_CREATE_DATASET_ROLE_PATH);
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
final String datasetId = Strings.addRandomSuffix(DATASET_NAME_PREFIX, "_", 8);
@@ -276,6 +277,41 @@ void testCheckFailureInsufficientPermissionForCreateDataset(final DatasetIdReset
assertThat(ex.getMessage()).contains("User does not have bigquery.datasets.create permission");
}

+  @ParameterizedTest
+  @MethodSource("datasetIdResetterProvider")
+  void testCheckFailureNonBillableProject(final DatasetIdResetter resetDatasetId) throws IOException {
+    if (!Files.exists(CREDENTIALS_NON_BILLABLE_PROJECT_PATH)) {
+      throw new IllegalStateException("""
+                                      Json config not found. Must provide path to a BigQuery credentials file;
+                                      please add a file with creds to
+                                      ../destination-bigquery/secrets/credentials-non-billable-project.json""");
+    }
+    final String fullConfigAsString = Files.readString(CREDENTIALS_NON_BILLABLE_PROJECT_PATH);
+
+    final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString).get(BigQueryConsts.BIGQUERY_BASIC_CONFIG);
+    final String projectId = credentialsJson.get(BigQueryConsts.CONFIG_PROJECT_ID).asText();
+
+    final JsonNode nonBillableConfig = Jsons.jsonNode(ImmutableMap.builder()
+        .put(BigQueryConsts.CONFIG_PROJECT_ID, projectId)
+        .put(BigQueryConsts.CONFIG_CREDS, credentialsJson.toString())
+        .put(BigQueryConsts.CONFIG_DATASET_ID, "testnobilling")
+        .put(BigQueryConsts.CONFIG_DATASET_LOCATION, "US")
+        .put(BIG_QUERY_CLIENT_CHUNK_SIZE, 10)
+        .build());
+
+    resetDatasetId.accept(nonBillableConfig);
+
+    // Assert that check() throws; at runtime the exception is handled by IntegrationRunner
+    final ConfigErrorException ex = assertThrows(ConfigErrorException.class, () -> {
+      new BigQueryDestination().check(nonBillableConfig);
+    });
+
+    assertThat(ex.getMessage()).contains("Access Denied: BigQuery BigQuery: Streaming insert is not allowed in the free tier");
+  }

@ParameterizedTest
@MethodSource("datasetIdResetterProvider")
void testWriteSuccess(final DatasetIdResetter resetDatasetId) throws Exception {
6 changes: 4 additions & 2 deletions docs/integrations/destinations/bigquery.md
@@ -136,7 +136,8 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
-| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | fixed check method to capture mismatch dataset location |
+| 1.2.8   | 2022-11-22 | [#19489](https://github.com/airbytehq/airbyte/pull/19489) | Added handling of non-billable projects to the connection check stage |
+| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | Fixed check method to capture mismatch dataset location |
| 1.2.6   | 2022-11-10 | [#18554](https://github.com/airbytehq/airbyte/pull/18554) | Improve check connection method to handle more errors |
| 1.2.5   | 2022-10-19 | [#18162](https://github.com/airbytehq/airbyte/pull/18162) | Improve error logs |
| 1.2.4   | 2022-09-26 | [#16890](https://github.com/airbytehq/airbyte/pull/16890) | Add user-agent header |
@@ -189,7 +190,8 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
-| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | fixed check method to capture mismatch dataset location |
+| 1.2.8   | 2022-11-22 | [#19489](https://github.com/airbytehq/airbyte/pull/19489) | Added handling of non-billable projects to the connection check stage |
+| 1.2.7   | 2022-11-11 | [#19358](https://github.com/airbytehq/airbyte/pull/19358) | Fixed check method to capture mismatch dataset location |
| 1.2.6   | 2022-11-10 | [#18554](https://github.com/airbytehq/airbyte/pull/18554) | Improve check connection method to handle more errors |
| 1.2.5   | 2022-10-19 | [#18162](https://github.com/airbytehq/airbyte/pull/18162) | Improve error logs |
| 1.2.4   | 2022-09-26 | [#16890](https://github.com/airbytehq/airbyte/pull/16890) | Add user-agent header |
