diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index 22b90538a4639..6a3bc594149f1 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -741,10 +741,10 @@ public static Bound withoutValidation() { public static class Bound extends PTransform, PDone> { @Nullable final String jsonTableRef; - final SerializableFunction tableRefFunction; + @Nullable final SerializableFunction tableRefFunction; // Table schema. The schema is required only if the table does not exist. - @Nullable final TableSchema schema; + @Nullable final String jsonSchema; // Options for creating the table. Valid values are CREATE_IF_NEEDED and // CREATE_NEVER. @@ -785,14 +785,15 @@ public Bound() { WriteDisposition.WRITE_EMPTY, true, null); } - private Bound(String name, String jsonTableRef, - SerializableFunction tableRefFunction, TableSchema schema, + private Bound(String name, @Nullable String jsonTableRef, + @Nullable SerializableFunction tableRefFunction, + @Nullable String jsonSchema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, BigQueryServices testBigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; this.tableRefFunction = tableRefFunction; - this.schema = schema; + this.jsonSchema = jsonSchema; this.createDisposition = createDisposition; this.writeDisposition = writeDisposition; this.validate = validate; @@ -805,7 +806,7 @@ private Bound(String name, String jsonTableRef, *

Does not modify this object. */ public Bound named(String name) { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, testBigQueryServices); } @@ -825,7 +826,7 @@ public Bound to(String tableSpec) { *

Does not modify this object. */ public Bound to(TableReference table) { - return new Bound(name, toJsonString(table), tableRefFunction, schema, createDisposition, + return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, testBigQueryServices); } @@ -854,7 +855,7 @@ public Bound to( */ public Bound toTableReference( SerializableFunction tableRefFunction) { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, testBigQueryServices); } @@ -865,8 +866,8 @@ public Bound toTableReference( *

Does not modify this object. */ public Bound withSchema(TableSchema schema) { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, - writeDisposition, validate, testBigQueryServices); + return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema), + createDisposition, writeDisposition, validate, testBigQueryServices); } /** @@ -875,7 +876,7 @@ public Bound withSchema(TableSchema schema) { *

Does not modify this object. */ public Bound withCreateDisposition(CreateDisposition createDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, testBigQueryServices); } @@ -885,7 +886,7 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) { *

Does not modify this object. */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, testBigQueryServices); } @@ -895,13 +896,13 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) { *

Does not modify this object. */ public Bound withoutValidation() { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, writeDisposition, false, testBigQueryServices); } @VisibleForTesting Bound withTestServices(BigQueryServices testServices) { - return new Bound(name, jsonTableRef, tableRefFunction, schema, createDisposition, + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, writeDisposition, validate, testServices); } @@ -942,7 +943,7 @@ public PDone apply(PCollection input) { + "transform"); } - if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && schema == null) { + if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) { throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, " + "however no schema was provided."); } @@ -984,7 +985,7 @@ public PDone apply(PCollection input) { + "supported for unbounded PCollections or when using tablespec functions."); } - return input.apply(new StreamWithDeDup(table, tableRefFunction, schema)); + return input.apply(new StreamWithDeDup(table, tableRefFunction, getSchema())); } String tempLocation = options.getTempLocation(); @@ -1001,13 +1002,11 @@ public PDone apply(PCollection input) { } String jobIdToken = UUID.randomUUID().toString(); String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken; - String jsonTable = toJsonString(checkNotNull(table, "table")); - String jsonSchema = toJsonString(schema); BigQueryServices bqServices = getBigQueryServices(); return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( new BigQuerySink( jobIdToken, - jsonTable, + jsonTableRef, jsonSchema, getWriteDisposition(), getCreateDisposition(), @@ -1033,7 +1032,7 @@ public WriteDisposition getWriteDisposition() { /** Returns the table schema. */ public TableSchema getSchema() { - return schema; + return fromJsonString(jsonSchema, TableSchema.class); } /** Returns the table reference, or {@code null} if a . */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index d0268b235e566..25eb6aa9d788d 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -190,7 +190,7 @@ private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws Interrupted try { return BackOffUtils.next(sleeper, backoff); } catch (IOException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 9937a21413916..691a33c9bb05b 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -21,6 +21,7 @@ import com.google.api.client.util.Data; import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -37,6 +38,7 @@ import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.util.BigQueryServices; import com.google.cloud.dataflow.sdk.util.BigQueryServices.Status; +import com.google.common.collect.ImmutableList; import com.google.cloud.dataflow.sdk.util.CoderUtils; import org.hamcrest.Matchers; @@ -194,7 +196,7 @@ private void checkWriteObjectWithValidate( assertEquals(project, bound.getTable().getProjectId()); assertEquals(dataset, bound.getTable().getDatasetId()); assertEquals(table, bound.getTable().getTableId()); - assertEquals(schema, bound.schema); + assertEquals(schema, bound.jsonSchema); assertEquals(createDisposition, bound.createDisposition); assertEquals(writeDisposition, bound.writeDisposition); assertEquals(validate, bound.validate); @@ -336,7 +338,11 @@ public void testCustomSink() throws Exception { new TableRow().set("name", "c").set("number", 3))) .setCoder(TableRowJsonCoder.of()) .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") - .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) .withTestServices(fakeBqServices) .withoutValidation()); p.run(); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java index ab517a450c075..200e8a72908fd 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java @@ -47,6 +47,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -57,6 +59,7 @@ /** * Tests for {@link BigQueryServicesImpl}. */ +@RunWith(JUnit4.class) public class BigQueryServicesImplTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);