addressed review feedback
peihe committed Mar 31, 2016
1 parent 11c3a67 commit b70452b
Showing 3 changed files with 111 additions and 90 deletions.
@@ -16,6 +16,9 @@

package com.google.cloud.dataflow.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.QueryRequest;
@@ -55,14 +58,14 @@
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

@@ -786,7 +789,6 @@ private Bound(String name, TableReference ref,
this.createDisposition = createDisposition;
this.writeDisposition = writeDisposition;
this.validate = validate;

}

/**
@@ -971,24 +973,29 @@ public PDone apply(PCollection<TableRow> input) {
}

String tempLocation = options.getTempLocation();
Preconditions.checkArgument(!Strings.isNullOrEmpty(tempLocation));
String tempFilePrefix = tempLocation + "/BigQuerySinkTemp";
checkArgument(!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
try {
String jobIdToken = UUID.randomUUID().toString();
String jsonTable = JSON_FACTORY.toString(table);
String jsonSchema = JSON_FACTORY.toString(schema);
return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
new BigQuerySink<>(
jobIdToken,
jsonTable,
jsonSchema,
getWriteDisposition(),
getCreateDisposition(),
tempFilePrefix,
input.getCoder())));
} catch (IOException e) {
throw new RuntimeException("Cannot initialize table and schema to JSON strings.", e);
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(String.format(
"BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
tempLocation), e);
}

String jobIdToken = UUID.randomUUID().toString();
String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken;
String jsonTable = toJsonString(table);
String jsonSchema = toJsonString(schema);
return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
new BigQuerySink(
jobIdToken,
jsonTable,
jsonSchema,
getWriteDisposition(),
getCreateDisposition(),
tempFilePrefix,
input.getCoder())));
}

@Override
@@ -1042,13 +1049,13 @@ private Write() {}
*
* <p>It uses a BigQuery load job to import files into BigQuery.
*/
static class BigQuerySink<T> extends FileBasedSink<T> {
static class BigQuerySink extends FileBasedSink<TableRow> {
private final String jobIdToken;
private final String jsonTable;
private final String jsonSchema;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final Coder<T> coder;
private final Coder<TableRow> coder;

public BigQuerySink(
String jobIdToken,
@@ -1057,55 +1064,38 @@ public BigQuerySink(
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
String tempFile,
Coder<T> coder) {
super(tempFile, "");
this.jobIdToken = jobIdToken;
this.jsonTable = jsonTable;
this.jsonSchema = jsonSchema;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.coder = coder;
Coder<TableRow> coder) {
super(tempFile, "" /* extension */);
this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
this.jsonTable = checkNotNull(jsonTable, "jsonTable");
this.jsonSchema = checkNotNull(jsonSchema, "jsonSchema");
this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
this.createDisposition = checkNotNull(createDisposition, "createDisposition");
this.coder = checkNotNull(coder, "coder");
}

@Override
public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
return new BigQueryWriteOperation<>(
this, jobIdToken, jsonTable, jsonSchema, writeDisposition, createDisposition, coder);
public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
PipelineOptions options) {
return new BigQueryWriteOperation(this);
}

private static class BigQueryWriteOperation<T> extends FileBasedWriteOperation<T> {
private final String jobIdToken;
private final String jsonTable;
private final String jsonSchema;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final Coder<T> coder;

private BigQueryWriteOperation(
BigQuerySink<T> sink,
String jobIdToken,
String jsonTable,
String jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
Coder<T> coder) {
super(sink);
this.jobIdToken = jobIdToken;
this.jsonTable = jsonTable;
this.jsonSchema = jsonSchema;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.coder = coder;
private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
private final BigQuerySink bigQuerySink;

private BigQueryWriteOperation(BigQuerySink sink) {
super(checkNotNull(sink, "sink"));
this.bigQuerySink = sink;
}

@Override
public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
return new TextWriter<>(this, coder);
public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception {
return new TableRowWriter(this, bigQuerySink.coder);
}

@Override
public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
throws Exception {
throws IOException, InterruptedException {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
Bigquery client = Transport.newBigQueryClient(bqOptions).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
@@ -1115,22 +1105,23 @@ public void finalize(Iterable<FileResult> writerResults, PipelineOptions options
}
if (!tempFiles.isEmpty()) {
inserter.load(
jobIdToken,
JSON_FACTORY.fromString(jsonTable, TableReference.class),
bigQuerySink.jobIdToken,
JSON_FACTORY.fromString(bigQuerySink.jsonTable, TableReference.class),
tempFiles,
JSON_FACTORY.fromString(jsonSchema, TableSchema.class),
writeDisposition,
createDisposition);
JSON_FACTORY.fromString(bigQuerySink.jsonSchema, TableSchema.class),
bigQuerySink.writeDisposition,
bigQuerySink.createDisposition);
}
}
}

private static class TextWriter<T> extends FileBasedWriter<T> {
private static class TableRowWriter extends FileBasedWriter<TableRow> {
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
private final Coder<T> coder;
private final Coder<TableRow> coder;
private OutputStream out;

public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
public TableRowWriter(
FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) {
super(writeOperation);
this.mimeType = MimeTypes.TEXT;
this.coder = coder;
@@ -1142,7 +1133,8 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
}

@Override
public void write(T value) throws Exception {
public void write(TableRow value) throws Exception {
// Use Context.OUTER to encode each value, with NEWLINE as the delimiter.
coder.encode(value, out, Context.OUTER);
out.write(NEWLINE);
}
@@ -1218,11 +1210,7 @@ private static class StreamingWriteFn

/** Constructor. */
StreamingWriteFn(TableSchema schema) {
try {
jsonTableSchema = JSON_FACTORY.toString(schema);
} catch (IOException e) {
throw new RuntimeException("Cannot initialize BigQuery streaming writer.", e);
}
jsonTableSchema = toJsonString(schema);
}

/** Prepares a target BigQuery table. */
@@ -1328,8 +1316,7 @@ public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
public static <KeyT> ShardedKeyCoder<KeyT> of(
@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
List<Coder<KeyT>> components) {
Preconditions.checkArgument(components.size() == 1,
"Expecting 1 component, got " + components.size());
checkArgument(components.size() == 1, "Expecting 1 component, got " + components.size());
return of(components.get(0));
}

@@ -1434,7 +1421,7 @@ private static class TagWithUniqueIdsAndTable

TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table,
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
Preconditions.checkArgument(table == null ^ tableRefFunction == null,
checkArgument(table == null ^ tableRefFunction == null,
"Exactly one of table or tableRefFunction should be set");
if (table != null) {
if (table.getProjectId() == null) {
@@ -1539,6 +1526,16 @@ public PDone apply(PCollection<TableRow> input) {
}
}

private static String toJsonString(Object item) {
try {
return JSON_FACTORY.toString(item);
} catch (IOException e) {
throw new RuntimeException(
String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
e);
}
}

/////////////////////////////////////////////////////////////////////////////

/** Disallow construction of utility class. */
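The checks added above mean BigQueryIO.Write now fails fast unless the pipeline has a valid gs:// temp location. The following is a minimal usage sketch against the Dataflow SDK 1.x API, not part of this commit; the project id, bucket, table spec, and schema are made-up placeholders.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.common.collect.ImmutableList;

public class BigQueryWriteExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setProject("my-project");                // assumed project id
    options.setTempLocation("gs://my-bucket/tmp");   // must be a gs:// path, per the new check
    Pipeline p = Pipeline.create(options);

    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("word").setType("STRING")));

    p.apply(Create.of(new TableRow().set("word", "hello"))
            .withCoder(TableRowJsonCoder.of()))
        .apply(BigQueryIO.Write
            .to("my-project:my_dataset.my_table")    // assumed table spec
            .withSchema(schema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_EMPTY));

    p.run();
  }
}

With a temp location in place, the sink stages its files under tempLocation + "/BigQuerySinkTemp/" + jobIdToken and imports them into the destination table with a load job in finalize().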
@@ -86,13 +86,13 @@ public class BigQueryTableInserter {
private static final int MAX_LOAD_JOB_POLL_RETRIES = 10;

// The initial backoff for polling the status of a load job.
private static final int INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = 60000;
private static final long INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);

// The maximum number of attempts to execute a load job RPC.
private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10;

// The initial backoff for executing a load job RPC.
private static final int INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = 1000;
private static final long INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);

private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

@@ -297,6 +297,13 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {

/**
* Import files into BigQuery with load jobs.
*
* <p>Returns once the files are successfully loaded into BigQuery.
* Throws a RuntimeException if:
* 1. The status of one load job is UNKNOWN. This is to avoid duplicating data.
* 2. The number of retried load jobs exceeds {@code MAX_RETRY_LOAD_JOBS}.
*
* <p>If a load job fails, it retries with a new load job under a different job id.
*/
public void load(
String jobId,
@@ -315,22 +322,32 @@ public void load(

String projectId = ref.getProjectId();
for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) {
BackOff backoff = new AttemptBoundedExponentialBackOff(
MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS);
String retryingJobId = jobId + "-" + i;
insertLoadJob(retryingJobId, loadConfig);
insertLoadJob(retryingJobId, loadConfig, Sleeper.DEFAULT, backoff);
Status jobStatus = pollJobStatus(projectId, retryingJobId);
if (jobStatus == Status.SUCCEEDED) {
return;
} else if (jobStatus == Status.UNKNOWN) {
throw new RuntimeException("Failed to poll the load job status.");
switch (jobStatus) {
case SUCCEEDED:
return;
case UNKNOWN:
throw new RuntimeException("Failed to poll the load job status.");
case FAILED:
continue;
default:
throw new IllegalStateException("Unexpected job status: " + jobStatus);
}
// continue for Status.FAILED
}
throw new RuntimeException(
"Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
}

@VisibleForTesting
void insertLoadJob(String jobId, JobConfigurationLoad loadConfig)
void insertLoadJob(
String jobId,
JobConfigurationLoad loadConfig,
Sleeper sleeper,
BackOff backoff)
throws InterruptedException, IOException {
TableReference ref = loadConfig.getDestinationTable();
String projectId = ref.getProjectId();
@@ -345,9 +362,6 @@ void insertLoadJob(String jobId, JobConfigurationLoad loadConfig)
job.setConfiguration(config);

Exception lastException = null;
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = new AttemptBoundedExponentialBackOff(
MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS);
do {
try {
client.jobs().insert(projectId, job).execute();
@@ -367,7 +381,7 @@ void insertLoadJob(String jobId, JobConfigurationLoad loadConfig)
} while (nextBackOff(sleeper, backoff));
throw new IOException(
String.format(
"Unable to insert job: {}, aborting after {} retries.",
"Unable to insert job: %s, aborting after %d retries.",
jobId, MAX_LOAD_JOB_RPC_ATTEMPTS),
lastException);
}
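The BigQueryTableInserter changes above thread a Sleeper and BackOff into insertLoadJob rather than constructing them internally, which lets the tests in the next file exercise the retry path without real sleeps. The sketch below shows the attempt-bounded retry loop this relies on, using the SDK's AttemptBoundedExponentialBackOff; the Callable is a stand-in assumption for the jobs.insert() RPC, not the actual implementation.

import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;

import java.io.IOException;
import java.util.concurrent.Callable;

public class BackOffRetrySketch {

  /** Retries the call until it succeeds or the backoff is exhausted. */
  static <T> T callWithRetries(Callable<T> rpc, Sleeper sleeper, BackOff backoff)
      throws IOException, InterruptedException {
    Exception lastException = null;
    do {
      try {
        return rpc.call();
      } catch (Exception e) {
        lastException = e;  // remember the failure and back off before retrying
      }
    } while (nextBackOff(sleeper, backoff));
    throw new IOException("Retries exhausted.", lastException);
  }

  /** Sleeps for the next interval; returns false once the attempt budget is used up. */
  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
      throws InterruptedException, IOException {
    long backOffMillis = backoff.nextBackOffMillis();
    if (backOffMillis == BackOff.STOP) {
      return false;
    }
    sleeper.sleep(backOffMillis);
    return true;
  }

  public static void main(String[] args) throws Exception {
    BackOff backoff = new AttemptBoundedExponentialBackOff(
        10 /* attempts */, 1000 /* initialIntervalMillis */);
    Callable<String> rpc = new Callable<String>() {
      @Override
      public String call() {
        return "ok";  // a real caller would issue client.jobs().insert(...) here
      }
    };
    System.out.println(callWithRetries(rpc, Sleeper.DEFAULT, backoff));
  }
}

Passing the Sleeper and BackOff in as parameters is what allows the unit tests below to substitute FastNanoClockAndSleeper and a small attempt budget.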
@@ -49,6 +49,7 @@
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import com.google.common.collect.ImmutableList;

@@ -259,8 +260,11 @@ public void testInsertLoadJobSucceeds() throws IOException, InterruptedException
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
loadConfig.setDestinationTable(ref);

Sleeper sleeper = new FastNanoClockAndSleeper();
BackOff backoff = new AttemptBoundedExponentialBackOff(
5 /* attempts */, 1000 /* initialIntervalMillis */);
BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
inserter.insertLoadJob("jobId", loadConfig);
inserter.insertLoadJob("jobId", loadConfig, sleeper, backoff);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
@@ -278,8 +282,11 @@ public void testInsertLoadJobSucceedsAlreadyExists() throws IOException, Interru
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
loadConfig.setDestinationTable(ref);

Sleeper sleeper = new FastNanoClockAndSleeper();
BackOff backoff = new AttemptBoundedExponentialBackOff(
5 /* attempts */, 1000 /* initialIntervalMillis */);
BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
inserter.insertLoadJob("jobId", loadConfig);
inserter.insertLoadJob("jobId", loadConfig, sleeper, backoff);

verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
@@ -305,8 +312,11 @@ public void testInsertLoadJobRetry() throws IOException, InterruptedException {
JobConfigurationLoad loadConfig = new JobConfigurationLoad();
loadConfig.setDestinationTable(ref);

Sleeper sleeper = new FastNanoClockAndSleeper();
BackOff backoff = new AttemptBoundedExponentialBackOff(
5 /* attempts */, 1000 /* initialIntervalMillis */);
BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
inserter.insertLoadJob("jobId", loadConfig);
inserter.insertLoadJob("jobId", loadConfig, sleeper, backoff);
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
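These tests pass in a FastNanoClockAndSleeper and an explicit AttemptBoundedExponentialBackOff so insertLoadJob's retry path runs without real sleeps. As a closing illustration of the other half of this commit, the sketch below shows the newline-delimited output the new TableRowWriter in the first file produces for load jobs, assuming the SDK's TableRowJsonCoder; the sample rows are made up.

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.Coder.Context;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class TableRowEncodingSketch {
  private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) throws IOException {
    TableRowJsonCoder coder = TableRowJsonCoder.of();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    TableRow[] rows = {
        new TableRow().set("word", "hello"),
        new TableRow().set("word", "world")};
    for (TableRow row : rows) {
      // Context.OUTER writes the raw JSON bytes without a length prefix, so the
      // result is plain newline-delimited JSON that a BigQuery load job can read.
      coder.encode(row, out, Context.OUTER);
      out.write(NEWLINE);
    }
    System.out.print(out.toString("UTF-8"));
  }
}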
