Skip to content

Commit

Permalink
addressed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
peihe committed Apr 7, 2016
1 parent 4feae9d commit ff7706e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,10 @@ public static Bound withoutValidation() {
public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
@Nullable final String jsonTableRef;

final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
@Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;

// Table schema. The schema is required only if the table does not exist.
@Nullable final TableSchema schema;
@Nullable final String jsonSchema;

// Options for creating the table. Valid values are CREATE_IF_NEEDED and
// CREATE_NEVER.
Expand Down Expand Up @@ -785,14 +785,15 @@ public Bound() {
WriteDisposition.WRITE_EMPTY, true, null);
}

private Bound(String name, String jsonTableRef,
SerializableFunction<BoundedWindow, TableReference> tableRefFunction, TableSchema schema,
private Bound(String name, @Nullable String jsonTableRef,
@Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
@Nullable String jsonSchema,
CreateDisposition createDisposition, WriteDisposition writeDisposition,
boolean validate, BigQueryServices testBigQueryServices) {
super(name);
this.jsonTableRef = jsonTableRef;
this.tableRefFunction = tableRefFunction;
this.schema = schema;
this.jsonSchema = jsonSchema;
this.createDisposition = createDisposition;
this.writeDisposition = writeDisposition;
this.validate = validate;
Expand All @@ -805,7 +806,7 @@ private Bound(String name, String jsonTableRef,
* <p>Does not modify this object.
*/
public Bound named(String name) {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, testBigQueryServices);
}

Expand All @@ -825,7 +826,7 @@ public Bound to(String tableSpec) {
* <p>Does not modify this object.
*/
public Bound to(TableReference table) {
return new Bound(name, toJsonString(table), tableRefFunction, schema, createDisposition,
return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, testBigQueryServices);
}

Expand Down Expand Up @@ -854,7 +855,7 @@ public Bound to(
*/
public Bound toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, testBigQueryServices);
}

Expand All @@ -865,8 +866,8 @@ public Bound toTableReference(
* <p>Does not modify this object.
*/
public Bound withSchema(TableSchema schema) {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
writeDisposition, validate, testBigQueryServices);
return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
createDisposition, writeDisposition, validate, testBigQueryServices);
}

/**
Expand All @@ -875,7 +876,7 @@ public Bound withSchema(TableSchema schema) {
* <p>Does not modify this object.
*/
public Bound withCreateDisposition(CreateDisposition createDisposition) {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, testBigQueryServices);
}

Expand All @@ -885,7 +886,7 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) {
* <p>Does not modify this object.
*/
public Bound withWriteDisposition(WriteDisposition writeDisposition) {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, testBigQueryServices);
}

Expand All @@ -895,13 +896,13 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) {
* <p>Does not modify this object.
*/
public Bound withoutValidation() {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
writeDisposition, false, testBigQueryServices);
}

@VisibleForTesting
Bound withTestServices(BigQueryServices testServices) {
return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition,
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
writeDisposition, validate, testServices);
}

Expand Down Expand Up @@ -942,7 +943,7 @@ public PDone apply(PCollection<TableRow> input) {
+ "transform");
}

if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && schema == null) {
if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) {
throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, "
+ "however no schema was provided.");
}
Expand Down Expand Up @@ -984,7 +985,7 @@ public PDone apply(PCollection<TableRow> input) {
+ "supported for unbounded PCollections or when using tablespec functions.");
}

return input.apply(new StreamWithDeDup(table, tableRefFunction, schema));
return input.apply(new StreamWithDeDup(table, tableRefFunction, getSchema()));
}

String tempLocation = options.getTempLocation();
Expand All @@ -1001,13 +1002,11 @@ public PDone apply(PCollection<TableRow> input) {
}
String jobIdToken = UUID.randomUUID().toString();
String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken;
String jsonTable = toJsonString(checkNotNull(table, "table"));
String jsonSchema = toJsonString(schema);
BigQueryServices bqServices = getBigQueryServices();
return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
new BigQuerySink(
jobIdToken,
jsonTable,
jsonTableRef,
jsonSchema,
getWriteDisposition(),
getCreateDisposition(),
Expand All @@ -1033,7 +1032,7 @@ public WriteDisposition getWriteDisposition() {

/** Returns the table schema. */
public TableSchema getSchema() {
return schema;
return fromJsonString(jsonSchema, TableSchema.class);
}

/** Returns the table reference, or {@code null} if a . */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Interrupted
try {
return BackOffUtils.next(sleeper, backoff);
} catch (IOException e) {
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
Expand All @@ -37,6 +38,7 @@
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.util.BigQueryServices;
import com.google.cloud.dataflow.sdk.util.BigQueryServices.Status;
import com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.util.CoderUtils;

import org.hamcrest.Matchers;
Expand Down Expand Up @@ -194,7 +196,7 @@ private void checkWriteObjectWithValidate(
assertEquals(project, bound.getTable().getProjectId());
assertEquals(dataset, bound.getTable().getDatasetId());
assertEquals(table, bound.getTable().getTableId());
assertEquals(schema, bound.schema);
assertEquals(schema, bound.jsonSchema);
assertEquals(createDisposition, bound.createDisposition);
assertEquals(writeDisposition, bound.writeDisposition);
assertEquals(validate, bound.validate);
Expand Down Expand Up @@ -336,7 +338,11 @@ public void testCustomSink() throws Exception {
new TableRow().set("name", "c").set("number", 3)))
.setCoder(TableRowJsonCoder.of())
.apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))))
.withTestServices(fakeBqServices)
.withoutValidation());
p.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

Expand All @@ -57,6 +59,7 @@
/**
* Tests for {@link BigQueryServicesImpl}.
*/
@RunWith(JUnit4.class)
public class BigQueryServicesImplTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);
Expand Down

0 comments on commit ff7706e

Please sign in to comment.