Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: It seems GAPIC is not generated expected headers for bidi streaming client lib, apply a temp fix to unblock customers #1017

Merged
merged 11 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest.ProtoData;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
Expand Down Expand Up @@ -160,6 +161,10 @@ private StreamWriterV2(Builder builder) throws IOException {
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + this.streamName))
.build();
this.client = BigQueryWriteClient.create(stubSettings);
this.ownsBigQueryWriteClient = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,18 @@ public class ITBigQueryWriteManualClientTest {
private static final Logger LOG =
Logger.getLogger(ITBigQueryWriteManualClientTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String DATASET_EU = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String TABLE2 = "complicatedtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";

private static BigQueryWriteClient client;
private static TableInfo tableInfo;
private static TableInfo tableInfo2;
private static TableInfo tableInfoEU;
private static String tableId;
private static String tableId2;
private static String tableIdEU;
private static BigQuery bigquery;

@BeforeClass
Expand Down Expand Up @@ -110,6 +113,25 @@ public static void beforeClass() throws IOException {
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET, TABLE2);
DatasetInfo datasetInfoEU =
DatasetInfo.newBuilder(/* datasetId = */ DATASET_EU)
.setLocation("EU")
.setDescription(DESCRIPTION)
.build();
bigquery.create(datasetInfoEU);
tableInfoEU =
TableInfo.newBuilder(
TableId.of(DATASET_EU, TABLE),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
.build())))
.build();
tableIdEU =
String.format(
"projects/%s/datasets/%s/tables/%s",
ServiceOptions.getDefaultProjectId(), DATASET_EU, TABLE);
bigquery.create(tableInfoEU);
}

@AfterClass
Expand Down Expand Up @@ -206,6 +228,54 @@ public void testBatchWriteWithCommittedStream()
}
}

ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
rows.addSerializedRows(foo.toByteString());
}
return rows.build();
}

@Test
public void testBatchWriteWithCommittedStreamEU()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableIdEU)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
StreamWriterV2 streamWriter =
StreamWriterV2.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()))
.build();
LOG.info("Sending one message");

ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0);
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

LOG.info("Sending two more messages");
ApiFuture<AppendRowsResponse> response1 =
streamWriter.append(CreateProtoRows(new String[] {"bbb", "ccc"}), 1);
ApiFuture<AppendRowsResponse> response2 =
streamWriter.append(CreateProtoRows(new String[] {"ddd"}), 3);
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());

TableResult result =
bigquery.listTableData(
tableInfoEU.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals("aaa", iter.next().get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
}

@Test
public void testJsonStreamWriterCommittedStream()
throws IOException, InterruptedException, ExecutionException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadGrpc.BigQueryReadImplBase;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.ReadSession;
import com.google.cloud.bigquery.storage.v1beta2.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream;
import java.util.regex.Pattern;
import org.junit.After;
import org.junit.AfterClass;
Expand All @@ -43,6 +46,9 @@ public class ResourceHeaderTest {
private static final String TEST_TABLE_REFERENCE =
"projects/project/datasets/dataset/tables/table";

private static final String WRITE_STREAM_NAME =
"projects/project/datasets/dataset/tables/table/streams/stream";

private static final String TEST_STREAM_NAME = "streamName";

private static final String NAME = "resource-header-test:123";
Expand All @@ -52,6 +58,20 @@ public class ResourceHeaderTest {
private static final Pattern READ_SESSION_NAME_PATTERN =
Pattern.compile(
".*" + "read_session\\.table=projects/project/datasets/dataset/tables/table" + ".*");

private static final Pattern PARENT_PATTERN =
Pattern.compile(".*" + "parent=projects/project/datasets/dataset/tables/table" + ".*");

private static final Pattern NAME_PATTERN =
Pattern.compile(
".*" + "name=projects/project/datasets/dataset/tables/table/streams/stream" + ".*");

private static final Pattern WRITE_STREAM_PATTERN =
Pattern.compile(
".*"
+ "write_stream=projects/project/datasets/dataset/tables/table/streams/stream"
+ ".*");

private static final Pattern READ_STREAM_PATTERN =
Pattern.compile(".*" + "read_stream=streamName" + ".*");
private static final Pattern STREAM_NAME_PATTERN =
Expand All @@ -64,7 +84,9 @@ public class ResourceHeaderTest {
private static InProcessServer<?> server;

private LocalChannelProvider channelProvider;
private LocalChannelProvider channelProvider2;
private BigQueryReadClient client;
private BigQueryWriteClient writeClient;

@BeforeClass
public static void setUpClass() throws Exception {
Expand All @@ -81,6 +103,12 @@ public void setUp() throws Exception {
.setHeaderProvider(FixedHeaderProvider.create(TEST_HEADER_NAME, TEST_HEADER_VALUE))
.setTransportChannelProvider(channelProvider);
client = BigQueryReadClient.create(settingsBuilder.build());
channelProvider2 = LocalChannelProvider.create(NAME);
BigQueryWriteSettings.Builder writeSettingsBuilder =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(channelProvider2);
writeClient = BigQueryWriteClient.create(writeSettingsBuilder.build());
}

@After
Expand Down Expand Up @@ -129,6 +157,63 @@ public void splitReadStreamTest() {
verifyHeaderSent(STREAM_NAME_PATTERN);
}

@Test
public void createWriteStreamTest() {
try {
writeClient.createWriteStream(
"projects/project/datasets/dataset/tables/table",
WriteStream.newBuilder().setType(WriteStream.Type.BUFFERED).build());
} catch (UnimplementedException e) {
// Ignore the error: none of the methods are actually implemented.
}
boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, PARENT_PATTERN);
assertWithMessage("Generated header was sent").that(headerSent).isTrue();
}

@Test
public void getWriteStreamTest() {
try {
writeClient.getWriteStream(WRITE_STREAM_NAME);
} catch (UnimplementedException e) {
// Ignore the error: none of the methods are actually implemented.
}
boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, NAME_PATTERN);
assertWithMessage("Generated header was sent").that(headerSent).isTrue();
}

// Following tests will work after b/185842996 is fixed.
// @Test
// public void appendRowsTest() {
// try {
// AppendRowsRequest req =
// AppendRowsRequest.newBuilder().setWriteStream(WRITE_STREAM_NAME).build();
// BidiStream<AppendRowsRequest, AppendRowsResponse> bidiStream =
// writeClient.appendRowsCallable().call();
// bidiStream.send(req);
// } catch (UnimplementedException e) {
// // Ignore the error: none of the methods are actually implemented.
// }
// boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN);
// assertWithMessage("Generated header was sent").that(headerSent).isTrue();
// }
//
// @Test
// public void appendRowsManualTest() {
// try {
// StreamWriterV2 streamWriter =
// StreamWriterV2.newBuilder(WRITE_STREAM_NAME, writeClient)
// .setWriterSchema(ProtoSchema.newBuilder().build())
// .build();
// streamWriter.append(ProtoRows.newBuilder().build(), 1);
// } catch (UnimplementedException e) {
// // Ignore the error: none of the methods are actually implemented.
// } catch (IOException e) {
// // Ignore the error: none of the methods are actually implemented.
// }
// boolean headerSent = channelProvider2.isHeaderSent(HEADER_NAME, WRITE_STREAM_PATTERN);
// assertWithMessage("Generated header was sent").that(headerSent).isTrue();
// }

private void verifyHeaderSent(Pattern... patterns) {
for (Pattern pattern : patterns) {
boolean headerSent = channelProvider.isHeaderSent(HEADER_NAME, pattern);
Expand Down