diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java index c116bacefc..81cf835701 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriter.java @@ -10,25 +10,21 @@ import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoRows; import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto.ProtoSchema; import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest; -import com.google.cloud.bigquery.storage.v1alpha2.Storage.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; import com.google.protobuf.MessageLite; - import java.io.IOException; import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.logging.Logger; /** - * Writer that can help user to write data to BigQuery. This is a simplified version of the Write API. - * For users writing with COMMITTED stream and don't care about row deduplication, it is recommended to use this Writer. + * Writer that can help user to write data to BigQuery. This is a simplified version of the Write + * API. For users writing with COMMITTED stream and don't care about row deduplication, it is + * recommended to use this Writer. * - * It supports message batching and flow control. It handles stream creation and schema update. + *

It supports message batching and flow control. It handles stream creation and schema update. * *

{@code
  * DataProto data1;
@@ -52,13 +48,32 @@ public class DirectWriter implements AutoCloseable {
 
   /**
    * Constructor of DirectWriter.
-   * @param tableName Name of the table for ingest in format of 'projects/{pid}/datasets/{did}/tables/{tid}'.
-   * @param messageDescriptor The descriptor of the input message, to be used to interpret the input messages.
+   *
+   * @param builder Builder holding the table name, in format of
+   *     'projects/{pid}/datasets/{did}/tables/{tid}', the descriptor used to interpret the input
+   *     messages, and the optional batching and retry settings.
    */
   public DirectWriter(Builder builder) throws Exception {
     userSchema = ProtoSchemaConverter.convert(builder.userSchema);
     writerCache = WriterCache.getInstance();
-    writer = writerCache.getWriter(builder.tableName);
+    StreamWriter writer1 = writerCache.getWriter(builder.tableName);
+    // Replace the cached writer only when a user-specified setting differs from it by value.
+    boolean newBatching = builder.batchingSettings != null
+        && !builder.batchingSettings.equals(writer1.getBatchingSettings());
+    boolean newRetry = builder.retrySettings != null
+        && !builder.retrySettings.equals(writer1.getRetrySettings());
+    if (newBatching || newRetry) {
+      StreamWriter.Builder writerBuilder = StreamWriter.newBuilder(writer1.getStreamNameString());
+      if (newBatching) {
+        writerBuilder.setBatchingSettings(builder.batchingSettings);
+      }
+      if (newRetry) {
+        writerBuilder.setRetrySettings(builder.retrySettings);
+      }
+      writer1.close();
+      writer1 = writerBuilder.build();
+    }
+    writer = writer1;
   }
 
   @Override
@@ -67,11 +82,13 @@ public void close() {
   }
 
   /**
-   * The row is represented in proto buffer messages and it must be compatible to the table's schema in BigQuery.
+   * The row is represented in proto buffer messages and it must be compatible with the table's
+   * schema in BigQuery.
    *
-   * @param protoRows rows in proto buffer format. They must be compatible with the schema set on the writer.
-   * @return A future that contains the offset at which the append happened. Only when the future returns with valid
-   *         offset, then the append actually happened.
+   * @param protoRows rows in proto buffer format. They must be compatible with the schema set on
+   *     the writer.
+   * @return A future that contains the offset at which the append happened. The append has
+   *     actually happened only when the future returns with a valid offset.
    * @throws Exception
    */
   public ApiFuture append(List protoRows) throws Exception {
@@ -87,7 +104,7 @@ public ApiFuture append(List protoRows) throws Exception {
 
     return ApiFutures.transform(
         writer.append(AppendRowsRequest.newBuilder().setProtoRows(data.build()).build()),
-        new ApiFunction(){
+        new ApiFunction() {
           @Override
           public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
             return Long.valueOf(appendRowsResponse.getOffset());
@@ -97,40 +114,55 @@ public Long apply(Storage.AppendRowsResponse appendRowsResponse) {
   }
 
   /**
-   * After this call, messages will be appended using the new schema. Note that user is responsible to keep
-   * the schema here in sync with the table's actual schema. If they ran out of date, the append may fail.
-   * User can keep trying, until the table's new schema is picked up.
+   * After this call, messages will be appended using the new schema. The user is responsible for
+   * keeping the schema here in sync with the table's actual schema. If they are out of sync, the
+   * append may fail. The user can keep retrying until the table's new schema is picked up.
+   *
    * @param newSchema
    * @throws IOException
    * @throws InterruptedException
    */
-  public void updateSchema(Descriptors.Descriptor newSchema) throws IOException, InterruptedException {
+  public void updateSchema(Descriptors.Descriptor newSchema)
+      throws IOException, InterruptedException {
     Preconditions.checkArgument(newSchema != null);
     writer.refreshAppend();
     userSchema = ProtoSchemaConverter.convert(newSchema);
   }
 
-  public static DirectWriter.Builder newBuilder(String tableName, Descriptors.Descriptor userSchema) {
+  /** Returns the batch settings on the writer. */
+  public BatchingSettings getBatchSettings() {
+    return writer.getBatchingSettings();
+  }
+
+  /** Returns the retry settings on the writer. */
+  public RetrySettings getRetrySettings() {
+    return writer.getRetrySettings();
+  }
+
+  @VisibleForTesting
+  public int getCachedTableCount() {
+    return writerCache.cachedTableCount();
+  }
+
+  @VisibleForTesting
+  public int getCachedStreamCount(String tableName) {
+    return writerCache.cachedStreamCount(tableName);
+  }
+
+  public static DirectWriter.Builder newBuilder(
+      String tableName, Descriptors.Descriptor userSchema) {
     return new DirectWriter.Builder(tableName, userSchema);
   }
 
-  /** A builder of {@link DirectWriter}s. */
+  /** A builder of {@link DirectWriter}s.
+   *  As of now, user can specify only the batch and retry settings, but not other common connection settings.
+   **/
   public static final class Builder {
     private final String tableName;
     private final Descriptors.Descriptor userSchema;
 
-    // Connection settings
-    private static final int THREADS_PER_CPU = 5;
-    ExecutorProvider executorProvider =
-        InstantiatingExecutorProvider.newBuilder()
-        .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors())
-        .build();
-    private CredentialsProvider credentialsProvider =
-        BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
-    TransportChannelProvider channelProvider =
-        BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
-
-    // {@code StreamWriter} settings, if null, default to the settings on {@code StreamWriter}.
+    // If null, default to the settings on the writer in the cache, which in turn defaults to the
+    // existing settings on {@code StreamWriter}.
     RetrySettings retrySettings = null;
     BatchingSettings batchingSettings = null;
 
@@ -139,24 +171,6 @@ private Builder(String tableName, Descriptors.Descriptor userSchema) {
       this.userSchema = Preconditions.checkNotNull(userSchema);
     }
 
-    /**
-     * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
-     * API endpoint.
-     *
-     * 

For performance, this client benefits from having multiple underlying connections. See - * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}. - */ - public Builder setChannelProvider(TransportChannelProvider channelProvider) { - this.channelProvider = Preconditions.checkNotNull(channelProvider); - return this; - } - - /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */ - public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { - this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider); - return this; - } - /** Sets the {@code BatchSettings} on the writer. */ public Builder setBatchingSettings(BatchingSettings batchingSettings) { this.batchingSettings = Preconditions.checkNotNull(batchingSettings); @@ -169,12 +183,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } - /** Gives the ability to set a custom executor to be used by the library. */ - public Builder setExecutorProvider(ExecutorProvider executorProvider) { - this.executorProvider = Preconditions.checkNotNull(executorProvider); - return this; - } - /** Builds the {@code DirectWriter}. 
*/ public DirectWriter build() throws Exception { return new DirectWriter(this); diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index e1729d1cd4..0471faa3c8 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -46,17 +46,16 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; -import org.threeten.bp.Duration; - import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.threeten.bp.Duration; /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * - * This is to be used to managed streaming write when you are working with PENDING streams or want to explicitly - * manage offset. In that most common cases when writing with COMMITTED stream without offset, please use a simpler - * writer {@code DirectWriter}. + *

This is to be used to managed streaming write when you are working with PENDING streams or + * want to explicitly manage offset. In that most common cases when writing with COMMITTED stream + * without offset, please use a simpler writer {@code DirectWriter}. * *

A {@link StreamWrier} provides built-in capabilities to: handle batching of messages; * controlling memory utilization (through flow control); automatic connection re-establishment and @@ -75,7 +74,8 @@ public class StreamWriter implements AutoCloseable { private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName()); - private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*"; + private static String streamPatternString = + "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/.*"; private static Pattern streamPattern = Pattern.compile(streamPatternString); @@ -119,7 +119,8 @@ public static long getApiMaxInflightRequests() { private StreamWriter(Builder builder) throws Exception { Matcher matcher = streamPattern.matcher(builder.streamName); if (!matcher.matches()) { - throw new InvalidArgumentException(null, GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), false); + throw new InvalidArgumentException( + null, GrpcStatusCode.of(Status.Code.INVALID_ARGUMENT), false); } streamName = builder.streamName; tableName = matcher.group(1); @@ -809,7 +810,7 @@ public void onError(Throwable t) { try { // Establish a new connection. streamWriter.refreshAppend(); - } catch (IOException e) { + } catch (IOException | InterruptedException e) { LOG.info("Failed to establish a new connection"); } } @@ -829,7 +830,7 @@ private static class MessagesBatch { private int batchedBytes; private final BatchingSettings batchingSettings; private Boolean attachSchema = true; - final private String streamName; + private final String streamName; private MessagesBatch(BatchingSettings batchingSettings, String streamName) { this.batchingSettings = batchingSettings; @@ -839,7 +840,8 @@ private MessagesBatch(BatchingSettings batchingSettings, String streamName) { // Get all the messages out in a batch. 
private InflightBatch popBatch() { - InflightBatch batch = new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema); + InflightBatch batch = + new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema); this.attachSchema = false; reset(); return batch; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java index ee2e0b4861..a3cf73da59 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCache.java @@ -1,5 +1,6 @@ package com.google.cloud.bigquery.storage.v1alpha2; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Date; import java.util.Map; @@ -14,19 +15,18 @@ * minutes if not used. Code sample: WriterCache cache = WriterCache.getInstance(); StreamWriter * writer = cache.getWriter(); // Use... 
cache.returnWriter(writer); */ -public class StreamCache { - private static final Logger LOG = Logger.getLogger(StreamCache.class.getName()); +public class WriterCache { + private static final Logger LOG = Logger.getLogger(WriterCache.class.getName()); - private static StreamCache instance; + private static WriterCache instance; - private Duration expireTime = Duration.ofSeconds(300); private ConcurrentHashMap>> cacheWithTimeout; private final BigQueryWriteClient stub; private final BigQueryWriteSettings stubSettings; private final CleanerThread cleanerThread; - private StreamCache() throws Exception { + private WriterCache(Duration expireTime) throws Exception { cacheWithTimeout = new ConcurrentHashMap<>(); stubSettings = BigQueryWriteSettings.newBuilder().build(); stub = BigQueryWriteClient.create(stubSettings); @@ -41,9 +41,17 @@ public void run() { }); } - public static StreamCache getInstance() throws Exception { + public static WriterCache getInstance() throws Exception { if (instance == null) { - instance = new StreamCache(); + instance = new WriterCache(Duration.ofMinutes(5)); + } + return instance; + } + + @VisibleForTesting + public static WriterCache getInstance(Duration expireTime) throws Exception { + if (instance == null) { + instance = new WriterCache(expireTime); } return instance; } @@ -123,6 +131,22 @@ public void returnWriter(StreamWriter writer) { } } + public int cachedTableCount() { + synchronized (cacheWithTimeout) { + return cacheWithTimeout.keySet().size(); + } + } + + public int cachedStreamCount(String tableName) { + synchronized (cacheWithTimeout) { + if (cacheWithTimeout.contains(tableName)) { + return cacheWithTimeout.get(tableName).values().size(); + } else { + return 0; + } + } + } + private class CleanerThread extends Thread { private long expiryInMillis; private ConcurrentHashMap>> timeMap; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java 
b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java index 2cfbf1f77d..aa3063e164 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/DirectWriterTest.java @@ -1,4 +1,3 @@ package com.google.cloud.bigquery.storage.v1alpha2; -public class DirectWriterTest { -} +public class DirectWriterTest {} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java index 4387edbdf7..34b37072fa 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/WriterCacheTest.java @@ -1,4 +1,3 @@ package com.google.cloud.bigquery.storage.v1alpha2; -public class WriterCacheTest { -} +public class WriterCacheTest {} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index f6a8d43ee9..86e203e8e9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -28,14 +28,15 @@ import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Int64Value; +import com.google.protobuf.MessageLite; import java.io.IOException; import java.util.ArrayList; 
import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.logging.Logger; - -import com.google.protobuf.MessageLite; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -340,14 +341,19 @@ public void testSWStreamReconnect() throws Exception { assertEquals(1L, response.get().getOffset()); } } + @Test public void testMultipleDWMultiThread() throws Exception { FooType fa = FooType.newBuilder().setFoo("aaa").build(); FooType fb = FooType.newBuilder().setFoo("bbb").build(); - - try (DirectWriter writer = DirectWriter.newBuilder(tableId, FooType.getDescriptor()).build()) { - ApiFuture response = writer.append(new ArrayList<>(Arrays.asList((MessageLite)fa, (MessageLite)fb))); - assertEquals(0L, response.get().longValue()); + ExecutorService executor = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; i++) { + try (DirectWriter writer = + DirectWriter.newBuilder(tableId, FooType.getDescriptor()).build()) { + ApiFuture response = + writer.append(new ArrayList<>(Arrays.asList((MessageLite) fa, (MessageLite) fb))); + assertEquals(0L, response.get().longValue()); + } } } }