diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 75506bab08..97c45a198b 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -36,8 +36,8 @@ import com.google.api.gax.rpc.StreamController; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; -import com.google.cloud.bigquery.storage.v1beta2.StorageProto.*; import com.google.common.base.Preconditions; +import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.io.IOException; @@ -412,7 +412,7 @@ private static final class InflightBatch { this.inflightRequests = inflightRequests; this.offsetList = new ArrayList(inflightRequests.size()); for (AppendRequestAndFutureResponse request : inflightRequests) { - if (request.message.getOffset().getValue() > 0) { + if (request.message.hasOffset()) { offsetList.add(new Long(request.message.getOffset().getValue())); } else { offsetList.add(new Long(-1)); @@ -485,17 +485,15 @@ private void onFailure(Throwable t) { private void onSuccess(AppendRowsResponse response) { for (int i = 0; i < inflightRequests.size(); i++) { AppendRowsResponse.Builder singleResponse = response.toBuilder(); - // if (offsetList.get(i) > 0) { - // singleResponse.setOffset(offsetList.get(i)); - // } else { - // long actualOffset = response.getOffset(); - // for (int j = 0; j < i; j++) { - // actualOffset += - // - // inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount(); - // } - // singleResponse.setOffset(actualOffset); - // } + if (response.getAppendResult().hasOffset()) { + long actualOffset = response.getAppendResult().getOffset().getValue(); + for (int j = 0; j < i; j++) { + actualOffset += + inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount(); + } + singleResponse.setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(actualOffset))); + } inflightRequests.get(i).appendResult.set(singleResponse.build()); } } @@ -850,27 +848,28 @@ public void onResponse(AppendRowsResponse response) { } // Currently there is nothing retryable. If the error is already exists, then ignore it. if (response.hasError()) { - if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.fromCodeValue(response.getError().getCode()) - .withDescription(response.getError().getMessage())); + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + inflightBatch.onFailure(exception); + } else { + if (inflightBatch.getExpectedOffset() > 0 + && (response.getAppendResult().hasOffset() + && response.getAppendResult().getOffset().getValue() + != inflightBatch.getExpectedOffset())) { + IllegalStateException exception = + new IllegalStateException( + String.format( + "The append result offset %s does not match " + "the expected offset %s.", + response.getAppendResult().getOffset().getValue(), + inflightBatch.getExpectedOffset())); inflightBatch.onFailure(exception); + abortInflightRequests(exception); + } else { + inflightBatch.onSuccess(response); } } - // Temp for Breaking Change. - // if (inflightBatch.getExpectedOffset() > 0 - // && response.getOffset() != inflightBatch.getExpectedOffset()) { - // IllegalStateException exception = - // new IllegalStateException( - // String.format( - // "The append result offset %s does not match " + "the expected offset %s.", - // response.getOffset(), inflightBatch.getExpectedOffset())); - // inflightBatch.onFailure(exception); - // abortInflightRequests(exception); - // } else { - inflightBatch.onSuccess(response); - // } } finally { streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index 7323a39204..720f13f481 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -426,6 +426,7 @@ public void testComplicateSchemaWithPendingStream() .setParent(tableId2) .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build()) .build()); + FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance(); try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { LOG.info("Sending two messages"); ApiFuture response = @@ -449,24 +450,25 @@ public void testComplicateSchemaWithPendingStream() Iterator iter = result.getValues().iterator(); assertEquals(false, iter.hasNext()); - FinalizeWriteStreamResponse finalizeResponse = + finalizeResponse = client.finalizeWriteStream( FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); ApiFuture response3 = streamWriter.append( createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"}) - .setOffset(Int64Value.of(1L)) + .setOffset(Int64Value.of(2L)) .build()); try { assertEquals(2, response3.get().getOffset()); fail("Append to finalized stream should fail."); } catch (Exception expected) { // The exception thrown is not stable. Opened a bug to fix it. + LOG.info("Got exception: " + expected.toString()); } } // Finalize row count is not populated. - // assertEquals(1, finalizeResponse.getRowCount()); + assertEquals(2, finalizeResponse.getRowCount()); BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse = client.batchCommitWriteStreams( BatchCommitWriteStreamsRequest.newBuilder() diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 54ee9fbcaf..ebe712cc86 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -31,6 +31,7 @@ import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType2; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import java.io.IOException; import java.util.*; @@ -244,14 +245,15 @@ public void testSingleAppendSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture.get().getOffset()); + assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue()); appendFuture.get(); assertEquals( 1, @@ -291,15 +293,16 @@ public void testSingleAppendMultipleSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture.get().getOffset()); + assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue()); appendFuture.get(); assertEquals( 4, @@ -332,20 +335,30 @@ public void testMultipleAppendSimpleJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); ApiFuture appendFuture; for (int i = 0; i < 4; i++) { appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals((long) i, appendFuture.get().getOffset()); + assertEquals((long) i, appendFuture.get().getAppendResult().getOffset().getValue()); appendFuture.get(); assertEquals( 1, @@ -424,15 +437,16 @@ public void testSingleAppendComplexJson() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, COMPLEX_TABLE_SCHEMA).build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture.get().getOffset()); + assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue()); appendFuture.get(); assertEquals( 1, @@ -458,19 +472,23 @@ public void testAppendMultipleSchemaUpdate() throws Exception { try (JsonStreamWriter writer = getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { // Add fake resposne for FakeBigQueryWrite, first response has updated schema. - // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - // .setOffset(0) + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - // .setOffset(1) + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA_2) .build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); // First append JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -489,9 +507,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception { millis += 100; } assertTrue(writer.getDescriptor().getFields().size() == 2); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture1.get().getOffset()); - appendFuture1.get(); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); assertEquals( 1, testBigQueryWrite @@ -528,9 +544,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception { millis += 100; } assertTrue(writer.getDescriptor().getFields().size() == 3); - // Temp for Breaking Change. - // assertEquals(1L, appendFuture2.get().getOffset()); - appendFuture2.get(); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); assertEquals( 1, testBigQueryWrite @@ -559,14 +573,12 @@ public void testAppendMultipleSchemaUpdate() throws Exception { ApiFuture appendFuture3 = writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(2L, appendFuture3.get().getOffset()); - appendFuture3.get(); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); assertEquals( 1, testBigQueryWrite .getAppendRequests() - .get(1) + .get(2) .getProtoRows() .getRows() .getSerializedRowsCount()); @@ -583,33 +595,13 @@ public void testAppendMultipleSchemaUpdate() throws Exception { .setBaz("allen3") .build() .toByteString()); - // // Check if writer schemas were added in for both connections. + // Check if writer schemas were added in for both connections. assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); assertTrue(testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); assertTrue(testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()); } } - @Test - // This might be a bug but it is the current behavior. Investigate. - public void testAppendAlreadyExists_doesNotThrowxception() - throws DescriptorValidationException, IOException, InterruptedException, ExecutionException { - try (JsonStreamWriter writer = - getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setError(com.google.rpc.Status.newBuilder().setCode(6).build()) - .build()); - JSONObject foo = new JSONObject(); - foo.put("foo", "allen"); - JSONArray jsonArr = new JSONArray(); - jsonArr.put(foo); - ApiFuture appendFuture = - writer.append(jsonArr, -1, /* allowUnknownFields */ false); - appendFuture.get(); - } - } - @Test public void testAppendOutOfRangeException() throws Exception { try (JsonStreamWriter writer = @@ -642,9 +634,11 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception { .setError(com.google.rpc.Status.newBuilder().setCode(11).build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - // Temp for Breaking Change. - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -676,8 +670,7 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception { ApiFuture appendFuture2 = writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture2.get().getOffset()); + assertEquals(0L, appendFuture2.get().getAppendResult().getOffset().getValue()); appendFuture2.get(); assertEquals( 1, @@ -712,16 +705,22 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { .setElementCountThreshold(2L) .build()) .build()) { - // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - // .setOffset(0) + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); // First append JSONObject foo = new JSONObject(); foo.put("foo", "allen"); @@ -735,11 +734,8 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { ApiFuture appendFuture3 = writer.append(jsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture1.get().getOffset()); - // assertEquals(1L, appendFuture2.get().getOffset()); - appendFuture1.get(); - appendFuture2.get(); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); assertEquals( 2, testBigQueryWrite @@ -765,9 +761,7 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { .getSerializedRows(1), FooType.newBuilder().setFoo("allen").build().toByteString()); - // Temp for Breaking Change. - // assertEquals(2L, appendFuture3.get().getOffset()); - appendFuture3.get(); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); assertEquals( 1, testBigQueryWrite @@ -805,9 +799,7 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception { ApiFuture appendFuture4 = writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(3L, appendFuture4.get().getOffset()); - appendFuture4.get(); + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); assertEquals( 1, testBigQueryWrite @@ -852,10 +844,13 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception { int thread_nums = 5; Thread[] thread_arr = new Thread[thread_nums]; for (int i = 0; i < thread_nums; i++) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) - // i).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder() + .setOffset(Int64Value.of(i)) + .build()) + .build()); offsetSets.add((long) i); Thread t = new Thread( @@ -865,7 +860,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - // offsetSets.remove(response.getOffset()); + offsetSets.remove(response.getAppendResult().getOffset().getValue()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); @@ -879,7 +874,7 @@ public void run() { for (int i = 0; i < thread_nums; i++) { thread_arr[i].join(); } - // assertTrue(offsetSets.size() == 0); + assertTrue(offsetSets.size() == 0); for (int i = 0; i < thread_nums; i++) { assertEquals( 1, @@ -921,17 +916,22 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception { Thread[] thread_arr = new Thread[numberThreads]; for (int i = 0; i < numberThreads; i++) { if (i == 2) { - // Temp for Breaking Change. testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() - // .setOffset((long) i) + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder() + .setOffset(Int64Value.of(i)) + .build()) .setUpdatedSchema(UPDATED_TABLE_SCHEMA) .build()); } else { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse( - // AppendRowsResponse.newBuilder().setOffset((long) i).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder() + .setOffset(Int64Value.of(i)) + .build()) + .build()); } offsetSets.add((long) i); @@ -943,7 +943,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - // offsetSets.remove(response.getOffset()); + offsetSets.remove(response.getAppendResult().getOffset().getValue()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -956,7 +956,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - // assertTrue(offsetSets.size() == 0); + assertTrue(offsetSets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, @@ -991,10 +991,13 @@ public void run() { jsonArr2.put(foo); for (int i = numberThreads; i < numberThreads + 5; i++) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset((long) - // i).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder() + .setOffset(Int64Value.of(i)) + .build()) + .build()); offsetSets.add((long) i); Thread t = new Thread( @@ -1004,7 +1007,7 @@ public void run() { ApiFuture appendFuture = writer.append(jsonArr2, -1, /* allowUnknownFields */ false); AppendRowsResponse response = appendFuture.get(); - // offsetSets.remove(response.getOffset()); + offsetSets.remove(response.getAppendResult().getOffset().getValue()); } catch (Exception e) { LOG.severe("Thread execution failed: " + e.getMessage()); } @@ -1017,7 +1020,7 @@ public void run() { for (int i = 0; i < numberThreads; i++) { thread_arr[i].join(); } - // assertTrue(offsetSets.size() == 0); + assertTrue(offsetSets.size() == 0); for (int i = 0; i < numberThreads; i++) { assertEquals( 1, diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 64b4a1d77c..6e89ef5dd8 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -37,6 +37,7 @@ import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Int64Value; import com.google.protobuf.Timestamp; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -128,8 +129,7 @@ private AppendRowsRequest createAppendRequest(String[] messages, long offset) { rows.addSerializedRows(foo.toByteString()); } if (offset > 0) { - // Temp for Breaking Change. - // requestBuilder.setOffset(Int64Value.of(offset)); + requestBuilder.setOffset(Int64Value.of(offset)); } return requestBuilder .setProtoRows(dataBuilder.setRows(rows.build()).build()) @@ -168,9 +168,11 @@ public void testAppendByDuration() throws Exception { .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .build(); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -178,9 +180,8 @@ public void testAppendByDuration() throws Exception { assertFalse(appendFuture2.isDone()); fakeExecutor.advanceTime(Duration.ofSeconds(10)); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture1.get().getOffset()); - // assertEquals(1L, appendFuture2.get().getOffset()); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); appendFuture1.get(); appendFuture2.get(); assertEquals(1, testBigQueryWrite.getAppendRequests().size()); @@ -209,31 +210,30 @@ public void testAppendByNumBatchedMessages() throws Exception { .setDelayThreshold(Duration.ofSeconds(100)) .build()) .build(); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - // Temp for Breaking Change. - appendFuture1.get(); - appendFuture2.get(); - // assertEquals(0L, appendFuture1.get().getOffset()); - // assertEquals(1L, appendFuture2.get().getOffset()); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); assertFalse(appendFuture3.isDone()); ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}); - // Temp for Breaking Change. - appendFuture3.get(); - appendFuture4.get(); - // assertEquals(2L, appendFuture3.get().getOffset()); - // assertEquals(3L, appendFuture4.get().getOffset()); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); assertEquals(2, testBigQueryWrite.getAppendRequests().size()); assertEquals( @@ -272,33 +272,36 @@ public void testAppendByNumBytes() throws Exception { .build()) .build(); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture1.get().getOffset()); - // assertEquals(1L, appendFuture2.get().getOffset()); - appendFuture1.get(); - appendFuture2.get(); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertFalse(appendFuture3.isDone()); // This message is big enough to trigger send on the previous message and itself. ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {Strings.repeat("A", 100)}); - // Temp for Breaking Change. - // assertEquals(2L, appendFuture3.get().getOffset()); - // assertEquals(3L, appendFuture4.get().getOffset()); - appendFuture3.get(); - appendFuture4.get(); + assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); + assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); assertEquals(3, testBigQueryWrite.getAppendRequests().size()); @@ -316,11 +319,16 @@ public void testWriteByShutdown() throws Exception { .setElementCountThreshold(10L) .build()) .build(); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -333,9 +341,8 @@ public void testWriteByShutdown() throws Exception { // Verify the appends completed assertTrue(appendFuture1.isDone()); assertTrue(appendFuture2.isDone()); - // Temp for Breaking Change. - // assertEquals(0L, appendFuture1.get().getOffset()); - // assertEquals(1L, appendFuture2.get().getOffset()); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); } @Test @@ -349,35 +356,31 @@ public void testWriteMixedSizeAndDuration() throws Exception { .setDelayThreshold(Duration.ofSeconds(5)) .build()) .build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - - fakeExecutor.advanceTime(Duration.ofSeconds(2)); assertFalse(appendFuture1.isDone()); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B", "C"}); // Write triggered by batch size - // Temp for Breaking Change. - // assertEquals(0L, appendFuture1.get().getOffset()); - // assertEquals(1L, appendFuture2.get().getOffset()); - appendFuture1.get(); - appendFuture2.get(); + assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"D"}); - assertFalse(appendFuture3.isDone()); - - // Write triggered by time - fakeExecutor.advanceTime(Duration.ofSeconds(5)); - - // assertEquals(2L, appendFuture3.get().getOffset()); + // Eventually will be triggered by time elapsed. + assertEquals(3L, appendFuture3.get().getAppendResult().getOffset().getValue()); assertEquals( 3, @@ -389,16 +392,16 @@ public void testWriteMixedSizeAndDuration() throws Exception { .getSerializedRowsCount()); assertEquals( true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - // assertEquals( - // 1, - // testBigQueryWrite - // .getAppendRequests() - // .get(1) // this gives IndexOutOfBounds error at the moment - // .getProtoRows() - // .getRows() - // .getSerializedRowsCount()); - // assertEquals( - // false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); + assertEquals( + 1, + testBigQueryWrite + .getAppendRequests() + .get(1) // this gives IndexOutOfBounds error at the moment + .getProtoRows() + .getRows() + .getSerializedRowsCount()); + assertEquals( + false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); } } @@ -419,11 +422,16 @@ public void testFlowControlBehaviorBlock() throws Exception { .build()) .build(); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); @@ -444,9 +452,7 @@ public void run() { Thread.sleep(5000L); fakeExecutor.advanceTime(Duration.ofSeconds(10)); // The first requests gets back while the second one is blocked. - // Temp for Breaking Change. - // assertEquals(2L, appendFuture1.get().getOffset()); - appendFuture1.get(); + assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); Thread.sleep(5000L); // Wait is necessary for response to be scheduled before timer is advanced. fakeExecutor.advanceTime(Duration.ofSeconds(10)); @@ -478,9 +484,11 @@ public void testFlowControlBehaviorException() throws Exception { .getFlowControlSettings() .getMaxOutstandingElementCount() .longValue()); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .build()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); @@ -495,9 +503,7 @@ public void testFlowControlBehaviorException() throws Exception { "java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.", e.toString()); } - // Temp for Breaking Change. - // assertEquals(1L, appendFuture1.get().getOffset()); - appendFuture1.get(); + assertEquals(1L, appendFuture1.get().getAppendResult().getOffset().getValue()); } } @@ -515,14 +521,15 @@ public void testStreamReconnectionTransient() throws Exception { StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); testBigQueryWrite.addException(transientError); - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); assertEquals(false, future1.isDone()); // Retry is scheduled to be 7 seconds later. - // Temp for Breaking Change. - // assertEquals(0L, future1.get().getOffset()); + assertEquals(0L, future1.get().getAppendResult().getOffset().getValue()); future1.get(); writer.close(); } @@ -591,11 +598,17 @@ public void testOffset() throws Exception { .build()) .build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(10)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(13)).build()) + .build()); + AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); ApiFuture appendFuture1 = writer.append(request1); AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L); @@ -604,14 +617,10 @@ public void testOffset() throws Exception { ApiFuture appendFuture3 = writer.append(request3); AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L); ApiFuture appendFuture4 = writer.append(request4); - // assertEquals(10L, appendFuture1.get().getOffset()); - // assertEquals(11L, appendFuture2.get().getOffset()); - // assertEquals(13L, appendFuture3.get().getOffset()); - // assertEquals(15L, appendFuture4.get().getOffset()); - appendFuture1.get(); - appendFuture2.get(); - appendFuture3.get(); - appendFuture4.get(); + assertEquals(10L, appendFuture1.get().getAppendResult().getOffset().getValue()); + assertEquals(11L, appendFuture2.get().getAppendResult().getOffset().getValue()); + assertEquals(13L, appendFuture3.get().getAppendResult().getOffset().getValue()); + assertEquals(15L, appendFuture4.get().getAppendResult().getOffset().getValue()); } } @@ -625,15 +634,16 @@ public void testOffsetMismatch() throws Exception { .setElementCountThreshold(1L) .build()) .build()) { - // Temp for Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(11)).build()) + .build()); AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); ApiFuture appendFuture1 = writer.append(request1); appendFuture1.get(); - // Temp for Breaking Change. - // fail("Should throw exception"); + fail("Should throw exception"); } catch (Exception e) { assertEquals( "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.", @@ -909,13 +919,21 @@ public void testFlushAll() throws Exception { .build()) .build(); - // Temp Breaking Change. - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); - // testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) + .build()); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) + .build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index 9e30160147..17fce83fa4 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -180,7 +180,7 @@ public void testBatchWriteWithCommittedStream() ApiFuture response = streamWriter.append( createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); - // assertEquals(0, response.get().getOffset()); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); LOG.info("Sending two more messages"); ApiFuture response1 = @@ -189,10 +189,8 @@ public void testBatchWriteWithCommittedStream() ApiFuture response2 = streamWriter.append( createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build()); - // Waiting for API breaking change to be generated in new client. - // assertEquals(1, response1.get().getOffset()); - // assertEquals(3, response2.get().getOffset()); - response2.get(); + assertEquals(1, response1.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response2.get().getAppendResult().getOffset().getValue()); TableResult result = bigquery.listTableData( @@ -247,52 +245,47 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream() .build()) .build()) { LOG.info("Sending one message"); - JSONObject testStr = new JSONObject(); - testStr.put("test_str", "aaa"); - JSONObject testNumerics = new JSONObject(); - testNumerics.put("test_numerics", new JSONArray(new String[] {"123.4", "-9000000"})); - JSONObject testDateTime = new JSONObject(); - testDateTime.put("test_datetime", "2020-10-1 12:00:00"); - JSONArray row = new JSONArray(new JSONObject[] {testStr, testNumerics, testDateTime}); + JSONObject row1 = new JSONObject(); + row1.put("test_str", "aaa"); + row1.put("test_numerics", new JSONArray(new String[] {"123.4", "-9000000"})); + row1.put("test_datetime", "2020-10-1 12:00:00"); + JSONArray jsonArr1 = new JSONArray(new JSONObject[] {row1}); - ApiFuture response = - jsonStreamWriter.append(row, -1, /* allowUnknownFields */ false); + ApiFuture response1 = + jsonStreamWriter.append(jsonArr1, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - response.get(); - // assertEquals(0, response.get().getOffset()); + assertEquals(0, response1.get().getAppendResult().getOffset().getValue()); - LOG.info("Sending two more messages"); - JSONObject row1 = new JSONObject(); - row1.put("test_str", "bbb"); JSONObject row2 = new JSONObject(); - row2.put("test_str", "ccc"); - JSONArray jsonArr1 = new JSONArray(); - jsonArr1.put(row1); - jsonArr1.put(row2); - + row1.put("test_str", "bbb"); JSONObject row3 = new JSONObject(); - row3.put("test_str", "ddd"); + row2.put("test_str", "ccc"); JSONArray jsonArr2 = new JSONArray(); - jsonArr2.put(row3); + jsonArr2.put(row1); + jsonArr2.put(row2); - ApiFuture response1 = - jsonStreamWriter.append(jsonArr1, -1, /* allowUnknownFields */ false); + JSONObject row4 = new JSONObject(); + row4.put("test_str", "ddd"); + JSONArray jsonArr3 = new JSONArray(); + jsonArr3.put(row4); + + LOG.info("Sending two more messages"); ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(1, response1.get().getOffset()); - // assertEquals(3, response2.get().getOffset()); - response1.get(); - response2.get(); + LOG.info("Sending one more message"); + ApiFuture response3 = + jsonStreamWriter.append(jsonArr3, -1, /* allowUnknownFields */ false); + assertEquals(1, response2.get().getAppendResult().getOffset().getValue()); + assertEquals(3, response3.get().getAppendResult().getOffset().getValue()); TableResult result = bigquery.listTableData( tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); Iterator iter = result.getValues().iterator(); - assertEquals("aaa", iter.next().get(0).getStringValue()); - assertEquals("-9000000", iter.next().get(1).getRepeatedValue().get(1).getStringValue()); - assertEquals("2020-10-01T12:00:00", iter.next().get(2).getStringValue()); + FieldValueList currentRow = iter.next(); + assertEquals("aaa", currentRow.get(0).getStringValue()); + assertEquals("-9000000", currentRow.get(1).getRepeatedValue().get(1).getStringValue()); + assertEquals("2020-10-01T12:00:00", currentRow.get(2).getStringValue()); assertEquals("bbb", iter.next().get(0).getStringValue()); assertEquals("ccc", iter.next().get(0).getStringValue()); assertEquals("ddd", iter.next().get(0).getStringValue()); @@ -340,9 +333,7 @@ public void testJsonStreamWriterSchemaUpdate() ApiFuture response = jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(0, response.get().getOffset()); - response.get(); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); // 2). Schema update and wait until querying it returns a new schema. try { com.google.cloud.bigquery.Table table = bigquery.getTable(DATASET, tableName); @@ -386,9 +377,7 @@ public void testJsonStreamWriterSchemaUpdate() for (int i = 1; i < 100; i++) { ApiFuture response2 = jsonStreamWriter.append(jsonArr2, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(i, response2.get().getOffset()); - response2.get(); + assertEquals(i, response2.get().getAppendResult().getOffset().getValue()); if (response2.get().hasUpdatedSchema()) { next = i; break; @@ -416,8 +405,7 @@ public void testJsonStreamWriterSchemaUpdate() for (int i = 0; i < 10; i++) { ApiFuture response3 = jsonStreamWriter.append(updatedJsonArr, -1, /* allowUnknownFields */ false); - // Temp for Breaking Change. - // assertEquals(next + 1 + i, response3.get().getOffset()); + assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue()); response3.get(); } @@ -447,6 +435,7 @@ public void testComplicateSchemaWithPendingStream() .setParent(tableId2) .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build()) .build()); + FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance(); try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { LOG.info("Sending two messages"); ApiFuture response = @@ -454,16 +443,14 @@ public void testComplicateSchemaWithPendingStream() createAppendRequestComplicateType(writeStream.getName(), new String[] {"aaa"}) .setOffset(Int64Value.of(0L)) .build()); - // assertEquals(0, response.get().getOffset()); + assertEquals(0, response.get().getAppendResult().getOffset().getValue()); ApiFuture response2 = streamWriter.append( createAppendRequestComplicateType(writeStream.getName(), new String[] {"bbb"}) .setOffset(Int64Value.of(1L)) .build()); - // Waiting for API breaking change to be generated in new client. - // assertEquals(1, response2.get().getOffset()); - response2.get(); + assertEquals(1, response2.get().getAppendResult().getOffset().getValue()); // Nothing showed up since rows are not committed. TableResult result = @@ -472,26 +459,24 @@ public void testComplicateSchemaWithPendingStream() Iterator iter = result.getValues().iterator(); assertEquals(false, iter.hasNext()); - FinalizeWriteStreamResponse finalizeResponse = + finalizeResponse = client.finalizeWriteStream( FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); ApiFuture response3 = streamWriter.append( createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"}) - .setOffset(Int64Value.of(1L)) + .setOffset(Int64Value.of(2L)) .build()); try { - // Temp for Breaking Change. - // assertEquals(2, response3.get().getOffset()); response3.get(); - // fail("Append to finalized stream should fail."); + Assert.fail("Append to finalized stream should fail."); } catch (Exception expected) { - // The exception thrown is not stable. Opened a bug to fix it. + LOG.info("Got exception: " + expected.toString()); } } // Finalize row count is not populated. - // assertEquals(1, finalizeResponse.getRowCount()); + assertEquals(2, finalizeResponse.getRowCount()); BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse = client.batchCommitWriteStreams( BatchCommitWriteStreamsRequest.newBuilder() @@ -532,9 +517,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio ApiFuture response = streamWriter.append( createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); - // Temp for Breaking Change. - // assertEquals(0L, response.get().getOffset()); - response.get(); + assertEquals(0L, response.get().getAppendResult().getOffset().getValue()); // Send in a bogus stream name should cause in connection error. ApiFuture response2 = streamWriter.append( @@ -552,9 +535,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio ApiFuture response3 = streamWriter.append( createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); - // Waiting for API breaking change to be generated in new client. - // assertEquals(1L, response3.get().getOffset()); - response3.get(); + assertEquals(1L, response3.get().getAppendResult().getOffset().getValue()); } finally { } } @@ -574,9 +555,7 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec createAppendRequest(writeStream.getName(), new String[] {"aaa"}) .setOffset(Int64Value.of(0L)) .build()); - // Temp for Breaking Change. - // assertEquals(0L, response.get().getOffset()); - response.get(); + assertEquals(0L, response.get().getAppendResult().getOffset().getValue()); } try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { @@ -587,9 +566,7 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec createAppendRequest(writeStream.getName(), new String[] {"bbb"}) .setOffset(Int64Value.of(1L)) .build()); - // Temp for Breaking Change. - // assertEquals(1L, response.get().getOffset()); - response.get(); + assertEquals(1L, response.get().getAppendResult().getOffset().getValue()); } } }