Skip to content

Commit

Permalink
Add BIG_QUERY_DEFAULT_PROJECT config to sink (#1321)
Browse files Browse the repository at this point in the history
  • Loading branch information
relud committed Jul 7, 2020
1 parent e06abf4 commit 6a718a9
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class SinkConfig {
private static final String BATCH_MAX_DELAY = "BATCH_MAX_DELAY";
private static final String BATCH_MAX_MESSAGES = "BATCH_MAX_MESSAGES";
private static final String BIG_QUERY_OUTPUT_MODE = "BIG_QUERY_OUTPUT_MODE";
private static final String BIG_QUERY_DEFAULT_PROJECT = "BIG_QUERY_DEFAULT_PROJECT";
private static final String LOAD_MAX_BYTES = "LOAD_MAX_BYTES";
private static final String LOAD_MAX_DELAY = "LOAD_MAX_DELAY";
private static final String LOAD_MAX_FILES = "LOAD_MAX_FILES";
Expand All @@ -60,11 +61,11 @@ public class SinkConfig {

private static final Set<String> INCLUDE_ENV_VARS = ImmutableSet.of(INPUT_COMPRESSION,
INPUT_PARALLELISM, INPUT_SUBSCRIPTION, BATCH_MAX_BYTES, BATCH_MAX_DELAY, BATCH_MAX_MESSAGES,
BIG_QUERY_OUTPUT_MODE, LOAD_MAX_BYTES, LOAD_MAX_DELAY, LOAD_MAX_FILES, OUTPUT_BUCKET,
OUTPUT_COMPRESSION, OUTPUT_FORMAT, OUTPUT_PARALLELISM, OUTPUT_TABLE, OUTPUT_TOPIC,
MAX_OUTSTANDING_ELEMENT_COUNT, MAX_OUTSTANDING_REQUEST_BYTES, SCHEMAS_LOCATION,
STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY, STREAMING_BATCH_MAX_MESSAGES,
STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES);
BIG_QUERY_OUTPUT_MODE, BIG_QUERY_DEFAULT_PROJECT, LOAD_MAX_BYTES, LOAD_MAX_DELAY,
LOAD_MAX_FILES, OUTPUT_BUCKET, OUTPUT_COMPRESSION, OUTPUT_FORMAT, OUTPUT_PARALLELISM,
OUTPUT_TABLE, OUTPUT_TOPIC, MAX_OUTSTANDING_ELEMENT_COUNT, MAX_OUTSTANDING_REQUEST_BYTES,
SCHEMAS_LOCATION, STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY,
STREAMING_BATCH_MAX_MESSAGES, STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES);

// BigQuery.Write.Batch.getByteSize reports protobuf size, which can be ~1/3rd more
// efficient than the JSON that actually gets sent over HTTP, so we use to 60% of the
Expand Down Expand Up @@ -386,8 +387,11 @@ private static String getBigQueryOutputBucket(Env env) {
return getGcsOutputBucket(env) + OUTPUT_TABLE + "=" + env.getString(OUTPUT_TABLE) + "/";
}

private static com.google.cloud.bigquery.BigQuery getBigQueryService(Env env) {
return BigQueryOptions.getDefaultInstance().getService();
@VisibleForTesting
static com.google.cloud.bigquery.BigQuery getBigQueryService(Env env) {
BigQueryOptions.Builder builder = BigQueryOptions.getDefaultInstance().toBuilder();
env.optString(BIG_QUERY_DEFAULT_PROJECT).ifPresent(builder::setProjectId);
return builder.build().getService();
}

private static Storage getGcsService(Env env) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public boolean containsKey(String key) {
return env.containsKey(key);
}

private Optional<String> optString(String key) {
/** Get the value of an optional environment variable. */
public Optional<String> optString(String key) {
if (!include.contains(key)) {
throw new IllegalArgumentException("key missing from include: " + key);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.mozilla.telemetry.ingestion.sink.config;

import static org.junit.Assert.assertEquals;

import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.collect.ImmutableSet;
import com.mozilla.telemetry.ingestion.sink.util.Env;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.EnvironmentVariables;

public class SinkConfigTest {

@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();

@Test
public void canDetectBqProject() {
environmentVariables.set("GOOGLE_CLOUD_PROJECT", "gcp-project");
environmentVariables.set("BIG_QUERY_DEFAULT_PROJECT", "bq-project");
Env env = new Env(ImmutableSet.of("BIG_QUERY_DEFAULT_PROJECT"));
assertEquals("gcp-project", BigQueryOptions.getDefaultInstance().getProjectId());
assertEquals("bq-project", SinkConfig.getBigQueryService(env).getOptions().getProjectId());
}
}

0 comments on commit 6a718a9

Please sign in to comment.