Skip to content

Commit

Permalink
fix: It seems GAPIC is not generated expected headers for bidi stream…
Browse files Browse the repository at this point in the history
…ing client lib, apply a temp fix to unblock customers (#1017)

* repro

* .

* .

* .

* .

* .

* .

* .

* .

* format

* .
  • Loading branch information
yirutang authored Apr 20, 2021
1 parent 827764e commit 9c1ed55
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -160,6 +161,10 @@ private StreamWriterV2(Builder builder) throws IOException {
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + this.streamName))
.build();
this.client = BigQueryWriteClient.create(stubSettings);
this.ownsBigQueryWriteClient = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ public class ITBigQueryWriteManualClientTest {
private static final Logger LOG =
Logger.getLogger(ITBigQueryWriteManualClientTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String TABLE2 = "complicatedtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
private static TableInfo tableInfo;
private static TableInfo tableInfo2;
private static TableInfo tableInfoEU;
private static String tableId;
private static String tableId2;
private static String tableIdEU;
private static BigQuery bigquery;

@BeforeClass
Expand Down Expand Up @@ -110,6 +113,25 @@ public static void beforeClass() throws IOException {
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, TABLE2);
DatasetInfo datasetInfoEU =
DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU)
.setLocation("EU")
.setDescription(DESCRIPTION)
.build();
bigquery.create(datasetInfoEU);
tableInfoEU =
TableInfo.newBuilder(
TableId.of(DATASET_EU, TABLE),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
.build())))
.build();
tableIdEU =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE);
bigquery.create(tableInfoEU);
}

@AfterClass
Expand Down Expand Up @@ -206,6 +228,54 @@ public void testBatchWriteWithCommittedStream()
}
}

ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
rows.addSerializedRows(foo.toByteString());
}
return rows.build();
}

@Test
public void testBatchWriteWithCommittedStreamEU()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableIdEU)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
StreamWriterV2 streamWriter =
StreamWriterV2.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build();
LOG.info("Sending one message");

ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0);
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

LOG.info("Sending two more messages");
ApiFuture<AppendRowsResponse> response1 =
streamWriter.append(CreateProtoRows(new String[] {"bbb", "ccc"}), 1);
ApiFuture<AppendRowsResponse> response2 =
streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3);
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(
tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals("aaa", iter.next().get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
}

@Test
public void testJsonStreamWriterCommittedStream()
throws IOException, InterruptedException, ExecutionException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.ReadSession;
import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import java.util.regex.Pattern;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -43,6 +46,9 @@ public class ResourceHeaderTest {
private static final String TEST_TABLE_REFERENCE =
"projects/project/datasets/dataset/tables/table";

private static final String WRITE_STREAM_NAME =
"projects/project/datasets/dataset/tables/table/streams/stream";

private static final String TEST_STREAM_NAME = "streamName";

private static final String NAME = "resource-header-test:123";
Expand All @@ -52,6 +58,20 @@ public class ResourceHeaderTest {
private static final Pattern READ_SESSION_NAME_PATTERN =
Pattern.compile(
".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*");

private static final Pattern PARENT_PATTERN =
Pattern.compile(".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*");

private static final Pattern NAME_PATTERN =
Pattern.compile(
".*" + "name=projects/project/datasets/dataset/tables/table/streams/stream" + ".*");

private static final Pattern WRITE_STREAM_PATTERN =
Pattern.compile(
".*"
+ "write_stream=projects/project/datasets/dataset/tables/table/streams/stream"
+ ".*");

private static final Pattern READ_STREAM_PATTERN =
Pattern.compile(".*" + "read_stream=streamName" + ".*");
private static final Pattern STREAM_NAME_PATTERN =
Expand All @@ -64,7 +84,9 @@ public class ResourceHeaderTest {
private static InProcessServer<?> server;

private LocalChannelProvider channelProvider;
private LocalChannelProvider channelProvider2;
private BigQueryReadClient client;
private BigQueryWriteClient writeClient;

@BeforeClass
public static void setUpClass() throws Exception {
Expand All @@ -81,6 +103,12 @@ public void setUp() throws Exception {
.setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE))
.setTransportChannelProvider(channelProvider);
client = BigQueryReadClient.create(settingsBuilder.build());
channelProvider2 = LocalChannelProvider.create(NAME);
BigQueryWriteSettings.Builder writeSettingsBuilder =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(channelProvider2);
writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build());
}

@After
Expand Down Expand Up @@ -129,6 +157,63 @@ public void splitReadStreamTest() {
verifyHeaderSent(STREAM_NAME_PATTERN);
}

@Test
public void createWriteStreamTest() {
try {
writeClient.createWriteStream(
"projects/project/datasets/dataset/tables/table",
WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build());
} catch (UnimplementedException e) {
// Ignore the error: none of the methods are actually implemented.
}
boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, PARENT_PATTERN);
assertWithMessage("Generated header was sent").that(headerSent).isTrue();
}

@Test
public void getWriteStreamTest() {
try {
writeClient.getWriteStream(WRITE_STREAM_NAME);
} catch (UnimplementedException e) {
// Ignore the error: none of the methods are actually implemented.
}
boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, NAME_PATTERN);
assertWithMessage("Generated header was sent").that(headerSent).isTrue();
}

// Following tests will work after b/185842996 is fixed.
// @Test
// public void appendRowsTest() {
// try {
// AppendRowsRequest req =
// AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build();
// BidiStream<AppendRowsRequest, AppendRowsResponse> bidiStream =
// writeClient.appendRowsCallable().call();
// bidiStream.send(req);
// } catch (UnimplementedException e) {
// // Ignore the error: none of the methods are actually implemented.
// }
// boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN);
// assertWithMessage("Generated header was sent").that(headerSent).isTrue();
// }
//
// @Test
// public void appendRowsManualTest() {
// try {
// StreamWriterV2 streamWriter =
// StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient)
// .setWriterSchema(ProtoSchema.newBuilder().build())
// .build();
// streamWriter.append(ProtoRows.newBuilder().build(), 1);
// } catch (UnimplementedException e) {
// // Ignore the error: none of the methods are actually implemented.
// } catch (IOException e) {
// // Ignore the error: none of the methods are actually implemented.
// }
// boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN);
// assertWithMessage("Generated header was sent").that(headerSent).isTrue();
// }

private void verifyHeaderSent(Pattern... patterns) {
for (Pattern pattern : patterns) {
boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern);
Expand Down

0 comments on commit 9c1ed55

Please sign in to comment.