Direct Writer 2

yirutang committed Apr 8, 2020
1 parent 39ea964 commit de2cb8c
Showing 6 changed files with 124 additions and 86 deletions.
DirectWriter.java
@@ -10,25 +10,21 @@
import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows;
import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoSchema;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.MessageLite;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Logger;

/**
- * Writer that can help user to write data to BigQuery. This is a simplified version of the Write API.
- * For users writing with COMMITTED stream and don't care about row deduplication, it is recommended to use this Writer.
+ * Writer that helps users write data to BigQuery. This is a simplified version of the Write
+ * API. For users who write with a COMMITTED stream and don't care about row deduplication, this
+ * Writer is recommended.
*
- * It supports message batching and flow control. It handles stream creation and schema update.
+ * <p>It supports message batching and flow control. It handles stream creation and schema updates.
*
* <pre>{@code
* DataProto data1;
@@ -52,13 +48,32 @@ public class DirectWriter implements AutoCloseable {

/**
* Constructor of DirectWriter.
- * @param tableName Name of the table for ingest in format of 'projects/{pid}/datasets/{did}/tables/{tid}'.
- * @param messageDescriptor The descriptor of the input message, to be used to interpret the input messages.
+ *
+ * @param tableName Name of the table for ingest, in the format
+ *     'projects/{pid}/datasets/{did}/tables/{tid}'.
+ * @param messageDescriptor The descriptor of the input message, used to interpret the input
+ *     messages.
*/
public DirectWriter(Builder builder) throws Exception {
userSchema = ProtoSchemaConverter.convert(builder.userSchema);
writerCache = WriterCache.getInstance();
-    writer = writerCache.getWriter(builder.tableName);
+    StreamWriter writer1 = writerCache.getWriter(builder.tableName);
+    // If the user specifies different settings, create a new writer with those settings.
+    if ((builder.batchingSettings != null
+            && builder.batchingSettings != writer1.getBatchingSettings())
+        || (builder.retrySettings != null && builder.retrySettings != writer1.getRetrySettings())) {
+      StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(writer1.getStreamNameString());
+      if (builder.batchingSettings != null
+          && builder.batchingSettings != writer1.getBatchingSettings()) {
+        writerBuilder.setBatchingSettings(builder.batchingSettings);
+      }
+      if (builder.retrySettings != null && builder.retrySettings != writer1.getRetrySettings()) {
+        writerBuilder.setRetrySettings(builder.retrySettings);
+      }
+      writer1.close();
+      writer1 = writerBuilder.build();
+    }
+    writer = writer1;
}

@Override
@@ -67,11 +82,13 @@ public void close() {
}

/**
- * The row is represented in proto buffer messages and it must be compatible to the table's schema in BigQuery.
+ * The rows are represented as proto buffer messages and must be compatible with the table's
+ * schema in BigQuery.
*
- * @param protoRows rows in proto buffer format. They must be compatible with the schema set on the writer.
- * @return A future that contains the offset at which the append happened. Only when the future returns with valid
- *     offset, then the append actually happened.
+ * @param protoRows rows in proto buffer format. They must be compatible with the schema set on
+ *     the writer.
+ * @return A future that contains the offset at which the append happened. Only when the future
+ *     returns with a valid offset has the append actually happened.
* @throws Exception
*/
public ApiFuture<Long> append(List<MessageLite> protoRows) throws Exception {
@@ -87,7 +104,7 @@ public ApiFuture<Long> append(List<MessageLite> protoRows) throws Exception {

return ApiFutures.<Storage.AppendRowsResponse, Long>transform(
writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
-        new ApiFunction<Storage.AppendRowsResponse, Long>(){
+        new ApiFunction<Storage.AppendRowsResponse, Long>() {
@Override
public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
return Long.valueOf(appendRowsResponse.getOffset());
@@ -97,40 +114,55 @@ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
}
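    // A note on consuming the returned future (an editor's sketch, not code from this
    // commit): instead of blocking on get(), a caller can attach a callback. `future`
    // stands for the ApiFuture<Long> returned by append() above, and the callback /
    // executor types are the standard com.google.api.core ones.
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<Long>() {
          @Override
          public void onSuccess(Long offset) {
            // The append is confirmed only once a valid offset arrives.
          }

          @Override
          public void onFailure(Throwable t) {
            // Surface or retry the failed append here.
          }
        },
        MoreExecutors.directExecutor());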

/**
- * After this call, messages will be appended using the new schema. Note that user is responsible to keep
- * the schema here in sync with the table's actual schema. If they ran out of date, the append may fail.
- * User can keep trying, until the table's new schema is picked up.
+ * After this call, messages will be appended using the new schema. Note that the user is
+ * responsible for keeping the schema here in sync with the table's actual schema. If the two run
+ * out of sync, the append may fail. The user can keep trying until the table's new schema is
+ * picked up.
*
* @param newSchema
* @throws IOException
* @throws InterruptedException
*/
-  public void updateSchema(Descriptors.Descriptor newSchema) throws IOException, InterruptedException {
+  public void updateSchema(Descriptors.Descriptor newSchema)
+      throws IOException, InterruptedException {
Preconditions.checkArgument(newSchema != null);
writer.refreshAppend();
userSchema = ProtoSchemaConverter.convert(newSchema);
}

-  public static DirectWriter.Builder newBuilder(String tableName, Descriptors.Descriptor userSchema) {
+  /** Returns the batch settings on the writer. */
+  public BatchingSettings getBatchSettings() {
+    return writer.getBatchingSettings();
+  }

+  /** Returns the retry settings on the writer. */
+  public RetrySettings getRetrySettings() {
+    return writer.getRetrySettings();
+  }

+  @VisibleForTesting
+  public int getCachedTableCount() {
+    return writerCache.cachedTableCount();
+  }

+  @VisibleForTesting
+  public int getCachedStreamCount(String tableName) {
+    return writerCache.cachedStreamCount(tableName);
+  }

+  public static DirectWriter.Builder newBuilder(
+      String tableName, Descriptors.Descriptor userSchema) {
    return new DirectWriter.Builder(tableName, userSchema);
}

-  /** A builder of {@link DirectWriter}s. */
+  /**
+   * A builder of {@link DirectWriter}s. As of now, the user can specify only the batch and retry
+   * settings, but not other common connection settings.
+   */
public static final class Builder {
private final String tableName;
private final Descriptors.Descriptor userSchema;

-    // Connection settings
-    private static final int THREADS_PER_CPU = 5;
-    ExecutorProvider executorProvider =
-        InstantiatingExecutorProvider.newBuilder()
-            .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
-            .build();
-    private CredentialsProvider credentialsProvider =
-        BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
-    TransportChannelProvider channelProvider =
-        BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
-
-    // {@code StreamWriter} settings, if null, default to the settings on {@code StreamWriter}.
+    // If null, default to the settings on the writer in the cache, which in turn defaults to the
+    // existing settings on {@code StreamWriter}.
RetrySettings retrySettings = null;
BatchingSettings batchingSettings = null;

@@ -139,24 +171,6 @@ private Builder(String tableName, Descriptors.Descriptor userSchema) {
this.userSchema = Preconditions.checkNotNull(userSchema);
}

-    /**
-     * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
-     * API endpoint.
-     *
-     * <p>For performance, this client benefits from having multiple underlying connections. See
-     * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
-     */
-    public Builder setChannelProvider(TransportChannelProvider channelProvider) {
-      this.channelProvider = Preconditions.checkNotNull(channelProvider);
-      return this;
-    }
-
-    /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
-    public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
-      this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider);
-      return this;
-    }
-
/** Sets the {@code BatchSettings} on the writer. */
public Builder setBatchingSettings(BatchingSettings batchingSettings) {
this.batchingSettings = Preconditions.checkNotNull(batchingSettings);
@@ -169,12 +183,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
return this;
}

-    /** Gives the ability to set a custom executor to be used by the library. */
-    public Builder setExecutorProvider(ExecutorProvider executorProvider) {
-      this.executorProvider = Preconditions.checkNotNull(executorProvider);
-      return this;
-    }
-
/** Builds the {@code DirectWriter}. */
public DirectWriter build() throws Exception {
return new DirectWriter(this);
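Taken together, the new DirectWriter surface can be exercised roughly as follows. This is an illustrative sketch, not code from the commit: FooProto stands in for any generated protobuf message whose fields match the table schema, and the table path and settings values are made up.

    import com.google.api.core.ApiFuture;
    import com.google.api.gax.batching.BatchingSettings;
    import com.google.protobuf.MessageLite;
    import java.util.Arrays;
    import org.threeten.bp.Duration;

    public class DirectWriterSample {
      public static void main(String[] args) throws Exception {
        BatchingSettings batching =
            BatchingSettings.newBuilder()
                .setElementCountThreshold(100L)
                .setRequestByteThreshold(100_000L)
                .setDelayThreshold(Duration.ofMillis(10))
                .build();

        DirectWriter writer =
            DirectWriter.newBuilder(
                    "projects/my-project/datasets/my_dataset/tables/my_table",
                    FooProto.getDescriptor())
                .setBatchingSettings(batching) // optional; otherwise the cached writer's settings apply
                .build();

        // The append only definitely happened once the future resolves to a valid offset.
        ApiFuture<Long> appendFuture =
            writer.append(Arrays.<MessageLite>asList(FooProto.newBuilder().setFoo("a").build()));
        System.out.println("appended at offset " + appendFuture.get());

        // After the table's schema changes, hand the writer the new descriptor;
        // subsequent appends are encoded with it.
        writer.updateSchema(FooProto.getDescriptor());
        writer.close();
      }
    }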
StreamWriter.java
@@ -46,17 +46,16 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
- import org.threeten.bp.Duration;
-
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+ import org.threeten.bp.Duration;

/**
* A BigQuery Stream Writer that can be used to write data into a BigQuery table.
*
- * This is to be used to managed streaming write when you are working with PENDING streams or want to explicitly
- * manage offset. In that most common cases when writing with COMMITTED stream without offset, please use a simpler
- * writer {@code DirectWriter}.
+ * <p>This is to be used to manage streaming writes when you are working with PENDING streams or
+ * want to explicitly manage offsets. In the most common case, writing with a COMMITTED stream
+ * without offsets, please use the simpler writer {@code DirectWriter}.
*
* <p>A {@link StreamWriter} provides built-in capabilities to: handle batching of messages;
* controlling memory utilization (through flow control); automatic connection re-establishment and
@@ -75,7 +74,8 @@
public class StreamWriter implements AutoCloseable {
private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName());

-  private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*";
+  private static String streamPatternString =
+      "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*";

private static Pattern streamPattern = Pattern.compile(streamPatternString);

@@ -119,7 +119,8 @@ public static long getApiMaxInflightRequests() {
private StreamWriter(Builder builder) throws Exception {
Matcher matcher = streamPattern.matcher(builder.streamName);
if (!matcher.matches()) {
-      throw new InvalidArgumentException(null, GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), false);
+      throw new InvalidArgumentException(
+          null, GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), false);
}
streamName = builder.streamName;
tableName = matcher.group(1);
@@ -809,7 +810,7 @@ public void onError(Throwable t) {
try {
// Establish a new connection.
streamWriter.refreshAppend();
-        } catch (IOException e) {
+        } catch (IOException | InterruptedException e) {
LOG.info("Failed to establish a new connection");
}
}
@@ -829,7 +830,7 @@ private static class MessagesBatch {
private int batchedBytes;
private final BatchingSettings batchingSettings;
private Boolean attachSchema = true;
-    final private String streamName;
+    private final String streamName;

private MessagesBatch(BatchingSettings batchingSettings, String streamName) {
this.batchingSettings = batchingSettings;
@@ -839,7 +840,8 @@ private MessagesBatch(BatchingSettings batchingSettings, String streamName) {

// Get all the messages out in a batch.
private InflightBatch popBatch() {
-      InflightBatch batch = new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
+      InflightBatch batch =
+          new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema);
this.attachSchema = false;
reset();
return batch;
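The stream-name pattern rewrapped above is also how the writer recovers its parent table. As a standalone sketch (the stream name here is illustrative, and a plain IllegalArgumentException stands in for the writer's InvalidArgumentException to keep the snippet dependency-free):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Same pattern as streamPatternString; group(1) captures the parent table path.
    Pattern streamPattern =
        Pattern.compile("(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*");

    Matcher matcher = streamPattern.matcher("projects/p1/datasets/d1/tables/t1/streams/s1");
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Invalid stream name");
    }
    String tableName = matcher.group(1); // "projects/p1/datasets/d1/tables/t1"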
WriterCache.java
@@ -1,5 +1,6 @@
package com.google.cloud.bigquery.storage.v1alpha2;

+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Date;
import java.util.Map;
@@ -14,19 +15,18 @@
* minutes if not used. Code sample: WriterCache cache = WriterCache.getInstance(); StreamWriter
* writer = cache.getWriter(); // Use... cache.returnWriter(writer);
*/
- public class StreamCache {
-   private static final Logger LOG = Logger.getLogger(StreamCache.class.getName());
+ public class WriterCache {
+   private static final Logger LOG = Logger.getLogger(WriterCache.class.getName());

-  private static StreamCache instance;
+  private static WriterCache instance;

-  private Duration expireTime = Duration.ofSeconds(300);
private ConcurrentHashMap<String, Map<String, Pair<StreamWriter, Long>>> cacheWithTimeout;

private final BigQueryWriteClient stub;
private final BigQueryWriteSettings stubSettings;
private final CleanerThread cleanerThread;

-  private StreamCache() throws Exception {
+  private WriterCache(Duration expireTime) throws Exception {
cacheWithTimeout = new ConcurrentHashMap<>();
stubSettings = BigQueryWriteSettings.newBuilder().build();
stub = BigQueryWriteClient.create(stubSettings);
@@ -41,9 +41,17 @@ public void run() {
});
}

-  public static StreamCache getInstance() throws Exception {
+  public static WriterCache getInstance() throws Exception {
    if (instance == null) {
-      instance = new StreamCache();
+      instance = new WriterCache(Duration.ofMinutes(5));
    }
    return instance;
  }

+  @VisibleForTesting
+  public static WriterCache getInstance(Duration expireTime) throws Exception {
+    if (instance == null) {
+      instance = new WriterCache(expireTime);
+    }
+    return instance;
+  }
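Both getInstance() overloads above are lazy but unsynchronized, so two racing threads could each build a WriterCache. A conventional thread-safe variant, shown only as an editor's sketch of the double-checked-locking idiom rather than what the commit ships:

    private static volatile WriterCache instance;

    public static WriterCache getInstance() throws Exception {
      if (instance == null) {                  // first check, no lock
        synchronized (WriterCache.class) {
          if (instance == null) {              // second check, under the lock
            instance = new WriterCache(Duration.ofMinutes(5));
          }
        }
      }
      return instance;
    }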
@@ -123,6 +131,22 @@ public void returnWriter(StreamWriter writer) {
}
}

+  public int cachedTableCount() {
+    synchronized (cacheWithTimeout) {
+      return cacheWithTimeout.keySet().size();
+    }
+  }

+  public int cachedStreamCount(String tableName) {
+    synchronized (cacheWithTimeout) {
+      // containsKey, not contains: ConcurrentHashMap.contains(Object) checks values.
+      if (cacheWithTimeout.containsKey(tableName)) {
+        return cacheWithTimeout.get(tableName).values().size();
+      } else {
+        return 0;
+      }
+    }
+  }

private class CleanerThread extends Thread {
private long expiryInMillis;
private ConcurrentHashMap<String, Map<String, Pair<StreamWriter, Long>>> timeMap;
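The class javadoc's one-line code sample, expanded into the lifecycle the new counters are meant to observe. A sketch only: the table path is illustrative, and getWriter takes the table name here because that is how the DirectWriter constructor calls it above (the javadoc's no-argument getWriter() appears to be out of date).

    WriterCache cache = WriterCache.getInstance();
    StreamWriter writer =
        cache.getWriter("projects/my-project/datasets/my_dataset/tables/my_table");
    try {
      // ... append rows through the writer ...
    } finally {
      // Hand the writer back so it can be reused until the CleanerThread
      // evicts entries that have sat idle longer than expireTime.
      cache.returnWriter(writer);
    }
    // New test hooks from this commit:
    int tables = cache.cachedTableCount();   // distinct tables with cached writers
    int streams = cache.cachedStreamCount(
        "projects/my-project/datasets/my_dataset/tables/my_table");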
DirectWriterTest.java
@@ -1,4 +1,3 @@
package com.google.cloud.bigquery.storage.v1alpha2;

- public class DirectWriterTest {
- }
+ public class DirectWriterTest {}
WriterCacheTest.java
@@ -1,4 +1,3 @@
package com.google.cloud.bigquery.storage.v1alpha2;

- public class WriterCacheTest {
- }
+ public class WriterCacheTest {}