diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index a7e4a0058c..2d4733c9fe 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -20,6 +20,7 @@ import com.google.api.gax.batching.FlowController; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback; @@ -388,6 +389,11 @@ ApiFuture append(StreamWriter streamWriter, ProtoRows rows, requestBuilder.setWriteStream(streamWriter.getStreamName()); requestBuilder.putAllMissingValueInterpretations( streamWriter.getMissingValueInterpretationMap()); + if (streamWriter.getDefaultValueInterpretation() + != MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) { + requestBuilder.setDefaultMissingValueInterpretation( + streamWriter.getDefaultValueInterpretation()); + } return appendInternal(streamWriter, requestBuilder.build()); } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index 4f5b2c2c38..6c7a8b89df 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -346,6 +346,22 @@ public Builder setCompressorName(String compressorName) { return this; } + /** + * Sets the default missing value interpretation value if the column is not presented in the + * missing_value_interpretations map. + * + *

If this value is set to `DEFAULT_VALUE`, we will always populate default value if the + * field is missing from json and default value is defined in the column. + * + *

If this value is set to `NULL_VALUE`, we will always not populate default value. + */ + public Builder setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + this.schemaAwareStreamWriterBuilder.setDefaultMissingValueInterpretation( + defaultMissingValueInterpretation); + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index db617d2013..8082ae0340 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -20,6 +20,7 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; import com.google.cloud.bigquery.storage.v1.Exceptions.RowIndexToErrorException; import com.google.common.base.Preconditions; @@ -97,6 +98,8 @@ private SchemaAwareStreamWriter(Builder builder) builder.compressorName); streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool); streamWriterBuilder.setLocation(builder.location); + streamWriterBuilder.setDefaultMissingValueInterpretation( + builder.defaultMissingValueInterpretation); this.streamWriter = streamWriterBuilder.build(); this.streamName = builder.streamName; this.tableSchema = builder.tableSchema; @@ -433,6 +436,9 @@ public static final class Builder { private String location; private String compressorName; + private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = + MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private static final String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; @@ -627,6 +633,16 @@ public Builder setCompressorName(String compressorName) { return this; } + /** + * Sets the default missing value interpretation value if the column is not presented in the + * missing_value_interpretations map. + */ + public Builder setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + return this; + } + /** * Builds SchemaAwareStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 10aeee8965..510f11ceca 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auto.value.AutoOneOf; import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse; import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp; import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; @@ -90,6 +91,13 @@ public class StreamWriter implements AutoCloseable { */ private final String writerId = UUID.randomUUID().toString(); + /** + * The default missing value interpretation if the column has default value defined but not + * presented in the missing value map. + */ + private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = + MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + /** * Stream can access a single connection or a pool of connection depending on whether multiplexing * is enabled. @@ -201,6 +209,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool( private StreamWriter(Builder builder) throws IOException { this.streamName = builder.streamName; this.writerSchema = builder.writerSchema; + this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation; BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); if (!builder.enableConnectionPool) { this.location = builder.location; @@ -312,6 +321,10 @@ static boolean isDefaultStream(String streamName) { return streamMatcher.find(); } + AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() { + return defaultMissingValueInterpretation; + } + static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException { BigQueryWriteSettings.Builder settingsBuilder = null; if (builder.client != null) { @@ -602,6 +615,10 @@ public static final class Builder { private String compressorName = null; + // Default missing value interpretation value. + private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation = + MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -729,6 +746,16 @@ public Builder setCompressorName(String compressorName) { return this; } + /** + * Sets the default missing value interpretation value if the column is not presented in the + * missing_value_interpretations map. + */ + public Builder setDefaultMissingValueInterpretation( + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + return this; + } + /** Builds the {@code StreamWriterV2}. */ public StreamWriter build() throws IOException { return new StreamWriter(this); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index da73d60499..32e3c265e2 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -33,6 +33,7 @@ import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.test.Test.RepetitionType; import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings; import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode; @@ -45,8 +46,10 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.RoundingMode; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -64,6 +67,7 @@ @RunWith(JUnit4.class) public class JsonStreamWriterTest { + private static final int NUMERIC_SCALE = 9; private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default"; private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default"; @@ -514,6 +518,9 @@ public void testSingleAppendMultipleSimpleJson() throws Exception { .getSerializedRows(i), expectedProto.toByteString()); } + assertEquals( + testBigQueryWrite.getAppendRequests().get(0).getDefaultMissingValueInterpretation(), + MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED); } } @@ -1015,6 +1022,79 @@ public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception { writer2.close(); } + @Test + public void testMissingValueInterpretation_multiplexingCase() throws Exception { + // Set min connection count to be 1 to force sharing connection. + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + testBigQueryWrite.addResponse( + WriteStream.newBuilder() + .setName(TEST_STREAM) + .setTableSchema(TABLE_SCHEMA) + .setLocation("us") + .build()); + testBigQueryWrite.addResponse( + WriteStream.newBuilder() + .setName(TEST_STREAM) + .setTableSchema(TABLE_SCHEMA) + .setLocation("us") + .build()); + // The following two writers have different stream name and schema, but will share the same + // connection . + JsonStreamWriter writer1 = + getTestJsonStreamWriterBuilder(TEST_STREAM) + .setEnableConnectionPool(true) + .setLocation("us") + .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) + .build(); + JsonStreamWriter writer2 = + getTestJsonStreamWriterBuilder(TEST_STREAM_2) + .setEnableConnectionPool(true) + .setLocation("us") + .setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE) + .build(); + + long appendCountPerStream = 5; + for (int i = 0; i < appendCountPerStream * 4; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + JSONObject foo = new JSONObject(); + foo.put("foo", "aaa"); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + List> futures = new ArrayList<>(); + // In total insert append `appendCountPerStream` * 4 requests. + // We insert using the pattern of + // jsonStreamWriter1, jsonStreamWriter1, jsonStreamWriter2, jsonStreamWriter2 + for (int i = 0; i < appendCountPerStream; i++) { + ApiFuture appendFuture1 = writer1.append(jsonArr); + ApiFuture appendFuture2 = writer1.append(jsonArr); + ApiFuture appendFuture3 = writer2.append(jsonArr); + ApiFuture appendFuture4 = writer2.append(jsonArr); + appendFuture1.get(); + appendFuture2.get(); + appendFuture3.get(); + appendFuture4.get(); + } + + for (int i = 0; i < appendCountPerStream * 4; i++) { + AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i); + if (i % 4 <= 1) { + assertEquals( + appendRowsRequest.getDefaultMissingValueInterpretation(), + MissingValueInterpretation.DEFAULT_VALUE); + } else { + assertEquals( + appendRowsRequest.getDefaultMissingValueInterpretation(), + MissingValueInterpretation.NULL_VALUE); + } + } + + writer1.close(); + writer2.close(); + } + @Test public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() throws Exception { // Set min connection count to be 1 to force sharing connection. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 54cca9ad03..c07b86e17d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -38,6 +38,7 @@ import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.UnknownException; import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation; import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings; import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException; import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; @@ -849,6 +850,73 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception { appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance()); assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2); } + assertEquals( + appendRowsRequest.getDefaultMissingValueInterpretation(), + MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED); + } + + writer1.close(); + writer2.close(); + } + + @Test + public void testDefaultValueInterpretation_multiplexingCase() throws Exception { + // Use the shared connection mode. + ConnectionWorkerPool.setOptions( + Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build()); + ProtoSchema schema1 = createProtoSchema("Schema1"); + ProtoSchema schema2 = createProtoSchema("Schema2"); + StreamWriter writer1 = + StreamWriter.newBuilder(TEST_STREAM_1, client) + .setWriterSchema(schema1) + .setLocation("US") + .setEnableConnectionPool(true) + .setMaxInflightRequests(1) + .setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE) + .build(); + StreamWriter writer2 = + StreamWriter.newBuilder(TEST_STREAM_2, client) + .setWriterSchema(schema2) + .setMaxInflightRequests(1) + .setEnableConnectionPool(true) + .setLocation("US") + .setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE) + .build(); + + long appendCountPerStream = 5; + for (int i = 0; i < appendCountPerStream * 4; i++) { + testBigQueryWrite.addResponse(createAppendResponse(i)); + } + + // In total insert append `appendCountPerStream` * 4 requests. + // We insert using the pattern of streamWriter1, streamWriter1, streamWriter2, streamWriter2 + for (int i = 0; i < appendCountPerStream; i++) { + ApiFuture appendFuture1 = + writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4); + ApiFuture appendFuture2 = + writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1); + ApiFuture appendFuture3 = + writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2); + ApiFuture appendFuture4 = + writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3); + appendFuture1.get(); + appendFuture2.get(); + appendFuture3.get(); + appendFuture4.get(); + } + + for (int i = 0; i < appendCountPerStream * 4; i++) { + AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i); + assertEquals(i, appendRowsRequest.getOffset().getValue()); + if (i % 4 <= 1) { + assertEquals( + appendRowsRequest.getDefaultMissingValueInterpretation(), + MissingValueInterpretation.DEFAULT_VALUE); + } else { + assertEquals( + appendRowsRequest.getDefaultMissingValueInterpretation(), + MissingValueInterpretation.NULL_VALUE); + } } writer1.close();