From b70452bc463c9a2df80caeaa9175e59236a5223d Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 30 Mar 2016 12:43:34 -0700 Subject: [PATCH] addressed review feedback --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 145 +++++++++--------- .../sdk/util/BigQueryTableInserter.java | 40 +++-- .../sdk/util/BigQueryTableInserterTest.java | 16 +- 3 files changed, 111 insertions(+), 90 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index f0a8e4ddec73d..18172d34936b4 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -16,6 +16,9 @@ package com.google.cloud.dataflow.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.QueryRequest; @@ -55,6 +58,7 @@ import com.google.cloud.dataflow.sdk.util.Transport; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; @@ -62,7 +66,6 @@ import com.google.cloud.dataflow.sdk.values.PInput; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -786,7 +789,6 @@ private Bound(String name, TableReference ref, this.createDisposition = createDisposition; this.writeDisposition = writeDisposition; this.validate = validate; - } /** @@ -971,24 +973,29 @@ public PDone apply(PCollection input) { } String tempLocation = options.getTempLocation(); - Preconditions.checkArgument(!Strings.isNullOrEmpty(tempLocation)); - String tempFilePrefix = tempLocation + "/BigQuerySinkTemp"; + checkArgument(!Strings.isNullOrEmpty(tempLocation), + "BigQueryIO.Write needs a GCS temp location to store temp files."); try { - String jobIdToken = UUID.randomUUID().toString(); - String jsonTable = JSON_FACTORY.toString(table); - String jsonSchema = JSON_FACTORY.toString(schema); - return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( - new BigQuerySink<>( - jobIdToken, - jsonTable, - jsonSchema, - getWriteDisposition(), - getCreateDisposition(), - tempFilePrefix, - input.getCoder()))); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize table and schema to JSON strings.", e); + GcsPath.fromUri(tempLocation); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", + tempLocation), e); } + + String jobIdToken = UUID.randomUUID().toString(); + String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken; + String jsonTable = toJsonString(table); + String jsonSchema = toJsonString(schema); + return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( + new BigQuerySink( + jobIdToken, + jsonTable, + jsonSchema, + getWriteDisposition(), + getCreateDisposition(), + tempFilePrefix, + 
input.getCoder()))); } @Override @@ -1042,13 +1049,13 @@ private Write() {} * *

It uses BigQuery load job to import files into BigQuery. */ - static class BigQuerySink extends FileBasedSink { + static class BigQuerySink extends FileBasedSink { private final String jobIdToken; private final String jsonTable; private final String jsonSchema; private final WriteDisposition writeDisposition; private final CreateDisposition createDisposition; - private final Coder coder; + private final Coder coder; public BigQuerySink( String jobIdToken, @@ -1057,55 +1064,38 @@ public BigQuerySink( WriteDisposition writeDisposition, CreateDisposition createDisposition, String tempFile, - Coder coder) { - super(tempFile, ""); - this.jobIdToken = jobIdToken; - this.jsonTable = jsonTable; - this.jsonSchema = jsonSchema; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - this.coder = coder; + Coder coder) { + super(tempFile, "" /* extension */); + this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); + this.jsonTable = checkNotNull(jsonTable, "jsonTable"); + this.jsonSchema = checkNotNull(jsonSchema, "jsonSchema"); + this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); + this.createDisposition = checkNotNull(createDisposition, "createDisposition"); + this.coder = checkNotNull(coder, "coder"); } @Override - public FileBasedSink.FileBasedWriteOperation createWriteOperation(PipelineOptions options) { - return new BigQueryWriteOperation<>( - this, jobIdToken, jsonTable, jsonSchema, writeDisposition, createDisposition, coder); + public FileBasedSink.FileBasedWriteOperation createWriteOperation( + PipelineOptions options) { + return new BigQueryWriteOperation(this); } - private static class BigQueryWriteOperation extends FileBasedWriteOperation { - private final String jobIdToken; - private final String jsonTable; - private final String jsonSchema; - private final WriteDisposition writeDisposition; - private final CreateDisposition createDisposition; - private final Coder coder; - - private BigQueryWriteOperation( - BigQuerySink sink, - String jobIdToken, - String jsonTable, - String jsonSchema, - WriteDisposition writeDisposition, - CreateDisposition createDisposition, - Coder coder) { - super(sink); - this.jobIdToken = jobIdToken; - this.jsonTable = jsonTable; - this.jsonSchema = jsonSchema; - this.writeDisposition = writeDisposition; - this.createDisposition = createDisposition; - this.coder = coder; + private static class BigQueryWriteOperation extends FileBasedWriteOperation { + private final BigQuerySink bigQuerySink; + + private BigQueryWriteOperation(BigQuerySink sink) { + super(checkNotNull(sink, "sink")); + this.bigQuerySink = sink; } @Override - public FileBasedWriter createWriter(PipelineOptions options) throws Exception { - return new TextWriter<>(this, coder); + public FileBasedWriter createWriter(PipelineOptions options) throws Exception { + return new TableRowWriter(this, bigQuerySink.coder); } @Override public void finalize(Iterable writerResults, PipelineOptions options) - throws Exception { + throws IOException, InterruptedException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); Bigquery client = Transport.newBigQueryClient(bqOptions).build(); BigQueryTableInserter inserter = new BigQueryTableInserter(client); @@ -1115,22 +1105,23 @@ public void finalize(Iterable writerResults, PipelineOptions options } if (!tempFiles.isEmpty()) { inserter.load( - jobIdToken, - JSON_FACTORY.fromString(jsonTable, TableReference.class), + bigQuerySink.jobIdToken, + JSON_FACTORY.fromString(bigQuerySink.jsonTable, 
TableReference.class), tempFiles, - JSON_FACTORY.fromString(jsonSchema, TableSchema.class), - writeDisposition, - createDisposition); + JSON_FACTORY.fromString(bigQuerySink.jsonSchema, TableSchema.class), + bigQuerySink.writeDisposition, + bigQuerySink.createDisposition); } } } - private static class TextWriter extends FileBasedWriter { + private static class TableRowWriter extends FileBasedWriter { private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); - private final Coder coder; + private final Coder coder; private OutputStream out; - public TextWriter(FileBasedWriteOperation writeOperation, Coder coder) { + public TableRowWriter( + FileBasedWriteOperation writeOperation, Coder coder) { super(writeOperation); this.mimeType = MimeTypes.TEXT; this.coder = coder; @@ -1142,7 +1133,8 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception { } @Override - public void write(T value) throws Exception { + public void write(TableRow value) throws Exception { + // Use Context.OUTER to encode and NEWLINE as the delimiter. coder.encode(value, out, Context.OUTER); out.write(NEWLINE); } @@ -1218,11 +1210,7 @@ private static class StreamingWriteFn /** Constructor. */ StreamingWriteFn(TableSchema schema) { - try { - jsonTableSchema = JSON_FACTORY.toString(schema); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize BigQuery streaming writer.", e); - } + jsonTableSchema = toJsonString(schema); } /** Prepares a target BigQuery table. */ @@ -1328,8 +1316,7 @@ public static ShardedKeyCoder of(Coder keyCoder) { public static ShardedKeyCoder of( @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List> components) { - Preconditions.checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size()); + checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size()); return of(components.get(0)); } @@ -1434,7 +1421,7 @@ private static class TagWithUniqueIdsAndTable TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table, SerializableFunction tableRefFunction) { - Preconditions.checkArgument(table == null ^ tableRefFunction == null, + checkArgument(table == null ^ tableRefFunction == null, "Exactly one of table or tableRefFunction should be set"); if (table != null) { if (table.getProjectId() == null) { @@ -1539,6 +1526,16 @@ public PDone apply(PCollection input) { } } + private static String toJsonString(Object item) { + try { + return JSON_FACTORY.toString(item); + } catch (IOException e) { + throw new RuntimeException( + String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()), + e); + } + } + ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java index 19a1f1a199fe5..05db870b4bddf 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java @@ -86,13 +86,13 @@ public class BigQueryTableInserter { private static final int MAX_LOAD_JOB_POLL_RETRIES = 10; // The initial backoff for polling the status of a load job.
- private static final int INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = 60000; + private static final long INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60); // The maximum number of attempts to execute a load job RPC. private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10; // The initial backoff for executing a load job RPC. - private static final int INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = 1000; + private static final long INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); @@ -297,6 +297,13 @@ public List call() throws IOException { /** * Import files into BigQuery with load jobs. + * + *

Returns when files are successfully loaded into BigQuery. + * Throws a RuntimeException if: + * 1. The status of one load job is UNKNOWN. This is to avoid duplicating data. + * 2. The number of retried load jobs exceeds {@code MAX_RETRY_LOAD_JOBS}. + * + *

If a load job failed, it will try another load job with a different job id. */ public void load( String jobId, @@ -315,22 +322,32 @@ public void load( String projectId = ref.getProjectId(); for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS); String retryingJobId = jobId + "-" + i; - insertLoadJob(retryingJobId, loadConfig); + insertLoadJob(retryingJobId, loadConfig, Sleeper.DEFAULT, backoff); Status jobStatus = pollJobStatus(projectId, retryingJobId); - if (jobStatus == Status.SUCCEEDED) { - return; - } else if (jobStatus == Status.UNKNOWN) { - throw new RuntimeException("Failed to poll the load job status."); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the load job status."); + case FAILED: + continue; + default: + throw new IllegalStateException("Unexpected job status: " + jobStatus); } - // continue for Status.FAILED } throw new RuntimeException( "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); } @VisibleForTesting - void insertLoadJob(String jobId, JobConfigurationLoad loadConfig) + void insertLoadJob( + String jobId, + JobConfigurationLoad loadConfig, + Sleeper sleeper, + BackOff backoff) throws InterruptedException, IOException { TableReference ref = loadConfig.getDestinationTable(); String projectId = ref.getProjectId(); @@ -345,9 +362,6 @@ void insertLoadJob(String jobId, JobConfigurationLoad loadConfig) job.setConfiguration(config); Exception lastException = null; - Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff( - MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS); do { try { client.jobs().insert(projectId, job).execute(); @@ -367,7 +381,7 @@ void insertLoadJob(String jobId, JobConfigurationLoad loadConfig) } while (nextBackOff(sleeper, backoff)); throw new IOException( String.format( - "Unable to insert job: {}, aborting after {} retries.", + "Unable to insert job: %s, aborting after %d retries.", jobId, MAX_LOAD_JOB_RPC_ATTEMPTS), lastException); } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java index 31aa3f2516d56..9f0c1ecca7c91 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java @@ -49,6 +49,7 @@ import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; +import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; import com.google.cloud.hadoop.util.RetryBoundedBackOff; import com.google.common.collect.ImmutableList; @@ -259,8 +260,11 @@ public void testInsertLoadJobSucceeds() throws IOException, InterruptedException JobConfigurationLoad loadConfig = new JobConfigurationLoad(); loadConfig.setDestinationTable(ref); + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); - inserter.insertLoadJob("jobId", loadConfig); + inserter.insertLoadJob("jobId", loadConfig, 
sleeper, backoff); verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); @@ -278,8 +282,11 @@ public void testInsertLoadJobSucceedsAlreadyExists() throws IOException, Interru JobConfigurationLoad loadConfig = new JobConfigurationLoad(); loadConfig.setDestinationTable(ref); + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); - inserter.insertLoadJob("jobId", loadConfig); + inserter.insertLoadJob("jobId", loadConfig, sleeper, backoff); verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); @@ -305,8 +312,11 @@ public void testInsertLoadJobRetry() throws IOException, InterruptedException { JobConfigurationLoad loadConfig = new JobConfigurationLoad(); loadConfig.setDestinationTable(ref); + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); - inserter.insertLoadJob("jobId", loadConfig); + inserter.insertLoadJob("jobId", loadConfig, sleeper, backoff); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType();
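
For context, below is a minimal sketch of how the batch write path exercised by this patch is typically driven from user code. It is not part of the patch; it assumes the Dataflow SDK 1.x API surface (BigQueryIO.Write, DataflowPipelineOptions, TableRowJsonCoder), and the bucket, project, dataset, and table names are placeholders.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import java.util.Collections;

public class BigQueryLoadJobExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    // The temp location must be a valid 'gs://' path. With this patch, apply()
    // validates it eagerly and stages temp files under
    // <tempLocation>/BigQuerySinkTemp/<jobIdToken> before issuing a load job.
    options.setTempLocation("gs://my-bucket/temp"); // placeholder bucket

    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("word").setType("STRING")));

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(new TableRow().set("word", "hello"))
            .withCoder(TableRowJsonCoder.of()))
        .apply(BigQueryIO.Write
            .to("my-project:my_dataset.my_table") // placeholder table
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
  }
}

With the validation added in this patch, a missing temp location or one that is not a 'gs://' path fails fast in apply() with an IllegalArgumentException rather than surfacing later inside the sink.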