Skip to content

Commit

Permalink
feat: Direct writer (#165)
Browse files Browse the repository at this point in the history
* feat:Direct Writer

	new file:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	new file:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
	new file:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
	new file:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java

    r 39ea964 feat:Direct Writer
    r de2cb8c feat:Direct Writer 2
    pick 8e67681 feat:direct writer 3
	new file:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	new file:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
	new file:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
	new file:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java

* feat:Direct Writer 2

* feat:direct writer 3

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/MockBigQueryWriteImpl.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java

* Fix a logging

* Add very basic schema compact check

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java
	new file:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java
	new file:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java

* fix e2e

	modified:   google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java
	modified:   google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
  • Loading branch information
yirutang authored Apr 16, 2020
1 parent e739f5f commit ed718c1
Show file tree
Hide file tree
Showing 12 changed files with 1,211 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.api.core.*;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

/**
* Writer that can help user to write data to BigQuery. This is a simplified version of the Write
* API. For users writing with COMMITTED stream and don't care about row deduplication, it is
* recommended to use this Writer.
*
* <pre>{@code
* DataProto data;
* ApiFuture<Long> response = DirectWriter.<DataProto>append("projects/pid/datasets/did/tables/tid", Arrays.asList(data1));
* }</pre>
*
* <p>{@link DirectWriter} will use the credentials set on the channel, which uses application
* default credentials through {@link GoogleCredentials#getApplicationDefault} by default.
*/
public class DirectWriter {
private static final Logger LOG = Logger.getLogger(DirectWriter.class.getName());
private static WriterCache cache = null;
private static Lock cacheLock = new ReentrantLock();

/**
* Append rows to the given table.
*
* @param tableName table name in the form of "projects/{pName}/datasets/{dName}/tables/{tName}"
* @param protoRows rows in proto buffer format.
* @return A future that contains the offset at which the append happened. Only when the future
* returns with valid offset, then the append actually happened.
* @throws IOException, InterruptedException, InvalidArgumentException
*/
public static <T extends Message> ApiFuture<Long> append(String tableName, List<T> protoRows)
throws IOException, InterruptedException, InvalidArgumentException {
if (protoRows.isEmpty()) {
throw new InvalidArgumentException(
new Exception("Empty rows are not allowed"),
GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT),
false);
}
try {
cacheLock.lock();
if (cache == null) {
cache = WriterCache.getInstance();
}
} finally {
cacheLock.unlock();
}

StreamWriter writer = cache.getTableWriter(tableName, protoRows.get(0).getDescriptorForType());
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
Descriptors.Descriptor descriptor = null;
for (Message protoRow : protoRows) {
rowsBuilder.addSerializedRows(protoRow.toByteString());
}

AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
data.setWriterSchema(ProtoSchemaConverter.convert(protoRows.get(0).getDescriptorForType()));
data.setRows(rowsBuilder.build());

return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
new ApiFunction<Storage.AppendRowsResponse, Long>() {
@Override
public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
return Long.valueOf(appendRowsResponse.getOffset());
}
},
MoreExecutors.directExecutor());
}

@VisibleForTesting
public static void testSetStub(
BigQueryWriteClient stub, int maxTableEntry, SchemaCompact schemaCheck) {
cache = WriterCache.getTestInstance(stub, maxTableEntry, schemaCheck);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A class that checks the schema compatibility between user schema in proto descriptor and Bigquery
* table schema. If this check is passed, then user can write to BigQuery table using the user
* schema, otherwise the write will fail.
*
* <p>The implementation as of now is not complete, which measn, if this check passed, there is
* still a possbility of writing will fail.
*/
public class SchemaCompact {
private BigQuery bigquery;
private static SchemaCompact compact;
private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)";
private static Pattern tablePattern = Pattern.compile(tablePatternString);

private SchemaCompact(BigQuery bigquery) {
this.bigquery = bigquery;
}

/**
* Gets a singleton {code SchemaCompact} object.
*
* @return
*/
public static SchemaCompact getInstance() {
if (compact == null) {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
compact = new SchemaCompact(bigqueryHelper.getOptions().getService());
}
return compact;
}

/**
* Gets a {code SchemaCompact} object with custom BigQuery stub.
*
* @param bigquery
* @return
*/
@VisibleForTesting
public static SchemaCompact getInstance(BigQuery bigquery) {
return new SchemaCompact(bigquery);
}

private TableId getTableId(String tableName) {
Matcher matcher = tablePattern.matcher(tableName);
if (!matcher.matches() || matcher.groupCount() != 3) {
throw new IllegalArgumentException("Invalid table name: " + tableName);
}
return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3));
}

/**
* Checks if the userSchema is compatible with the table's current schema for writing. The current
* implementatoin is not complete. If the check failed, the write couldn't succeed.
*
* @param tableName The name of the table to write to.
* @param userSchema The schema user uses to append data.
* @throws IllegalArgumentException the check failed.
*/
public void check(String tableName, Descriptors.Descriptor userSchema)
throws IllegalArgumentException {
Table table = bigquery.getTable(getTableId(tableName));
Schema schema = table.getDefinition().getSchema();
// TODO: We only have very limited check here. More checks to be added.
if (schema.getFields().size() != userSchema.getFields().size()) {
throw new IllegalArgumentException(
"User schema doesn't have expected field number with BigQuery table schema, expected: "
+ schema.getFields().size()
+ " actual: "
+ userSchema.getFields().size());
}
}
}
Loading

0 comments on commit ed718c1

Please sign in to comment.