[BEAM-50] Implement BigQueryIO.Write as a custom sink.
peihe committed Mar 15, 2016
1 parent 362e2b6 commit 467a924
Showing 1 changed file with 140 additions and 1 deletion.
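
For context, a minimal sketch of how a user pipeline invokes the transform this commit reroutes, assuming the SDK's existing BigQueryIO.Write builder API (unchanged by this commit); the table spec and schema below are placeholder values:

    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("word").setType("STRING")));

    PCollection<TableRow> rows = ...;  // e.g. the output of an upstream ParDo
    rows.apply(BigQueryIO.Write
        .named("WriteToBigQuery")
        .to("my-project:my_dataset.my_table")  // placeholder table spec
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

In batch mode this now routes through the new BigQuerySink below; in streaming mode the transform still takes the StreamWithDeDup path.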
sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -24,6 +24,7 @@
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.Coder.Context;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
@@ -35,6 +36,7 @@
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
@@ -46,6 +48,7 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.Reshuffle;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
@@ -60,6 +63,8 @@
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -70,6 +75,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -778,6 +786,7 @@ private Bound(String name, TableReference ref,
this.createDisposition = createDisposition;
this.writeDisposition = writeDisposition;
this.validate = validate;

}

/**
@@ -961,7 +970,25 @@ public PDone apply(PCollection<TableRow> input) {
return input.apply(new StreamWithDeDup(table, tableRefFunction, schema));
}

return PDone.in(input.getPipeline());
String tempLocation = options.getTempLocation();
Preconditions.checkArgument(!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
String tempFilePrefix = tempLocation + "/BigQuerySinkTemp";
try {
String jobIdToken = UUID.randomUUID().toString();
String jsonTable = JSON_FACTORY.toString(table);
String jsonSchema = JSON_FACTORY.toString(schema);
return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
new BigQuerySink<>(
jobIdToken,
jsonTable,
jsonSchema,
getWriteDisposition(),
getCreateDisposition(),
tempFilePrefix,
input.getCoder())));
} catch (IOException e) {
throw new RuntimeException("Cannot serialize table and schema to JSON strings.", e);
}
}

@Override
@@ -1010,6 +1037,118 @@ public boolean getValidate() {
private Write() {}
}

/**
* {@link BigQuerySink} is implemented as a {@link FileBasedSink}.
*
* <p>It uses a BigQuery load job to import the temporary files it writes into BigQuery.
*/
static class BigQuerySink<T> extends FileBasedSink<T> {
private final String jobIdToken;
private final String jsonTable;
private final String jsonSchema;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final Coder<T> coder;

public BigQuerySink(
String jobIdToken,
String jsonTable,
String jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
String tempFilePrefix,
Coder<T> coder) {
super(tempFilePrefix, "");
this.jobIdToken = jobIdToken;
this.jsonTable = jsonTable;
this.jsonSchema = jsonSchema;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.coder = coder;
}

@Override
public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
return new BigQueryWriteOperation<>(
this, jobIdToken, jsonTable, jsonSchema, writeDisposition, createDisposition, coder);
}

private static class BigQueryWriteOperation<T> extends FileBasedWriteOperation<T> {
private final String jobIdToken;
private final String jsonTable;
private final String jsonSchema;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final Coder<T> coder;

private BigQueryWriteOperation(
BigQuerySink<T> sink,
String jobIdToken,
String jsonTable,
String jsonSchema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
Coder<T> coder) {
super(sink);
this.jobIdToken = jobIdToken;
this.jsonTable = jsonTable;
this.jsonSchema = jsonSchema;
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.coder = coder;
}

@Override
public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
return new TextWriter<>(this, coder);
}

@Override
public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
Bigquery client = Transport.newBigQueryClient(bqOptions).build();
BigQueryTableInserter inserter = new BigQueryTableInserter(client);
List<String> tempFiles = Lists.newArrayList();
for (FileResult result : writerResults) {
tempFiles.add(result.getFilename());
}
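// Issue a single load job that imports all of the temp files into the target table.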
if (!tempFiles.isEmpty()) {
inserter.load(
jobIdToken,
JSON_FACTORY.fromString(jsonTable, TableReference.class),
tempFiles,
JSON_FACTORY.fromString(jsonSchema, TableSchema.class),
writeDisposition,
createDisposition);
}
}
}

private static class TextWriter<T> extends FileBasedWriter<T> {
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
private final Coder<T> coder;
private OutputStream out;

public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
super(writeOperation);
this.mimeType = MimeTypes.TEXT;
this.coder = coder;
}

@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
out = Channels.newOutputStream(channel);
}

@Override
public void write(T value) throws Exception {
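// Encode the record in the outer context (no length prefix) and terminate it
// with a newline, yielding one record per line (e.g. newline-delimited JSON
// when the coder is TableRowJsonCoder).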
coder.encode(value, out, Context.OUTER);
out.write(NEWLINE);
}
}
}
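
// A rough sketch of how the Write transform drives this sink, assuming the
// FileBasedSink contract in this SDK (names are illustrative of the contract,
// not actual runner code):
//
//   BigQuerySink<TableRow> sink = new BigQuerySink<>(jobIdToken, jsonTable,
//       jsonSchema, writeDisposition, createDisposition, tempFilePrefix, coder);
//   FileBasedWriteOperation<TableRow> op = sink.createWriteOperation(options);
//   FileBasedWriter<TableRow> writer = op.createWriter(options);  // per bundle
//   writer.open(bundleId);            // opens one temp file per bundle
//   writer.write(row);                // repeated for each element in the bundle
//   FileResult result = writer.close();
//   op.finalize(results, options);    // one BigQuery load job over all temp files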

private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) {
try {
Bigquery client = Transport.newBigQueryClient(options).build();
